From 0e0fbf7e6a750b1c9b27edfac6123bee32039537 Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Wed, 14 Sep 2011 08:43:27 -0700 Subject: [PATCH] Upgrade libuv to 4b9b692 --- deps/uv/include/uv-private/uv-unix.h | 1 + deps/uv/src/unix/core.c | 2 + deps/uv/src/unix/internal.h | 1 + deps/uv/src/unix/stream.c | 139 +++++++++++++++++++---------------- deps/uv/test/test-list.h | 2 + deps/uv/test/test-tcp-close.c | 129 ++++++++++++++++++++++++++++++++ deps/uv/uv.gyp | 1 + 7 files changed, 212 insertions(+), 63 deletions(-) create mode 100644 deps/uv/test/test-tcp-close.c diff --git a/deps/uv/include/uv-private/uv-unix.h b/deps/uv/include/uv-private/uv-unix.h index e6982c7..6a0ef90 100644 --- a/deps/uv/include/uv-private/uv-unix.h +++ b/deps/uv/include/uv-private/uv-unix.h @@ -61,6 +61,7 @@ typedef int uv_file; int write_index; \ uv_buf_t* bufs; \ int bufcnt; \ + int error; \ uv_buf_t bufsml[UV_REQ_BUFSML_SIZE]; #define UV_SHUTDOWN_PRIVATE_FIELDS /* empty */ diff --git a/deps/uv/src/unix/core.c b/deps/uv/src/unix/core.c index fec2567..8d7b0d0 100644 --- a/deps/uv/src/unix/core.c +++ b/deps/uv/src/unix/core.c @@ -233,6 +233,8 @@ void uv__finish_close(uv_handle_t* handle) { case UV_TCP: assert(!ev_is_active(&((uv_stream_t*)handle)->read_watcher)); assert(!ev_is_active(&((uv_stream_t*)handle)->write_watcher)); + assert(((uv_stream_t*)handle)->fd == -1); + uv__stream_destroy((uv_stream_t*)handle); break; case UV_UDP: diff --git a/deps/uv/src/unix/internal.h b/deps/uv/src/unix/internal.h index f5677c7..4ec6ced 100644 --- a/deps/uv/src/unix/internal.h +++ b/deps/uv/src/unix/internal.h @@ -83,6 +83,7 @@ void uv_fatal_error(const int errorno, const char* syscall); void uv__stream_init(uv_loop_t* loop, uv_stream_t* stream, uv_handle_type type); int uv__stream_open(uv_stream_t*, int fd, int flags); +void uv__stream_destroy(uv_stream_t* stream); void uv__stream_io(EV_P_ ev_io* watcher, int revents); void uv__server_io(EV_P_ ev_io* watcher, int revents); int uv__accept(int sockfd, struct sockaddr* saddr, socklen_t len); diff --git a/deps/uv/src/unix/stream.c b/deps/uv/src/unix/stream.c index b48f216..a5d860d 100644 --- a/deps/uv/src/unix/stream.c +++ b/deps/uv/src/unix/stream.c @@ -31,7 +31,7 @@ static void uv__stream_connect(uv_stream_t*); -static uv_write_t* uv__write(uv_stream_t* stream); +static void uv__write(uv_stream_t* stream); static void uv__read(uv_stream_t* stream); @@ -103,6 +103,39 @@ int uv__stream_open(uv_stream_t* stream, int fd, int flags) { } +void uv__stream_destroy(uv_stream_t* stream) { + uv_write_t* req; + ngx_queue_t* q; + + assert(stream->flags & UV_CLOSED); + + while (!ngx_queue_empty(&stream->write_queue)) { + q = ngx_queue_head(&stream->write_queue); + ngx_queue_remove(q); + + req = ngx_queue_data(q, uv_write_t, queue); + if (req->bufs != req->bufsml) + free(req->bufs); + + if (req->cb) { + uv_err_new_artificial(req->handle->loop, UV_EINTR); + req->cb(req, -1); + } + } + + while (!ngx_queue_empty(&stream->write_completed_queue)) { + q = ngx_queue_head(&stream->write_completed_queue); + ngx_queue_remove(q); + + req = ngx_queue_data(q, uv_write_t, queue); + if (req->cb) { + uv_err_new_artificial(req->handle->loop, UV_OK); + req->cb(req, 0); + } + } +} + + void uv__server_io(EV_P_ ev_io* watcher, int revents) { int fd; struct sockaddr_storage addr; @@ -254,10 +287,28 @@ static void uv__drain(uv_stream_t* stream) { } +static void uv__write_req_finish(uv_write_t* req) { + uv_stream_t* stream = req->handle; + + /* Pop the req off tcp->write_queue. */ + ngx_queue_remove(&req->queue); + if (req->bufs != req->bufsml) { + free(req->bufs); + } + req->bufs = NULL; + + /* Add it to the write_completed_queue where it will have its + * callback called in the near future. + */ + ngx_queue_insert_tail(&stream->write_completed_queue, &req->queue); + ev_feed_event(stream->loop->ev, &stream->write_watcher, EV_WRITE); +} + + /* On success returns NULL. On error returns a pointer to the write request * which had the error. */ -static uv_write_t* uv__write(uv_stream_t* stream) { +static void uv__write(uv_stream_t* stream) { uv_write_t* req; struct iovec* iov; int iovcnt; @@ -271,7 +322,7 @@ static uv_write_t* uv__write(uv_stream_t* stream) { req = uv_write_queue_head(stream); if (!req) { assert(stream->write_queue_size == 0); - return NULL; + return; } assert(req->handle == stream); @@ -299,8 +350,9 @@ static uv_write_t* uv__write(uv_stream_t* stream) { if (n < 0) { if (errno != EAGAIN) { /* Error */ - uv_err_new(stream->loop, errno); - return req; + req->error = errno; + uv__write_req_finish(req); + return; } } else { /* Successful write */ @@ -334,21 +386,9 @@ static uv_write_t* uv__write(uv_stream_t* stream) { if (req->write_index == req->bufcnt) { /* Then we're done! */ assert(n == 0); - - /* Pop the req off tcp->write_queue. */ - ngx_queue_remove(&req->queue); - if (req->bufs != req->bufsml) { - free(req->bufs); - } - req->bufs = NULL; - - /* Add it to the write_completed_queue where it will have its - * callback called in the near future. - * TODO: start trying to write the next request. - */ - ngx_queue_insert_tail(&stream->write_completed_queue, &req->queue); - ev_feed_event(stream->loop->ev, &stream->write_watcher, EV_WRITE); - return NULL; + uv__write_req_finish(req); + /* TODO: start trying to write the next request. */ + return; } } } @@ -359,8 +399,6 @@ static uv_write_t* uv__write(uv_stream_t* stream) { /* We're not done. */ ev_io_start(stream->loop->ev, &stream->write_watcher); - - return NULL; } @@ -378,7 +416,8 @@ static void uv__write_callbacks(uv_stream_t* stream) { /* NOTE: call callback AFTER freeing the request data. */ if (req->cb) { - req->cb(req, 0); + uv_err_new_artificial(stream->loop, req->error); + req->cb(req, req->error ? -1 : 0); } callbacks_made++; @@ -495,15 +534,8 @@ void uv__stream_io(EV_P_ ev_io* watcher, int revents) { } if (revents & EV_WRITE) { - uv_write_t* req = uv__write(stream); - if (req) { - /* Error. Notify the user. */ - if (req->cb) { - req->cb(req, -1); - } - } else { - uv__write_callbacks(stream); - } + uv__write(stream); + uv__write_callbacks(stream); } } } @@ -631,34 +663,29 @@ int uv__connect(uv_connect_t* req, uv_stream_t* stream, struct sockaddr* addr, /* The buffers to be written must remain valid until the callback is called. * This is not required for the uv_buf_t array. */ -int uv_write(uv_write_t* req, uv_stream_t* handle, uv_buf_t bufs[], int bufcnt, +int uv_write(uv_write_t* req, uv_stream_t* stream, uv_buf_t bufs[], int bufcnt, uv_write_cb cb) { - uv_stream_t* stream; int empty_queue; - stream = (uv_stream_t*)handle; - - /* Initialize the req */ - uv__req_init((uv_req_t*) req); - req->cb = cb; - req->handle = handle; - ngx_queue_init(&req->queue); - - assert((handle->type == UV_TCP || handle->type == UV_NAMED_PIPE) + assert((stream->type == UV_TCP || stream->type == UV_NAMED_PIPE) && "uv_write (unix) does not yet support other types of streams"); - empty_queue = (stream->write_queue_size == 0); - if (stream->fd < 0) { uv_err_new(stream->loop, EBADF); return -1; } - ngx_queue_init(&req->queue); - req->type = UV_WRITE; + empty_queue = (stream->write_queue_size == 0); + /* Initialize the req */ + uv__req_init((uv_req_t*) req); + req->cb = cb; + req->handle = stream; + req->error = 0; + req->type = UV_WRITE; + ngx_queue_init(&req->queue); - if (bufcnt < UV_REQ_BUFSML_SIZE) { + if (bufcnt <= UV_REQ_BUFSML_SIZE) { req->bufs = req->bufsml; } else { @@ -688,22 +715,8 @@ int uv_write(uv_write_t* req, uv_stream_t* handle, uv_buf_t bufs[], int bufcnt, * for the fd to become writable. */ if (empty_queue) { - if (uv__write(stream)) { - /* Error. uv_last_error has been set. */ - return -1; - } - } - - /* If the queue is now empty - we've flushed the request already. That - * means we need to make the callback. The callback can only be done on a - * fresh stack so we feed the event loop in order to service it. - */ - if (ngx_queue_empty(&stream->write_queue)) { - ev_feed_event(stream->loop->ev, &stream->write_watcher, EV_WRITE); + uv__write(stream); } else { - /* Otherwise there is data to write - so we should wait for the file - * descriptor to become writable. - */ ev_io_start(stream->loop->ev, &stream->write_watcher); } diff --git a/deps/uv/test/test-list.h b/deps/uv/test/test-list.h index 0c74310..d510be2 100644 --- a/deps/uv/test/test-list.h +++ b/deps/uv/test/test-list.h @@ -31,6 +31,7 @@ TEST_DECLARE (tcp_bind_error_fault) TEST_DECLARE (tcp_bind_error_inval) TEST_DECLARE (tcp_bind_localhost_ok) TEST_DECLARE (tcp_listen_without_bind) +TEST_DECLARE (tcp_close) TEST_DECLARE (tcp_bind6_error_addrinuse) TEST_DECLARE (tcp_bind6_error_addrnotavail) TEST_DECLARE (tcp_bind6_error_fault) @@ -117,6 +118,7 @@ TASK_LIST_START TEST_ENTRY (tcp_bind_error_inval) TEST_ENTRY (tcp_bind_localhost_ok) TEST_ENTRY (tcp_listen_without_bind) + TEST_ENTRY (tcp_close) TEST_ENTRY (tcp_bind6_error_addrinuse) TEST_ENTRY (tcp_bind6_error_addrnotavail) diff --git a/deps/uv/test/test-tcp-close.c b/deps/uv/test/test-tcp-close.c new file mode 100644 index 0000000..5da8a84 --- /dev/null +++ b/deps/uv/test/test-tcp-close.c @@ -0,0 +1,129 @@ +/* Copyright Joyent, Inc. and other Node contributors. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +#include "uv.h" +#include "task.h" + +#include +#include /* memset */ + +#define NUM_WRITE_REQS 32 + +static uv_tcp_t tcp_handle; +static uv_connect_t connect_req; + +static int write_cb_called; +static int close_cb_called; + +static void connect_cb(uv_connect_t* req, int status); +static void write_cb(uv_write_t* req, int status); +static void close_cb(uv_handle_t* handle); + + +static void connect_cb(uv_connect_t* conn_req, int status) { + uv_write_t* req; + uv_buf_t buf; + int i, r; + + buf = uv_buf_init("PING", 4); + for (i = 0; i < NUM_WRITE_REQS; i++) { + req = malloc(sizeof *req); + ASSERT(req != NULL); + + r = uv_write(req, (uv_stream_t*)&tcp_handle, &buf, 1, write_cb); + ASSERT(r == 0); + } + + uv_close((uv_handle_t*)&tcp_handle, close_cb); +} + + +static void write_cb(uv_write_t* req, int status) { + /* write callbacks should run before the close callback */ + ASSERT(close_cb_called == 0); + ASSERT(req->handle == (uv_stream_t*)&tcp_handle); + write_cb_called++; + free(req); +} + + +static void close_cb(uv_handle_t* handle) { + ASSERT(handle == (uv_handle_t*)&tcp_handle); + close_cb_called++; +} + + +static void connection_cb(uv_stream_t* server, int status) { + ASSERT(status == 0); +} + + +static void start_server(uv_loop_t* loop, uv_tcp_t* handle) { + int r; + + r = uv_tcp_init(loop, handle); + ASSERT(r == 0); + + r = uv_tcp_bind(handle, uv_ip4_addr("127.0.0.1", TEST_PORT)); + ASSERT(r == 0); + + r = uv_listen((uv_stream_t*)handle, 128, connection_cb); + ASSERT(r == 0); + + uv_unref(loop); +} + + +/* Check that pending write requests have their callbacks + * invoked when the handle is closed. + */ +TEST_IMPL(tcp_close) { + uv_loop_t* loop; + uv_tcp_t tcp_server; + int r; + + loop = uv_default_loop(); + + /* We can't use the echo server, it doesn't handle ECONNRESET. */ + start_server(loop, &tcp_server); + + r = uv_tcp_init(loop, &tcp_handle); + ASSERT(r == 0); + + r = uv_tcp_connect(&connect_req, + &tcp_handle, + uv_ip4_addr("127.0.0.1", TEST_PORT), + connect_cb); + ASSERT(r == 0); + + ASSERT(write_cb_called == 0); + ASSERT(close_cb_called == 0); + + r = uv_run(loop); + ASSERT(r == 0); + + printf("%d of %d write reqs seen\n", write_cb_called, NUM_WRITE_REQS); + + ASSERT(write_cb_called == NUM_WRITE_REQS); + ASSERT(close_cb_called == 1); + + return 0; +} diff --git a/deps/uv/uv.gyp b/deps/uv/uv.gyp index b70940e..777aed3 100644 --- a/deps/uv/uv.gyp +++ b/deps/uv/uv.gyp @@ -260,6 +260,7 @@ 'test/test-spawn.c', 'test/test-tcp-bind-error.c', 'test/test-tcp-bind6-error.c', + 'test/test-tcp-close.c', 'test/test-tcp-writealot.c', 'test/test-threadpool.c', 'test/test-timer-again.c', -- 2.7.4