Upgrade libuv to 4b9b692
authorRyan Dahl <ry@tinyclouds.org>
Wed, 14 Sep 2011 15:43:27 +0000 (08:43 -0700)
committerRyan Dahl <ry@tinyclouds.org>
Wed, 14 Sep 2011 15:43:27 +0000 (08:43 -0700)
deps/uv/include/uv-private/uv-unix.h
deps/uv/src/unix/core.c
deps/uv/src/unix/internal.h
deps/uv/src/unix/stream.c
deps/uv/test/test-list.h
deps/uv/test/test-tcp-close.c [new file with mode: 0644]
deps/uv/uv.gyp

index e6982c7..6a0ef90 100644 (file)
@@ -61,6 +61,7 @@ typedef int uv_file;
   int write_index; \
   uv_buf_t* bufs; \
   int bufcnt; \
+  int error; \
   uv_buf_t bufsml[UV_REQ_BUFSML_SIZE];
 
 #define UV_SHUTDOWN_PRIVATE_FIELDS /* empty */
index fec2567..8d7b0d0 100644 (file)
@@ -233,6 +233,8 @@ void uv__finish_close(uv_handle_t* handle) {
     case UV_TCP:
       assert(!ev_is_active(&((uv_stream_t*)handle)->read_watcher));
       assert(!ev_is_active(&((uv_stream_t*)handle)->write_watcher));
+      assert(((uv_stream_t*)handle)->fd == -1);
+      uv__stream_destroy((uv_stream_t*)handle);
       break;
 
     case UV_UDP:
index f5677c7..4ec6ced 100644 (file)
@@ -83,6 +83,7 @@ void uv_fatal_error(const int errorno, const char* syscall);
 void uv__stream_init(uv_loop_t* loop, uv_stream_t* stream,
     uv_handle_type type);
 int uv__stream_open(uv_stream_t*, int fd, int flags);
+void uv__stream_destroy(uv_stream_t* stream);
 void uv__stream_io(EV_P_ ev_io* watcher, int revents);
 void uv__server_io(EV_P_ ev_io* watcher, int revents);
 int uv__accept(int sockfd, struct sockaddr* saddr, socklen_t len);
index b48f216..a5d860d 100644 (file)
@@ -31,7 +31,7 @@
 
 
 static void uv__stream_connect(uv_stream_t*);
-static uv_write_t* uv__write(uv_stream_t* stream);
+static void uv__write(uv_stream_t* stream);
 static void uv__read(uv_stream_t* stream);
 
 
@@ -103,6 +103,39 @@ int uv__stream_open(uv_stream_t* stream, int fd, int flags) {
 }
 
 
+void uv__stream_destroy(uv_stream_t* stream) {
+  uv_write_t* req;
+  ngx_queue_t* q;
+
+  assert(stream->flags & UV_CLOSED);
+
+  while (!ngx_queue_empty(&stream->write_queue)) {
+    q = ngx_queue_head(&stream->write_queue);
+    ngx_queue_remove(q);
+
+    req = ngx_queue_data(q, uv_write_t, queue);
+    if (req->bufs != req->bufsml)
+      free(req->bufs);
+
+    if (req->cb) {
+      uv_err_new_artificial(req->handle->loop, UV_EINTR);
+      req->cb(req, -1);
+    }
+  }
+
+  while (!ngx_queue_empty(&stream->write_completed_queue)) {
+    q = ngx_queue_head(&stream->write_completed_queue);
+    ngx_queue_remove(q);
+
+    req = ngx_queue_data(q, uv_write_t, queue);
+    if (req->cb) {
+      uv_err_new_artificial(req->handle->loop, UV_OK);
+      req->cb(req, 0);
+    }
+  }
+}
+
+
 void uv__server_io(EV_P_ ev_io* watcher, int revents) {
   int fd;
   struct sockaddr_storage addr;
@@ -254,10 +287,28 @@ static void uv__drain(uv_stream_t* stream) {
 }
 
 
+static void uv__write_req_finish(uv_write_t* req) {
+  uv_stream_t* stream = req->handle;
+
+  /* Pop the req off tcp->write_queue. */
+  ngx_queue_remove(&req->queue);
+  if (req->bufs != req->bufsml) {
+    free(req->bufs);
+  }
+  req->bufs = NULL;
+
+  /* Add it to the write_completed_queue where it will have its
+   * callback called in the near future.
+   */
+  ngx_queue_insert_tail(&stream->write_completed_queue, &req->queue);
+  ev_feed_event(stream->loop->ev, &stream->write_watcher, EV_WRITE);
+}
+
+
 /* On success returns NULL. On error returns a pointer to the write request
  * which had the error.
  */
-static uv_write_t* uv__write(uv_stream_t* stream) {
+static void uv__write(uv_stream_t* stream) {
   uv_write_t* req;
   struct iovec* iov;
   int iovcnt;
@@ -271,7 +322,7 @@ static uv_write_t* uv__write(uv_stream_t* stream) {
   req = uv_write_queue_head(stream);
   if (!req) {
     assert(stream->write_queue_size == 0);
-    return NULL;
+    return;
   }
 
   assert(req->handle == stream);
@@ -299,8 +350,9 @@ static uv_write_t* uv__write(uv_stream_t* stream) {
   if (n < 0) {
     if (errno != EAGAIN) {
       /* Error */
-      uv_err_new(stream->loop, errno);
-      return req;
+      req->error = errno;
+      uv__write_req_finish(req);
+      return;
     }
   } else {
     /* Successful write */
@@ -334,21 +386,9 @@ static uv_write_t* uv__write(uv_stream_t* stream) {
         if (req->write_index == req->bufcnt) {
           /* Then we're done! */
           assert(n == 0);
-
-          /* Pop the req off tcp->write_queue. */
-          ngx_queue_remove(&req->queue);
-          if (req->bufs != req->bufsml) {
-            free(req->bufs);
-          }
-          req->bufs = NULL;
-
-          /* Add it to the write_completed_queue where it will have its
-           * callback called in the near future.
-           * TODO: start trying to write the next request.
-           */
-          ngx_queue_insert_tail(&stream->write_completed_queue, &req->queue);
-          ev_feed_event(stream->loop->ev, &stream->write_watcher, EV_WRITE);
-          return NULL;
+          uv__write_req_finish(req);
+          /* TODO: start trying to write the next request. */
+          return;
         }
       }
     }
@@ -359,8 +399,6 @@ static uv_write_t* uv__write(uv_stream_t* stream) {
 
   /* We're not done. */
   ev_io_start(stream->loop->ev, &stream->write_watcher);
-
-  return NULL;
 }
 
 
@@ -378,7 +416,8 @@ static void uv__write_callbacks(uv_stream_t* stream) {
 
     /* NOTE: call callback AFTER freeing the request data. */
     if (req->cb) {
-      req->cb(req, 0);
+      uv_err_new_artificial(stream->loop, req->error);
+      req->cb(req, req->error ? -1 : 0);
     }
 
     callbacks_made++;
@@ -495,15 +534,8 @@ void uv__stream_io(EV_P_ ev_io* watcher, int revents) {
     }
 
     if (revents & EV_WRITE) {
-      uv_write_t* req = uv__write(stream);
-      if (req) {
-        /* Error. Notify the user. */
-        if (req->cb) {
-          req->cb(req, -1);
-        }
-      } else {
-        uv__write_callbacks(stream);
-      }
+      uv__write(stream);
+      uv__write_callbacks(stream);
     }
   }
 }
@@ -631,34 +663,29 @@ int uv__connect(uv_connect_t* req, uv_stream_t* stream, struct sockaddr* addr,
 /* The buffers to be written must remain valid until the callback is called.
  * This is not required for the uv_buf_t array.
  */
-int uv_write(uv_write_t* req, uv_stream_t* handle, uv_buf_t bufs[], int bufcnt,
+int uv_write(uv_write_t* req, uv_stream_t* stream, uv_buf_t bufs[], int bufcnt,
     uv_write_cb cb) {
-  uv_stream_t* stream;
   int empty_queue;
 
-  stream = (uv_stream_t*)handle;
-
-  /* Initialize the req */
-  uv__req_init((uv_req_t*) req);
-  req->cb = cb;
-  req->handle = handle;
-  ngx_queue_init(&req->queue);
-
-  assert((handle->type == UV_TCP || handle->type == UV_NAMED_PIPE)
+  assert((stream->type == UV_TCP || stream->type == UV_NAMED_PIPE)
       && "uv_write (unix) does not yet support other types of streams");
 
-  empty_queue = (stream->write_queue_size == 0);
-
   if (stream->fd < 0) {
     uv_err_new(stream->loop, EBADF);
     return -1;
   }
 
-  ngx_queue_init(&req->queue);
-  req->type = UV_WRITE;
+  empty_queue = (stream->write_queue_size == 0);
 
+  /* Initialize the req */
+  uv__req_init((uv_req_t*) req);
+  req->cb = cb;
+  req->handle = stream;
+  req->error = 0;
+  req->type = UV_WRITE;
+  ngx_queue_init(&req->queue);
 
-  if (bufcnt < UV_REQ_BUFSML_SIZE) {
+  if (bufcnt <= UV_REQ_BUFSML_SIZE) {
     req->bufs = req->bufsml;
   }
   else {
@@ -688,22 +715,8 @@ int uv_write(uv_write_t* req, uv_stream_t* handle, uv_buf_t bufs[], int bufcnt,
    * for the fd to become writable.
    */
   if (empty_queue) {
-    if (uv__write(stream)) {
-      /* Error. uv_last_error has been set. */
-      return -1;
-    }
-  }
-
-  /* If the queue is now empty - we've flushed the request already. That
-   * means we need to make the callback. The callback can only be done on a
-   * fresh stack so we feed the event loop in order to service it.
-   */
-  if (ngx_queue_empty(&stream->write_queue)) {
-    ev_feed_event(stream->loop->ev, &stream->write_watcher, EV_WRITE);
+    uv__write(stream);
   } else {
-    /* Otherwise there is data to write - so we should wait for the file
-     * descriptor to become writable.
-     */
     ev_io_start(stream->loop->ev, &stream->write_watcher);
   }
 
index 0c74310..d510be2 100644 (file)
@@ -31,6 +31,7 @@ TEST_DECLARE   (tcp_bind_error_fault)
 TEST_DECLARE   (tcp_bind_error_inval)
 TEST_DECLARE   (tcp_bind_localhost_ok)
 TEST_DECLARE   (tcp_listen_without_bind)
+TEST_DECLARE   (tcp_close)
 TEST_DECLARE   (tcp_bind6_error_addrinuse)
 TEST_DECLARE   (tcp_bind6_error_addrnotavail)
 TEST_DECLARE   (tcp_bind6_error_fault)
@@ -117,6 +118,7 @@ TASK_LIST_START
   TEST_ENTRY  (tcp_bind_error_inval)
   TEST_ENTRY  (tcp_bind_localhost_ok)
   TEST_ENTRY  (tcp_listen_without_bind)
+  TEST_ENTRY  (tcp_close)
 
   TEST_ENTRY  (tcp_bind6_error_addrinuse)
   TEST_ENTRY  (tcp_bind6_error_addrnotavail)
diff --git a/deps/uv/test/test-tcp-close.c b/deps/uv/test/test-tcp-close.c
new file mode 100644 (file)
index 0000000..5da8a84
--- /dev/null
@@ -0,0 +1,129 @@
+/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to
+ * deal in the Software without restriction, including without limitation the
+ * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
+ * sell copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+ * IN THE SOFTWARE.
+ */
+
+#include "uv.h"
+#include "task.h"
+
+#include <errno.h>
+#include <string.h> /* memset */
+
+#define NUM_WRITE_REQS 32
+
+static uv_tcp_t tcp_handle;
+static uv_connect_t connect_req;
+
+static int write_cb_called;
+static int close_cb_called;
+
+static void connect_cb(uv_connect_t* req, int status);
+static void write_cb(uv_write_t* req, int status);
+static void close_cb(uv_handle_t* handle);
+
+
+static void connect_cb(uv_connect_t* conn_req, int status) {
+  uv_write_t* req;
+  uv_buf_t buf;
+  int i, r;
+
+  buf = uv_buf_init("PING", 4);
+  for (i = 0; i < NUM_WRITE_REQS; i++) {
+    req = malloc(sizeof *req);
+    ASSERT(req != NULL);
+
+    r = uv_write(req, (uv_stream_t*)&tcp_handle, &buf, 1, write_cb);
+    ASSERT(r == 0);
+  }
+
+  uv_close((uv_handle_t*)&tcp_handle, close_cb);
+}
+
+
+static void write_cb(uv_write_t* req, int status) {
+  /* write callbacks should run before the close callback */
+  ASSERT(close_cb_called == 0);
+  ASSERT(req->handle == (uv_stream_t*)&tcp_handle);
+  write_cb_called++;
+  free(req);
+}
+
+
+static void close_cb(uv_handle_t* handle) {
+  ASSERT(handle == (uv_handle_t*)&tcp_handle);
+  close_cb_called++;
+}
+
+
+static void connection_cb(uv_stream_t* server, int status) {
+  ASSERT(status == 0);
+}
+
+
+static void start_server(uv_loop_t* loop, uv_tcp_t* handle) {
+  int r;
+
+  r = uv_tcp_init(loop, handle);
+  ASSERT(r == 0);
+
+  r = uv_tcp_bind(handle, uv_ip4_addr("127.0.0.1", TEST_PORT));
+  ASSERT(r == 0);
+
+  r = uv_listen((uv_stream_t*)handle, 128, connection_cb);
+  ASSERT(r == 0);
+
+  uv_unref(loop);
+}
+
+
+/* Check that pending write requests have their callbacks
+ * invoked when the handle is closed.
+ */
+TEST_IMPL(tcp_close) {
+  uv_loop_t* loop;
+  uv_tcp_t tcp_server;
+  int r;
+
+  loop = uv_default_loop();
+
+  /* We can't use the echo server, it doesn't handle ECONNRESET. */
+  start_server(loop, &tcp_server);
+
+  r = uv_tcp_init(loop, &tcp_handle);
+  ASSERT(r == 0);
+
+  r = uv_tcp_connect(&connect_req,
+                     &tcp_handle,
+                     uv_ip4_addr("127.0.0.1", TEST_PORT),
+                     connect_cb);
+  ASSERT(r == 0);
+
+  ASSERT(write_cb_called == 0);
+  ASSERT(close_cb_called == 0);
+
+  r = uv_run(loop);
+  ASSERT(r == 0);
+
+  printf("%d of %d write reqs seen\n", write_cb_called, NUM_WRITE_REQS);
+
+  ASSERT(write_cb_called == NUM_WRITE_REQS);
+  ASSERT(close_cb_called == 1);
+
+  return 0;
+}
index b70940e..777aed3 100644 (file)
         'test/test-spawn.c',
         'test/test-tcp-bind-error.c',
         'test/test-tcp-bind6-error.c',
+        'test/test-tcp-close.c',
         'test/test-tcp-writealot.c',
         'test/test-threadpool.c',
         'test/test-timer-again.c',