Upgrade libuv to 1be48f12a0
authorRyan Dahl <ry@tinyclouds.org>
Fri, 15 Jul 2011 18:15:02 +0000 (11:15 -0700)
committerRyan Dahl <ry@tinyclouds.org>
Fri, 15 Jul 2011 20:41:44 +0000 (13:41 -0700)
and bindings for new req interface

17 files changed:
deps/uv/include/uv-unix.h
deps/uv/include/uv-win.h
deps/uv/include/uv.h
deps/uv/src/uv-unix.c
deps/uv/src/uv-win.c
deps/uv/test/benchmark-ping-pongs.c
deps/uv/test/benchmark-pump.c
deps/uv/test/dns-server.c
deps/uv/test/echo-server.c
deps/uv/test/test-callback-stack.c
deps/uv/test/test-connection-fail.c
deps/uv/test/test-delayed-accept.c
deps/uv/test/test-getsockname.c
deps/uv/test/test-ping-pong.c
deps/uv/test/test-shutdown-eof.c
deps/uv/test/test-tcp-writealot.c
src/tcp_wrap.cc

index d17ebc3..50b29c7 100644 (file)
@@ -41,14 +41,20 @@ typedef struct {
 
 #define UV_REQ_BUFSML_SIZE (4)
 
-#define UV_REQ_PRIVATE_FIELDS \
-  int write_index; \
-  ev_timer timer; \
+#define UV_REQ_PRIVATE_FIELDS  /* empty */
+
+#define UV_WRITE_PRIVATE_FIELDS \
   ngx_queue_t queue; \
+  int write_index; \
   uv_buf_t* bufs; \
   int bufcnt; \
   uv_buf_t bufsml[UV_REQ_BUFSML_SIZE];
 
+#define UV_SHUTDOWN_PRIVATE_FIELDS /* empty */
+
+#define UV_CONNECT_PRIVATE_FIELDS \
+  ngx_queue_t queue;
+
 
 /* TODO: union or classes please! */
 #define UV_HANDLE_PRIVATE_FIELDS \
@@ -67,8 +73,8 @@ typedef struct {
   int delayed_error; \
   uv_connection_cb connection_cb; \
   int accepted_fd; \
-  uv_req_t *connect_req; \
-  uv_req_t *shutdown_req; \
+  uv_connect_t *connect_req; \
+  uv_shutdown_t *shutdown_req; \
   ev_io read_watcher; \
   ev_io write_watcher; \
   ngx_queue_t write_queue; \
index 2d6093a..e0bfa84 100644 (file)
@@ -42,22 +42,6 @@ typedef struct uv_buf_t {
   char* base;
 } uv_buf_t;
 
-/*
- * Private uv_pipe_instance state.
- */
-typedef enum {
-  UV_PIPEINSTANCE_CONNECTED = 0,
-  UV_PIPEINSTANCE_DISCONNECTED,
-  UV_PIPEINSTANCE_ACTIVE
-} uv_pipeinstance_state;
-
-/* Used to store active pipe instances inside a linked list. */
-typedef struct uv_pipe_instance_s {
-  HANDLE handle;
-  uv_pipeinstance_state state;
-  struct uv_pipe_instance_s* next;
-} uv_pipe_instance_t;
-
 #define UV_REQ_PRIVATE_FIELDS             \
   union {                                 \
     /* Used by I/O operations */          \
@@ -66,13 +50,21 @@ typedef struct uv_pipe_instance_s {
       size_t queued_bytes;                \
     };                                    \
   };                                      \
-  int flags;                              \
   uv_err_t error;                         \
   struct uv_req_s* next_req;
 
+#define UV_WRITE_PRIVATE_FIELDS           \
+  /* empty */
+
+#define UV_CONNECT_PRIVATE_FIELDS         \
+  /* empty */
+
+#define UV_SHUTDOWN_PRIVATE_FIELDS        \
+  /* empty */
+
 #define uv_stream_connection_fields       \
   unsigned int write_reqs_pending;        \
-  uv_req_t* shutdown_req;
+  uv_shutdown_t* shutdown_req;
 
 #define uv_stream_server_fields           \
   uv_connection_cb connection_cb;
@@ -81,7 +73,7 @@ typedef struct uv_pipe_instance_s {
   unsigned int reqs_pending;              \
   uv_alloc_cb alloc_cb;                   \
   uv_read_cb read_cb;                     \
-  struct uv_req_s read_req;               \
+  uv_req_t read_req;                      \
   union {                                 \
     struct { uv_stream_connection_fields };  \
     struct { uv_stream_server_fields     };  \
@@ -94,17 +86,19 @@ typedef struct uv_pipe_instance_s {
   };                                      \
   SOCKET accept_socket;                   \
   char accept_buffer[sizeof(struct sockaddr_storage) * 2 + 32]; \
-  struct uv_req_s accept_req;
+  struct uv_req_s accept_req;             \
 
 #define uv_pipe_server_fields             \
     char* name;                           \
-    uv_pipe_instance_t* connections;      \
-    struct uv_req_s accept_reqs[4];
+    struct uv_pipe_accept_s {             \
+      UV_REQ_FIELDS                       \
+      HANDLE pipeHandle;                  \
+      struct uv_pipe_accept_s* next_pending; \
+    } accept_reqs[4];                     \
+    struct uv_pipe_accept_s* pending_accepts;
 
 #define uv_pipe_connection_fields         \
-    uv_pipe_t* server;                    \
-    uv_pipe_instance_t* connection;       \
-    uv_pipe_instance_t clientConnection;
+  HANDLE handle;
 
 #define UV_PIPE_PRIVATE_FIELDS            \
   union {                                 \
@@ -120,6 +114,7 @@ typedef struct uv_pipe_instance_s {
 
 #define UV_ASYNC_PRIVATE_FIELDS           \
   struct uv_req_s async_req;              \
+  uv_async_cb async_cb;                   \
   /* char to avoid alignment issues */    \
   char volatile async_sent;
 
index d470a25..1f8c1ce 100644 (file)
@@ -48,10 +48,13 @@ typedef struct uv_timer_s uv_timer_t;
 typedef struct uv_prepare_s uv_prepare_t;
 typedef struct uv_check_s uv_check_t;
 typedef struct uv_idle_s uv_idle_t;
-typedef struct uv_req_s uv_req_t;
 typedef struct uv_async_s uv_async_t;
 typedef struct uv_getaddrinfo_s uv_getaddrinfo_t;
-
+/* Request types */
+typedef struct uv_req_s uv_req_t;
+typedef struct uv_shutdown_s uv_shutdown_t;
+typedef struct uv_write_s uv_write_t;
+typedef struct uv_connect_s uv_connect_t;
 
 #if defined(__unix__) || defined(__POSIX__) || defined(__APPLE__)
 # include "uv-unix.h"
@@ -70,9 +73,9 @@ typedef struct uv_getaddrinfo_s uv_getaddrinfo_t;
  */
 typedef uv_buf_t (*uv_alloc_cb)(uv_stream_t* tcp, size_t suggested_size);
 typedef void (*uv_read_cb)(uv_stream_t* tcp, ssize_t nread, uv_buf_t buf);
-typedef void (*uv_write_cb)(uv_req_t* req, int status);
-typedef void (*uv_connect_cb)(uv_req_t* req, int status);
-typedef void (*uv_shutdown_cb)(uv_req_t* req, int status);
+typedef void (*uv_write_cb)(uv_write_t* req, int status);
+typedef void (*uv_connect_cb)(uv_connect_t* req, int status);
+typedef void (*uv_shutdown_cb)(uv_shutdown_t* req, int status);
 typedef void (*uv_connection_cb)(uv_handle_t* server, int status);
 typedef void (*uv_close_cb)(uv_handle_t* handle);
 typedef void (*uv_timer_cb)(uv_timer_t* handle, int status);
@@ -168,23 +171,34 @@ struct uv_err_s {
 };
 
 
-struct uv_req_s {
-  /* read-only */
-  uv_req_type type;
-  /* public */
-  uv_handle_t* handle;
-  void *(*cb)(void *);
-  void* data;
-  /* private */
+#define UV_REQ_FIELDS \
+  /* read-only */ \
+  uv_req_type type; \
+  /* public */ \
+  void* data; \
+  /* private */ \
   UV_REQ_PRIVATE_FIELDS
+
+/* Abstract base class of all requests. */
+struct uv_req_s {
+  UV_REQ_FIELDS
 };
 
+
 /*
- * Initialize a request for use with uv_write, uv_shutdown, or uv_connect.
+ * Shutdown the outgoing (write) side of a duplex stream. It waits for
+ * pending write requests to complete. The handle should refer to a
+ * initialized stream. req should be an uninitalized shutdown request
+ * struct. The cb is a called after shutdown is complete.
  */
-void uv_req_init(uv_req_t* req, uv_handle_t* handle, void *(*cb)(void *));
+struct uv_shutdown_s {
+  UV_REQ_FIELDS
+  uv_stream_t* handle;
+  uv_shutdown_cb cb;
+  UV_SHUTDOWN_PRIVATE_FIELDS
+};
 
-int uv_shutdown(uv_req_t* req);
+int uv_shutdown(uv_shutdown_t* req, uv_stream_t* handle, uv_shutdown_cb cb);
 
 
 #define UV_HANDLE_FIELDS \
@@ -251,7 +265,8 @@ int uv_read_start(uv_stream_t*, uv_alloc_cb alloc_cb, uv_read_cb read_cb);
 
 int uv_read_stop(uv_stream_t*);
 
-/* Write data to stream. Buffers are written in order. Example:
+/*
+ * Write data to stream. Buffers are written in order. Example:
  *
  *   uv_buf_t a[] = {
  *     { .base = "1", .len = 1 },
@@ -268,7 +283,15 @@ int uv_read_stop(uv_stream_t*);
  *   uv_write(req, b, 2);
  *
  */
-int uv_write(uv_req_t* req, uv_buf_t bufs[], int bufcnt);
+struct uv_write_s {
+  UV_REQ_FIELDS
+  uv_write_cb cb;
+  uv_stream_t* handle;
+  UV_WRITE_PRIVATE_FIELDS
+};
+
+int uv_write(uv_write_t* req, uv_stream_t* handle, uv_buf_t bufs[], int bufcnt,
+    uv_write_cb cb);
 
 
 /*
@@ -287,8 +310,23 @@ int uv_tcp_init(uv_tcp_t* handle);
 int uv_tcp_bind(uv_tcp_t* handle, struct sockaddr_in);
 int uv_tcp_bind6(uv_tcp_t* handle, struct sockaddr_in6);
 
-int uv_tcp_connect(uv_req_t* req, struct sockaddr_in);
-int uv_tcp_connect6(uv_req_t* req, struct sockaddr_in6);
+/*
+ * uv_tcp_connect, uv_tcp_connect6
+ * These functions establish IPv4 and IPv6 TCP connections. Provide an
+ * initialized TCP handle and an uninitialized uv_connect_t*. The callback
+ * will be made when the connection is estabished.
+ */
+struct uv_connect_s {
+  UV_REQ_FIELDS
+  uv_connect_cb cb;
+  uv_stream_t* handle;
+  UV_CONNECT_PRIVATE_FIELDS
+};
+
+int uv_tcp_connect(uv_connect_t* req, uv_tcp_t* handle,
+    struct sockaddr_in address, uv_connect_cb cb);
+int uv_tcp_connect6(uv_connect_t* req, uv_tcp_t* handle,
+    struct sockaddr_in6 address, uv_connect_cb cb);
 
 int uv_tcp_listen(uv_tcp_t* handle, int backlog, uv_connection_cb cb);
 
@@ -298,10 +336,10 @@ int uv_getsockname(uv_tcp_t* handle, struct sockaddr* name, int* namelen);
 /*
  * A subclass of uv_stream_t representing a pipe stream or pipe server.
  */
-struct uv_pipe_s { 
-  UV_HANDLE_FIELDS 
-  UV_STREAM_FIELDS 
-  UV_PIPE_PRIVATE_FIELDS 
+struct uv_pipe_s {
+  UV_HANDLE_FIELDS
+  UV_STREAM_FIELDS
+  UV_PIPE_PRIVATE_FIELDS
 };
 
 int uv_pipe_init(uv_pipe_t* handle);
@@ -310,7 +348,8 @@ int uv_pipe_bind(uv_pipe_t* handle, const char* name);
 
 int uv_pipe_listen(uv_pipe_t* handle, uv_connection_cb cb);
 
-int uv_pipe_connect(uv_req_t* req, const char* name);
+int uv_pipe_connect(uv_connect_t* req, uv_pipe_t* handle,
+    const char* name, uv_connect_cb cb);
 
 
 /*
@@ -489,7 +528,7 @@ int uv_exepath(char* buffer, size_t* size);
 extern uint64_t uv_hrtime(void);
 
 
-/* the presence of this union forces similar struct layout */
+/* the presence of these unions force similar struct layout */
 union uv_any_handle {
   uv_tcp_t tcp;
   uv_pipe_t pipe;
@@ -501,6 +540,14 @@ union uv_any_handle {
   uv_getaddrinfo_t getaddrinfo;
 };
 
+union uv_any_req {
+  uv_req_t req;
+  uv_write_t write;
+  uv_connect_t connect;
+  uv_shutdown_t shutdown;
+};
+
+
 /* Diagnostic counters */
 typedef struct {
   uint64_t req_init;
index ab8a5e6..1dc20d8 100644 (file)
@@ -59,6 +59,7 @@ struct uv_ares_data_s {
 static struct uv_ares_data_s ares_data;
 
 
+void uv__req_init(uv_req_t*);
 void uv__tcp_io(EV_P_ ev_io* watcher, int revents);
 void uv__next(EV_P_ ev_idle* watcher, int revents);
 static void uv__tcp_connect(uv_tcp_t*);
@@ -518,9 +519,9 @@ void uv__finish_close(uv_handle_t* handle) {
 }
 
 
-uv_req_t* uv_write_queue_head(uv_tcp_t* tcp) {
+uv_write_t* uv_write_queue_head(uv_tcp_t* tcp) {
   ngx_queue_t* q;
-  uv_req_t* req;
+  uv_write_t* req;
 
   if (ngx_queue_empty(&tcp->write_queue)) {
     return NULL;
@@ -531,7 +532,7 @@ uv_req_t* uv_write_queue_head(uv_tcp_t* tcp) {
     return NULL;
   }
 
-  req = ngx_queue_data(q, struct uv_req_s, queue);
+  req = ngx_queue_data(q, struct uv_write_s, queue);
   assert(req);
 
   return req;
@@ -552,8 +553,7 @@ void uv__next(EV_P_ ev_idle* watcher, int revents) {
 
 
 static void uv__drain(uv_tcp_t* tcp) {
-  uv_req_t* req;
-  uv_shutdown_cb cb;
+  uv_shutdown_t* req;
 
   assert(!uv_write_queue_head(tcp));
   assert(tcp->write_queue_size == 0);
@@ -567,16 +567,19 @@ static void uv__drain(uv_tcp_t* tcp) {
     assert(tcp->shutdown_req);
 
     req = tcp->shutdown_req;
-    cb = (uv_shutdown_cb)req->cb;
 
     if (shutdown(tcp->fd, SHUT_WR)) {
       /* Error. Report it. User should call uv_close(). */
       uv_err_new((uv_handle_t*)tcp, errno);
-      if (cb) cb(req, -1);
+      if (req->cb) {
+        req->cb(req, -1);
+      }
     } else {
       uv_err_new((uv_handle_t*)tcp, 0);
       uv_flag_set((uv_handle_t*)tcp, UV_SHUT);
-      if (cb) cb(req, 0);
+      if (req->cb) {
+        req->cb(req, 0);
+      }
     }
   }
 }
@@ -585,8 +588,8 @@ static void uv__drain(uv_tcp_t* tcp) {
 /* On success returns NULL. On error returns a pointer to the write request
  * which had the error.
  */
-static uv_req_t* uv__write(uv_tcp_t* tcp) {
-  uv_req_t* req;
+static uv_write_t* uv__write(uv_tcp_t* tcp) {
+  uv_write_t* req;
   struct iovec* iov;
   int iovcnt;
   ssize_t n;
@@ -602,7 +605,7 @@ static uv_req_t* uv__write(uv_tcp_t* tcp) {
     return NULL;
   }
 
-  assert(req->handle == (uv_handle_t*)tcp);
+  assert(req->handle == (uv_stream_t*)tcp);
 
   /* Cast to iovec. We had to have our own uv_buf_t instead of iovec
    * because Windows's WSABUF is not an iovec.
@@ -691,23 +694,20 @@ static uv_req_t* uv__write(uv_tcp_t* tcp) {
 
 
 static void uv__write_callbacks(uv_tcp_t* tcp) {
-  uv_write_cb cb;
   int callbacks_made = 0;
   ngx_queue_t* q;
-  uv_req_t* req;
+  uv_write_t* req;
 
   while (!ngx_queue_empty(&tcp->write_completed_queue)) {
     /* Pop a req off write_completed_queue. */
     q = ngx_queue_head(&tcp->write_completed_queue);
     assert(q);
-    req = ngx_queue_data(q, struct uv_req_s, queue);
+    req = ngx_queue_data(q, struct uv_write_s, queue);
     ngx_queue_remove(q);
 
-    cb = (uv_write_cb) req->cb;
-
     /* NOTE: call callback AFTER freeing the request data. */
-    if (cb) {
-      cb(req, 0);
+    if (req->cb) {
+      req->cb(req, 0);
     }
 
     callbacks_made++;
@@ -772,10 +772,16 @@ void uv__read(uv_tcp_t* tcp) {
 }
 
 
-int uv_shutdown(uv_req_t* req) {
-  uv_tcp_t* tcp = (uv_tcp_t*)req->handle;
+int uv_shutdown(uv_shutdown_t* req, uv_stream_t* handle, uv_shutdown_cb cb) {
+  uv_tcp_t* tcp = (uv_tcp_t*)handle;
+  assert(handle->type == UV_TCP &&
+      "uv_shutdown (unix) only supports uv_tcp_t right now");
   assert(tcp->fd >= 0);
-  assert(tcp->type == UV_TCP);
+
+  /* Initialize request */
+  uv__req_init((uv_req_t*)req);
+  req->handle = handle;
+  req->cb = cb;
 
   if (uv_flag_is_set((uv_handle_t*)tcp, UV_SHUT) ||
       uv_flag_is_set((uv_handle_t*)tcp, UV_CLOSED) ||
@@ -796,11 +802,11 @@ int uv_shutdown(uv_req_t* req) {
 
 void uv__tcp_io(EV_P_ ev_io* watcher, int revents) {
   uv_tcp_t* tcp = watcher->data;
+
+  assert(tcp->type == UV_TCP);
   assert(watcher == &tcp->read_watcher ||
          watcher == &tcp->write_watcher);
-
   assert(tcp->fd >= 0);
-
   assert(!uv_flag_is_set((uv_handle_t*)tcp, UV_CLOSING));
 
   if (tcp->connect_req) {
@@ -811,13 +817,11 @@ void uv__tcp_io(EV_P_ ev_io* watcher, int revents) {
     }
 
     if (revents & EV_WRITE) {
-      uv_req_t* req = uv__write(tcp);
+      uv_write_t* req = uv__write(tcp);
       if (req) {
         /* Error. Notify the user. */
-        uv_write_cb cb = (uv_write_cb) req->cb;
-
-        if (cb) {
-          cb(req, -1);
+        if (req->cb) {
+          req->cb(req, -1);
         }
       } else {
         uv__write_callbacks(tcp);
@@ -834,13 +838,11 @@ void uv__tcp_io(EV_P_ ev_io* watcher, int revents) {
  */
 static void uv__tcp_connect(uv_tcp_t* tcp) {
   int error;
-  uv_req_t* req;
-  uv_connect_cb connect_cb;
+  uv_connect_t* req = tcp->connect_req;
   socklen_t errorsize = sizeof(int);
 
+  assert(tcp->type == UV_TCP);
   assert(tcp->fd >= 0);
-
-  req = tcp->connect_req;
   assert(req);
 
   if (tcp->delayed_error) {
@@ -860,9 +862,8 @@ static void uv__tcp_connect(uv_tcp_t* tcp) {
 
     /* Successful connection */
     tcp->connect_req = NULL;
-    connect_cb = (uv_connect_cb) req->cb;
-    if (connect_cb) {
-      connect_cb(req, 0);
+    if (req->cb) {
+      req->cb(req, 0);
     }
 
   } else if (error == EINPROGRESS) {
@@ -873,18 +874,15 @@ static void uv__tcp_connect(uv_tcp_t* tcp) {
     uv_err_new((uv_handle_t*)tcp, error);
 
     tcp->connect_req = NULL;
-
-    connect_cb = (uv_connect_cb) req->cb;
-    if (connect_cb) {
-      connect_cb(req, -1);
+    if (req->cb) {
+      req->cb(req, -1);
     }
   }
 }
 
 
-static int uv__connect(uv_req_t* req, struct sockaddr* addr,
-    socklen_t addrlen) {
-  uv_tcp_t* tcp = (uv_tcp_t*)req->handle;
+static int uv__connect(uv_connect_t* req, uv_tcp_t* tcp, struct sockaddr* addr,
+    socklen_t addrlen, uv_connect_cb cb) {
   int r;
 
   if (tcp->fd <= 0) {
@@ -901,6 +899,9 @@ static int uv__connect(uv_req_t* req, struct sockaddr* addr,
     }
   }
 
+  uv__req_init((uv_req_t*)req);
+  req->cb = cb;
+  req->handle = (uv_stream_t*)tcp;
   req->type = UV_CONNECT;
   ngx_queue_init(&req->queue);
 
@@ -947,17 +948,21 @@ static int uv__connect(uv_req_t* req, struct sockaddr* addr,
 }
 
 
-int uv_tcp_connect(uv_req_t* req, struct sockaddr_in addr) {
-  assert(addr.sin_family == AF_INET);
-  return uv__connect(req, (struct sockaddr*) &addr,
-      sizeof(struct sockaddr_in));
+int uv_tcp_connect(uv_connect_t* req, uv_tcp_t* handle,
+    struct sockaddr_in address, uv_connect_cb cb) {
+  assert(handle->type == UV_TCP);
+  assert(address.sin_family == AF_INET);
+  return uv__connect(req, handle, (struct sockaddr*) &address,
+      sizeof(struct sockaddr_in), cb);
 }
 
 
-int uv_tcp_connect6(uv_req_t* req, struct sockaddr_in6 addr) {
-  assert(addr.sin6_family == AF_INET6);
-  return uv__connect(req, (struct sockaddr*) &addr,
-      sizeof(struct sockaddr_in6));
+int uv_tcp_connect6(uv_connect_t* req, uv_tcp_t* handle,
+    struct sockaddr_in6 address, uv_connect_cb cb) {
+  assert(handle->type == UV_TCP);
+  assert(address.sin6_family == AF_INET6);
+  return uv__connect(req, handle, (struct sockaddr*) &address,
+      sizeof(struct sockaddr_in6), cb);
 }
 
 
@@ -997,9 +1002,21 @@ static size_t uv__buf_count(uv_buf_t bufs[], int bufcnt) {
 /* The buffers to be written must remain valid until the callback is called.
  * This is not required for the uv_buf_t array.
  */
-int uv_write(uv_req_t* req, uv_buf_t bufs[], int bufcnt) {
-  uv_tcp_t* tcp = (uv_tcp_t*)req->handle;
-  int empty_queue = (tcp->write_queue_size == 0);
+int uv_write(uv_write_t* req, uv_stream_t* handle, uv_buf_t bufs[], int bufcnt,
+    uv_write_cb cb) {
+  int empty_queue;
+  uv_tcp_t* tcp = (uv_tcp_t*)handle;
+
+  /* Initialize the req */
+  uv__req_init((uv_req_t*) req);
+  req->cb = cb;
+  req->handle = handle;
+  ngx_queue_init(&req->queue);
+
+  assert(handle->type == UV_TCP &&
+      "uv_write (unix) does not yet support other types of streams");
+
+  empty_queue = (tcp->write_queue_size == 0);
   assert(tcp->fd >= 0);
 
   ngx_queue_init(&req->queue);
@@ -1118,12 +1135,10 @@ int uv_read_stop(uv_stream_t* stream) {
 }
 
 
-void uv_req_init(uv_req_t* req, uv_handle_t* handle, void *(*cb)(void *)) {
+void uv__req_init(uv_req_t* req) {
   uv_counters()->req_init++;
   req->type = UV_UNKNOWN_REQ;
-  req->cb = cb;
-  req->handle = handle;
-  ngx_queue_init(&req->queue);
+  req->data = NULL;
 }
 
 
@@ -1634,6 +1649,7 @@ int uv_pipe_listen(uv_pipe_t* handle, uv_connection_cb cb) {
 }
 
 
-int uv_pipe_connect(uv_req_t* req, const char* name) {
+int uv_pipe_connect(uv_connect_t* req, uv_pipe_t* handle,
+    const char* name, uv_connect_cb cb) {
   assert(0 && "implement me");
 }
index 7166e37..a4478f5 100644 (file)
@@ -24,6 +24,7 @@
 #include <limits.h>
 #include <malloc.h>
 #include <stdio.h>
+#include <string.h>
 
 #include "uv.h"
 #include "uv-common.h"
@@ -165,12 +166,7 @@ static LPFN_TRANSMITFILE            pTransmitFile6;
 #define UV_HANDLE_BIND_ERROR       0x1000
 #define UV_HANDLE_IPV6             0x2000
 #define UV_HANDLE_PIPESERVER       0x4000
-
-/*
- * Private uv_req flags.
- */
-/* The request is currently queued. */
-#define UV_REQ_PENDING             0x01
+#define UV_HANDLE_READ_PENDING     0x8000
 
 
 /* Binary tree used to keep the list of timers sorted. */
@@ -518,12 +514,9 @@ void uv_init() {
 }
 
 
-void uv_req_init(uv_req_t* req, uv_handle_t* handle, void *(*cb)(void *)) {
+static void uv_req_init(uv_req_t* req) {
   uv_counters()->req_init++;
   req->type = UV_UNKNOWN_REQ;
-  req->flags = 0;
-  req->handle = handle;
-  req->cb = cb;
 }
 
 
@@ -602,7 +595,10 @@ static int uv_tcp_set_socket(uv_tcp_t* handle, SOCKET socket) {
 static void uv_init_connection(uv_stream_t* handle) {
   handle->flags |= UV_HANDLE_CONNECTION;
   handle->write_reqs_pending = 0;
-  uv_req_init(&(handle->read_req), (uv_handle_t*)handle, NULL);
+
+  uv_req_init((uv_req_t*) &(handle->read_req));
+  handle->read_req.type = UV_READ;
+  handle->read_req.data = handle;
 }
 
 
@@ -650,11 +646,10 @@ static void uv_tcp_endgame(uv_tcp_t* handle) {
       err = uv_new_sys_error(WSAGetLastError());
     }
     if (handle->shutdown_req->cb) {
-      handle->shutdown_req->flags &= ~UV_REQ_PENDING;
       if (status == -1) {
         uv_last_error_ = err;
       }
-      ((uv_shutdown_cb)handle->shutdown_req->cb)(handle->shutdown_req, status);
+      handle->shutdown_req->cb(handle->shutdown_req, status);
     }
     handle->reqs_pending--;
   }
@@ -683,11 +678,10 @@ static void uv_pipe_endgame(uv_pipe_t* handle) {
     close_pipe(handle, &status, &err);
 
     if (handle->shutdown_req->cb) {
-      handle->shutdown_req->flags &= ~UV_REQ_PENDING;
       if (status == -1) {
         uv_last_error_ = err;
       }
-      ((uv_shutdown_cb)handle->shutdown_req->cb)(handle->shutdown_req, status);
+      handle->shutdown_req->cb(handle->shutdown_req, status);
     }
     handle->reqs_pending--;
   }
@@ -952,9 +946,6 @@ static void uv_tcp_queue_accept(uv_tcp_t* handle) {
 
   /* Prepare the uv_req structure. */
   req = &handle->accept_req;
-  assert(!(req->flags & UV_REQ_PENDING));
-  req->type = UV_ACCEPT;
-  req->flags |= UV_REQ_PENDING;
 
   /* choose family and extension function */
   if ((handle->flags & UV_HANDLE_IPV6) != 0) {
@@ -999,57 +990,55 @@ static void uv_tcp_queue_accept(uv_tcp_t* handle) {
   handle->accept_socket = accept_socket;
 
   handle->reqs_pending++;
-  req->flags |= UV_REQ_PENDING;
 }
 
 
-static void uv_pipe_queue_accept(uv_pipe_t* handle) {
-  uv_req_t* req;
+static void uv_pipe_queue_accept(uv_pipe_t* handle, struct uv_pipe_accept_s* req) {
   HANDLE pipeHandle;
-  int i;
 
   assert(handle->flags & UV_HANDLE_LISTENING);
+  assert(req->pipeHandle == INVALID_HANDLE_VALUE);
+
+  pipeHandle = CreateNamedPipe(handle->name,
+                               PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,
+                               PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT,
+                               PIPE_UNLIMITED_INSTANCES,
+                               65536,
+                               65536,
+                               0,
+                               NULL);
+
+  if (pipeHandle == INVALID_HANDLE_VALUE) {
+    req->error = uv_new_sys_error(GetLastError());
+    uv_insert_pending_req((uv_req_t*) req);
+    handle->reqs_pending++;
+    return;
+  }
 
-  for (i = 0; i < COUNTOF(handle->accept_reqs); i++) {
-    req = &handle->accept_reqs[i];
-    if (!(req->flags & UV_REQ_PENDING)) {
-      pipeHandle = CreateNamedPipe(handle->name,
-                                   PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,
-                                   PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT,
-                                   PIPE_UNLIMITED_INSTANCES,
-                                   65536,
-                                   65536,
-                                   0,
-                                   NULL);
-
-      if (pipeHandle == INVALID_HANDLE_VALUE) {
-        continue;
-      }
-
-      if (CreateIoCompletionPort(pipeHandle,
-                                 uv_iocp_,
-                                 (ULONG_PTR)handle,
-                                 0) == NULL) {
-        continue;
-      }
-
-      /* Prepare the overlapped structure. */
-      memset(&(req->overlapped), 0, sizeof(req->overlapped));
+  if (CreateIoCompletionPort(pipeHandle,
+                                uv_iocp_,
+                                (ULONG_PTR)handle,
+                                0) == NULL) {
+    req->error = uv_new_sys_error(GetLastError());
+    uv_insert_pending_req((uv_req_t*) req);
+    handle->reqs_pending++;
+    return;
+  }
 
-      if (!ConnectNamedPipe(pipeHandle, &req->overlapped) && 
-          GetLastError() != ERROR_IO_PENDING && GetLastError() != ERROR_PIPE_CONNECTED) {
-        /* Make this req pending reporting an error. */
-        req->error = uv_new_sys_error(GetLastError());
-        uv_insert_pending_req(req);
-        handle->reqs_pending++;
-        continue;
-      }
+  /* Prepare the overlapped structure. */
+  memset(&(req->overlapped), 0, sizeof(req->overlapped));
 
-      req->data = pipeHandle;
-      req->flags |= UV_REQ_PENDING;
-      handle->reqs_pending++;
-    }
+  if (!ConnectNamedPipe(pipeHandle, &req->overlapped) &&
+      GetLastError() != ERROR_IO_PENDING && GetLastError() != ERROR_PIPE_CONNECTED) {
+    /* Make this req pending reporting an error. */
+    req->error = uv_new_sys_error(GetLastError());
+    uv_insert_pending_req((uv_req_t*) req);
+    handle->reqs_pending++;
+    return;
   }
+
+  req->pipeHandle = pipeHandle;
+  handle->reqs_pending++;
 }
 
 
@@ -1060,11 +1049,10 @@ static void uv_tcp_queue_read(uv_tcp_t* handle) {
   DWORD bytes, flags;
 
   assert(handle->flags & UV_HANDLE_READING);
+  assert(!(handle->flags & UV_HANDLE_READ_PENDING));
 
   req = &handle->read_req;
-  assert(!(req->flags & UV_REQ_PENDING));
   memset(&req->overlapped, 0, sizeof(req->overlapped));
-  req->type = UV_READ;
 
   buf.base = (char*) &uv_zero_;
   buf.len = 0;
@@ -1085,7 +1073,7 @@ static void uv_tcp_queue_read(uv_tcp_t* handle) {
     return;
   }
 
-  req->flags |= UV_REQ_PENDING;
+  handle->flags |= UV_HANDLE_READ_PENDING;
   handle->reqs_pending++;
 }
 
@@ -1095,16 +1083,15 @@ static void uv_pipe_queue_read(uv_pipe_t* handle) {
   int result;
 
   assert(handle->flags & UV_HANDLE_READING);
-  assert(handle->connection);
-  assert(handle->connection->handle != INVALID_HANDLE_VALUE);
+  assert(!(handle->flags & UV_HANDLE_READ_PENDING));
+
+  assert(handle->handle != INVALID_HANDLE_VALUE);
 
   req = &handle->read_req;
-  assert(!(req->flags & UV_REQ_PENDING));
   memset(&req->overlapped, 0, sizeof(req->overlapped));
-  req->type = UV_READ;
 
   /* Do 0-read */
-  result = ReadFile(handle->connection->handle,
+  result = ReadFile(handle->handle,
                     &uv_zero_,
                     0,
                     NULL,
@@ -1118,7 +1105,7 @@ static void uv_pipe_queue_read(uv_pipe_t* handle) {
     return;
   }
 
-  req->flags |= UV_REQ_PENDING;
+  handle->flags |= UV_HANDLE_READ_PENDING;
   handle->reqs_pending++;
 }
 
@@ -1138,6 +1125,10 @@ int uv_tcp_listen(uv_tcp_t* handle, int backlog, uv_connection_cb cb) {
     return -1;
   }
 
+  if (!(handle->flags & UV_HANDLE_BOUND) &&
+      uv_tcp_bind(handle, uv_addr_ip4_any_) < 0)
+    return -1;
+
   if (listen(handle->socket, backlog) == SOCKET_ERROR) {
     uv_set_sys_error(WSAGetLastError());
     return -1;
@@ -1146,7 +1137,9 @@ int uv_tcp_listen(uv_tcp_t* handle, int backlog, uv_connection_cb cb) {
   handle->flags |= UV_HANDLE_LISTENING;
   handle->connection_cb = cb;
 
-  uv_req_init(&(handle->accept_req), (uv_handle_t*)handle, NULL);
+  uv_req_init(&(handle->accept_req));
+  handle->accept_req.type = UV_ACCEPT;
+  handle->accept_req.data = handle;
   uv_tcp_queue_accept(handle);
 
   return 0;
@@ -1179,36 +1172,27 @@ static int uv_tcp_accept(uv_tcp_t* server, uv_tcp_t* client) {
 
 
 static int uv_pipe_accept(uv_pipe_t* server, uv_pipe_t* client) {
-  uv_pipe_instance_t* connection = server->connections;
-
   /* Find a connection instance that has been connected, but not yet accepted. */
-  while (connection) {
-    if (connection->state == UV_PIPEINSTANCE_CONNECTED) {
-      break;
-    }
+  struct uv_pipe_accept_s* req = server->pending_accepts;
 
-    connection = connection->next;
-  }
-
-  if (!connection) {
+  if (!req) {
     /* No valid connections found, so we error out. */
-    uv_set_sys_error(UV_ENOTCONN);
+    uv_set_sys_error(WSAEWOULDBLOCK);
     return -1;
   }
 
-  /* Make the connection instance active */
-  connection->state = UV_PIPEINSTANCE_ACTIVE;
-  
-  /* Assign the connection to the client. */
-  client->connection = connection;
-  client->server = server;
+  /* Initialize the client handle and copy the pipeHandle to the client */
+  uv_pipe_init(client);
+  uv_init_connection((uv_stream_t*) client);
+  client->handle = req->pipeHandle;
 
-  uv_init_connection((uv_stream_t*)client);
-  client->flags |= UV_HANDLE_PIPESERVER;
-  uv_req_init(&(client->read_req), (uv_handle_t*)client, NULL);
+  /* Prepare the req to pick up a new connection */
+  server->pending_accepts = req->next_pending;
+  req->next_pending = NULL;
+  req->pipeHandle = INVALID_HANDLE_VALUE;
 
   if (!(server->flags & UV_HANDLE_CLOSING)) {
-    uv_pipe_queue_accept(server);
+    uv_pipe_queue_accept(server, req);
   }
 
   return 0;
@@ -1250,7 +1234,7 @@ static int uv_tcp_read_start(uv_tcp_t* handle, uv_alloc_cb alloc_cb, uv_read_cb
 
   /* If reading was stopped and then started again, there could stell be a */
   /* read request pending. */
-  if (!(handle->read_req.flags & UV_REQ_PENDING))
+  if (!(handle->flags & UV_HANDLE_READ_PENDING))
     uv_tcp_queue_read(handle);
 
   return 0;
@@ -1279,7 +1263,7 @@ static int uv_pipe_read_start(uv_pipe_t* handle, uv_alloc_cb alloc_cb, uv_read_c
 
   /* If reading was stopped and then started again, there could stell be a */
   /* read request pending. */
-  if (!(handle->read_req.flags & UV_REQ_PENDING))
+  if (!(handle->flags & UV_HANDLE_READ_PENDING))
     uv_pipe_queue_read(handle);
 
   return 0;
@@ -1304,20 +1288,18 @@ int uv_read_stop(uv_stream_t* handle) {
 }
 
 
-int uv_tcp_connect(uv_req_t* req, struct sockaddr_in addr) {
+int uv_tcp_connect(uv_connect_t* req, uv_tcp_t* handle,
+    struct sockaddr_in address, uv_connect_cb cb) {
   int addrsize = sizeof(struct sockaddr_in);
   BOOL success;
   DWORD bytes;
-  uv_tcp_t* handle = (uv_tcp_t*)req->handle;
-
-  assert(!(req->flags & UV_REQ_PENDING));
 
   if (handle->flags & UV_HANDLE_BIND_ERROR) {
     uv_last_error_ = handle->error;
     return -1;
   }
 
-  if (addr.sin_family != AF_INET) {
+  if (address.sin_family != AF_INET) {
     uv_set_sys_error(WSAEFAULT);
     return -1;
   }
@@ -1326,11 +1308,14 @@ int uv_tcp_connect(uv_req_t* req, struct sockaddr_in addr) {
       uv_tcp_bind(handle, uv_addr_ip4_any_) < 0)
     return -1;
 
-  memset(&req->overlapped, 0, sizeof(req->overlapped));
+  uv_req_init((uv_req_t*) req);
   req->type = UV_CONNECT;
+  req->handle = (uv_stream_t*) handle;
+  req->cb = cb;
+  memset(&req->overlapped, 0, sizeof(req->overlapped));
 
   success = pConnectEx(handle->socket,
-                       (struct sockaddr*)&addr,
+                       (struct sockaddr*) &address,
                        addrsize,
                        NULL,
                        0,
@@ -1342,32 +1327,29 @@ int uv_tcp_connect(uv_req_t* req, struct sockaddr_in addr) {
     return -1;
   }
 
-  req->flags |= UV_REQ_PENDING;
   handle->reqs_pending++;
 
   return 0;
 }
 
 
-int uv_tcp_connect6(uv_req_t* req, struct sockaddr_in6 addr) {
+int uv_tcp_connect6(uv_connect_t* req, uv_tcp_t* handle,
+    struct sockaddr_in6 address, uv_connect_cb cb) {
   int addrsize = sizeof(struct sockaddr_in6);
   BOOL success;
   DWORD bytes;
-  uv_tcp_t* handle = (uv_tcp_t*)req->handle;
 
   if (!uv_allow_ipv6) {
     uv_new_sys_error(UV_EAFNOSUPPORT);
     return -1;
   }
 
-  assert(!(req->flags & UV_REQ_PENDING));
-
   if (handle->flags & UV_HANDLE_BIND_ERROR) {
     uv_last_error_ = handle->error;
     return -1;
   }
 
-  if (addr.sin6_family != AF_INET6) {
+  if (address.sin6_family != AF_INET6) {
     uv_set_sys_error(WSAEFAULT);
     return -1;
   }
@@ -1376,11 +1358,14 @@ int uv_tcp_connect6(uv_req_t* req, struct sockaddr_in6 addr) {
       uv_tcp_bind6(handle, uv_addr_ip6_any_) < 0)
     return -1;
 
-  memset(&req->overlapped, 0, sizeof(req->overlapped));
+  uv_req_init((uv_req_t*) req);
   req->type = UV_CONNECT;
+  req->handle = (uv_stream_t*) handle;
+  req->cb = cb;
+  memset(&req->overlapped, 0, sizeof(req->overlapped));
 
   success = pConnectEx6(handle->socket,
-                       (struct sockaddr*)&addr,
+                       (struct sockaddr*) &address,
                        addrsize,
                        NULL,
                        0,
@@ -1392,7 +1377,6 @@ int uv_tcp_connect6(uv_req_t* req, struct sockaddr_in6 addr) {
     return -1;
   }
 
-  req->flags |= UV_REQ_PENDING;
   handle->reqs_pending++;
 
   return 0;
@@ -1429,25 +1413,26 @@ static size_t uv_count_bufs(uv_buf_t bufs[], int count) {
 }
 
 
-int uv_tcp_write(uv_req_t* req, uv_buf_t bufs[], int bufcnt) {
+static int uv_tcp_write(uv_write_t* req, uv_tcp_t* handle, uv_buf_t bufs[], int bufcnt,
+    uv_write_cb cb) {
   int result;
   DWORD bytes, err;
-  uv_tcp_t* handle = (uv_tcp_t*) req->handle;
 
-  assert(!(req->flags & UV_REQ_PENDING));
-
-  if (!(req->handle->flags & UV_HANDLE_CONNECTION)) {
+  if (!(handle->flags & UV_HANDLE_CONNECTION)) {
     uv_set_sys_error(WSAEINVAL);
     return -1;
   }
 
-  if (req->handle->flags & UV_HANDLE_SHUTTING) {
+  if (handle->flags & UV_HANDLE_SHUTTING) {
     uv_set_sys_error(WSAESHUTDOWN);
     return -1;
   }
 
-  memset(&req->overlapped, 0, sizeof(req->overlapped));
+  uv_req_init((uv_req_t*) req);
   req->type = UV_WRITE;
+  req->handle = (uv_stream_t*) handle;
+  req->cb = cb;
+  memset(&req->overlapped, 0, sizeof(req->overlapped));
 
   result = WSASend(handle->socket,
                    (WSABUF*)bufs,
@@ -1474,7 +1459,6 @@ int uv_tcp_write(uv_req_t* req, uv_buf_t bufs[], int bufcnt) {
     handle->write_queue_size += req->queued_bytes;
   }
 
-  req->flags |= UV_REQ_PENDING;
   handle->reqs_pending++;
   handle->write_reqs_pending++;
 
@@ -1482,34 +1466,34 @@ int uv_tcp_write(uv_req_t* req, uv_buf_t bufs[], int bufcnt) {
 }
 
 
-int uv_pipe_write(uv_req_t* req, uv_buf_t bufs[], int bufcnt) {
+int uv_pipe_write(uv_write_t* req, uv_pipe_t* handle, uv_buf_t bufs[], int bufcnt,
+    uv_write_cb cb) {
   int result;
-  uv_pipe_t* handle = (uv_pipe_t*) req->handle;
-
-  assert(!(req->flags & UV_REQ_PENDING));
 
   if (bufcnt != 1) {
     uv_set_sys_error(UV_ENOTSUP);
     return -1;
   }
 
-  assert(handle->connection);
-  assert(handle->connection->handle != INVALID_HANDLE_VALUE);
+  assert(handle->handle != INVALID_HANDLE_VALUE);
 
-  if (!(req->handle->flags & UV_HANDLE_CONNECTION)) {
+  if (!(handle->flags & UV_HANDLE_CONNECTION)) {
     uv_set_sys_error(UV_EINVAL);
     return -1;
   }
 
-  if (req->handle->flags & UV_HANDLE_SHUTTING) {
+  if (handle->flags & UV_HANDLE_SHUTTING) {
     uv_set_sys_error(UV_EOF);
     return -1;
   }
 
-  memset(&req->overlapped, 0, sizeof(req->overlapped));
+  uv_req_init((uv_req_t*) req);
   req->type = UV_WRITE;
+  req->handle = (uv_stream_t*) handle;
+  req->cb = cb;
+  memset(&req->overlapped, 0, sizeof(req->overlapped));
 
-  result = WriteFile(handle->connection->handle,
+  result = WriteFile(handle->handle,
                      bufs[0].base,
                      bufs[0].len,
                      NULL,
@@ -1529,7 +1513,6 @@ int uv_pipe_write(uv_req_t* req, uv_buf_t bufs[], int bufcnt) {
     handle->write_queue_size += req->queued_bytes;
   }
 
-  req->flags |= UV_REQ_PENDING;
   handle->reqs_pending++;
   handle->write_reqs_pending++;
 
@@ -1537,22 +1520,21 @@ int uv_pipe_write(uv_req_t* req, uv_buf_t bufs[], int bufcnt) {
 }
 
 
-int uv_write(uv_req_t* req, uv_buf_t bufs[], int bufcnt) {
-  if (req->handle->type == UV_TCP) {
-    return uv_tcp_write(req, bufs, bufcnt);
-  } else if (req->handle->type == UV_NAMED_PIPE) {
-    return uv_pipe_write(req, bufs, bufcnt);
+int uv_write(uv_write_t* req, uv_stream_t* handle, uv_buf_t bufs[], int bufcnt,
+    uv_write_cb cb) {
+  if (handle->type == UV_TCP) {
+    return uv_tcp_write(req, (uv_tcp_t*) handle, bufs, bufcnt, cb);
+  } else if (handle->type == UV_NAMED_PIPE) {
+    return uv_pipe_write(req, (uv_pipe_t*) handle, bufs, bufcnt, cb);
   }
 
+  uv_set_sys_error(WSAEINVAL);
   return -1;
 }
 
 
-int uv_shutdown(uv_req_t* req) {
-  uv_tcp_t* handle = (uv_tcp_t*) req->handle;
-  int status = 0;
-
-  if (!(req->handle->flags & UV_HANDLE_CONNECTION)) {
+int uv_shutdown(uv_shutdown_t* req, uv_stream_t* handle, uv_shutdown_cb cb) {
+  if (!(handle->flags & UV_HANDLE_CONNECTION)) {
     uv_set_sys_error(WSAEINVAL);
     return -1;
   }
@@ -1562,8 +1544,10 @@ int uv_shutdown(uv_req_t* req) {
     return -1;
   }
 
+  uv_req_init((uv_req_t*) req);
   req->type = UV_SHUTDOWN;
-  req->flags |= UV_REQ_PENDING;
+  req->handle = handle;
+  req->cb = cb;
 
   handle->flags |= UV_HANDLE_SHUTTING;
   handle->shutdown_req = req;
@@ -1592,8 +1576,7 @@ static void uv_process_tcp_read_req(uv_tcp_t* handle, uv_req_t* req) {
 
   assert(handle->type == UV_TCP);
 
-  /* Mark the request non-pending */
-  req->flags &= ~UV_REQ_PENDING;
+  handle->flags &= ~UV_HANDLE_READ_PENDING;
 
   if (req->error.code != UV_OK) {
     /* An error occurred doing the 0-read. */
@@ -1649,7 +1632,8 @@ static void uv_process_tcp_read_req(uv_tcp_t* handle, uv_req_t* req) {
     }
 
     /* Post another 0-read if still reading and not closing. */
-    if (handle->flags & UV_HANDLE_READING) {
+    if ((handle->flags & UV_HANDLE_READING) &&
+        !(handle->flags & UV_HANDLE_READ_PENDING)) {
       uv_tcp_queue_read(handle);
     }
   }
@@ -1658,12 +1642,9 @@ static void uv_process_tcp_read_req(uv_tcp_t* handle, uv_req_t* req) {
 }
 
 
-static void uv_process_tcp_write_req(uv_tcp_t* handle, uv_req_t* req) {
+static void uv_process_tcp_write_req(uv_tcp_t* handle, uv_write_t* req) {
   assert(handle->type == UV_TCP);
 
-  /* Mark the request non-pending */
-  req->flags &= ~UV_REQ_PENDING;
-
   handle->write_queue_size -= req->queued_bytes;
 
   if (req->cb) {
@@ -1684,9 +1665,6 @@ static void uv_process_tcp_write_req(uv_tcp_t* handle, uv_req_t* req) {
 static void uv_process_tcp_accept_req(uv_tcp_t* handle, uv_req_t* req) {
   assert(handle->type == UV_TCP);
 
-  /* Mark the request non-pending */
-  req->flags &= ~UV_REQ_PENDING;
-
   /* If handle->accepted_socket is not a valid socket, then */
   /* uv_queue_accept must have failed. This is a serious error. We stop */
   /* accepting connections and report this error to the connection */
@@ -1723,12 +1701,9 @@ static void uv_process_tcp_accept_req(uv_tcp_t* handle, uv_req_t* req) {
 }
 
 
-static void uv_process_tcp_connect_req(uv_tcp_t* handle, uv_req_t* req) {
+static void uv_process_tcp_connect_req(uv_tcp_t* handle, uv_connect_t* req) {
   assert(handle->type == UV_TCP);
 
-  /* Mark the request non-pending */
-  req->flags &= ~UV_REQ_PENDING;
-
   if (req->cb) {
     if (req->error.code == UV_OK) {
       if (setsockopt(handle->socket,
@@ -1758,8 +1733,7 @@ static void uv_process_pipe_read_req(uv_pipe_t* handle, uv_req_t* req) {
 
   assert(handle->type == UV_NAMED_PIPE);
 
-  /* Mark the request non-pending */
-  req->flags &= ~UV_REQ_PENDING;
+  handle->flags &= ~UV_HANDLE_READ_PENDING;
 
   if (req->error.code != UV_OK) {
     /* An error occurred doing the 0-read. */
@@ -1777,7 +1751,7 @@ static void uv_process_pipe_read_req(uv_pipe_t* handle, uv_req_t* req) {
       * This is so that ReadFile doesn't block if the read buffer is empty.
       */
     mode = PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_NOWAIT;
-    if (!SetNamedPipeHandleState(handle->connection->handle, &mode, NULL, NULL)) {
+    if (!SetNamedPipeHandleState(handle->handle, &mode, NULL, NULL)) {
       /* We can't continue processing this read. */
       handle->flags &= ~UV_HANDLE_READING;
       uv_set_sys_error(GetLastError());
@@ -1791,11 +1765,11 @@ static void uv_process_pipe_read_req(uv_pipe_t* handle, uv_req_t* req) {
       buf = handle->alloc_cb((uv_stream_t*)handle, 65536);
       assert(buf.len > 0);
 
-      if (ReadFile(handle->connection->handle,
-                  buf.base,
-                  buf.len,
-                  &bytes,
-                  NULL)) {
+      if (ReadFile(handle->handle,
+                   buf.base,
+                   buf.len,
+                   &bytes,
+                   NULL)) {
         if (bytes > 0) {
           /* Successful read */
           handle->read_cb((uv_stream_t*)handle, bytes, buf);
@@ -1816,7 +1790,7 @@ static void uv_process_pipe_read_req(uv_pipe_t* handle, uv_req_t* req) {
         err = GetLastError();
         if (err == ERROR_NO_DATA) {
           /* Read buffer was completely empty, report a 0-byte read. */
-          uv_set_sys_error(UV_EAGAIN);
+          uv_set_sys_error(WSAEWOULDBLOCK);
           handle->read_cb((uv_stream_t*)handle, 0, buf);
         } else {
           /* Ouch! serious error. */
@@ -1829,10 +1803,11 @@ static void uv_process_pipe_read_req(uv_pipe_t* handle, uv_req_t* req) {
 
     /* TODO: if the read callback stops reading we can't start reading again
         because the pipe will still be in nowait mode. */
-    if (handle->flags & UV_HANDLE_READING) {
+    if ((handle->flags & UV_HANDLE_READING) &&
+        !(handle->flags & UV_HANDLE_READ_PENDING)) {
       /* Switch back to blocking mode so that we can use IOCP for 0-reads */
       mode = PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT;
-      if (SetNamedPipeHandleState(handle->connection->handle, &mode, NULL, NULL)) {
+      if (SetNamedPipeHandleState(handle->handle, &mode, NULL, NULL)) {
         /* Post another 0-read */
         uv_pipe_queue_read(handle);
       } else {
@@ -1851,12 +1826,9 @@ static void uv_process_pipe_read_req(uv_pipe_t* handle, uv_req_t* req) {
 }
 
 
-static void uv_process_pipe_write_req(uv_pipe_t* handle, uv_req_t* req) {
+static void uv_process_pipe_write_req(uv_pipe_t* handle, uv_write_t* req) {
   assert(handle->type == UV_NAMED_PIPE);
 
-  /* Mark the request non-pending */
-  req->flags &= ~UV_REQ_PENDING;
-
   handle->write_queue_size -= req->queued_bytes;
 
   if (req->cb) {
@@ -1874,39 +1846,27 @@ static void uv_process_pipe_write_req(uv_pipe_t* handle, uv_req_t* req) {
 }
 
 
-static void uv_process_pipe_accept_req(uv_pipe_t* handle, uv_req_t* req) {
-  uv_pipe_instance_t* pipeInstance;
+static void uv_process_pipe_accept_req(uv_pipe_t* handle, uv_req_t* raw_req) {
+  struct uv_pipe_accept_s* req = (struct uv_pipe_accept_s*) raw_req;
 
   assert(handle->type == UV_NAMED_PIPE);
 
-  /* Mark the request non-pending */
-  req->flags &= ~UV_REQ_PENDING;
-
   if (req->error.code == UV_OK) {
-    assert(req->data);
-
-    /* Create the connection instance and add it to the connections list. */
-    pipeInstance = (uv_pipe_instance_t*)malloc(sizeof(uv_pipe_instance_t));
-    if (!pipeInstance) {
-      uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
-    }
-
-    pipeInstance->handle = req->data;
-    pipeInstance->state = UV_PIPEINSTANCE_CONNECTED;
-    pipeInstance->next = handle->connections;
-    handle->connections = pipeInstance;
+    assert(req->pipeHandle != INVALID_HANDLE_VALUE);
 
-    /* Clear the request. */
-    req->data = NULL;
-    req->flags = 0;
+    req->next_pending = handle->pending_accepts;
+    handle->pending_accepts = req;
 
     if (handle->connection_cb) {
       handle->connection_cb((uv_handle_t*)handle, 0);
     }
   } else {
-    /* Ignore errors and continue listening */
-    if (handle->flags & UV_HANDLE_LISTENING) {
-      uv_pipe_queue_accept(handle);
+    if (req->pipeHandle != INVALID_HANDLE_VALUE) {
+      CloseHandle(req->pipeHandle);
+      req->pipeHandle = INVALID_HANDLE_VALUE;
+    }
+    if (!(handle->flags & UV_HANDLE_CLOSING)) {
+      uv_pipe_queue_accept(handle, req);
     }
   }
 
@@ -1914,12 +1874,9 @@ static void uv_process_pipe_accept_req(uv_pipe_t* handle, uv_req_t* req) {
 }
 
 
-static void uv_process_pipe_connect_req(uv_pipe_t* handle, uv_req_t* req) {
+static void uv_process_pipe_connect_req(uv_pipe_t* handle, uv_connect_t* req) {
   assert(handle->type == UV_NAMED_PIPE);
 
-  /* Mark the request non-pending */
-  req->flags &= ~UV_REQ_PENDING;
-
   if (req->cb) {
     if (req->error.code == UV_OK) {
       uv_init_connection((uv_stream_t*)handle);
@@ -2172,10 +2129,12 @@ int uv_async_init(uv_async_t* handle, uv_async_cb async_cb) {
   handle->flags = 0;
   handle->async_sent = 0;
   handle->error = uv_ok_;
+  handle->async_cb = async_cb;
 
   req = &handle->async_req;
-  uv_req_init(req, (uv_handle_t*)handle, async_cb);
+  uv_req_init(req);
   req->type = UV_WAKEUP;
+  req->data = handle;
 
   uv_refs_++;
 
@@ -2211,8 +2170,8 @@ static void uv_process_async_wakeup_req(uv_async_t* handle, uv_req_t* req) {
   assert(req->type == UV_WAKEUP);
 
   handle->async_sent = 0;
-  if (req->cb) {
-    ((uv_async_cb)req->cb)((uv_async_t*) handle, 0);
+  if (handle->async_cb) {
+    handle->async_cb((uv_async_t*) handle, 0);
   }
   if (handle->flags & UV_HANDLE_CLOSING) {
     uv_want_endgame((uv_handle_t*)handle);
@@ -2220,15 +2179,15 @@ static void uv_process_async_wakeup_req(uv_async_t* handle, uv_req_t* req) {
 }
 
 
-#define DELEGATE_STREAM_REQ(req, method)                                      \
+#define DELEGATE_STREAM_REQ(req, method, handle_at)                           \
   do {                                                                        \
-    switch (req->handle->type) {                                              \
+    switch (((uv_handle_t*) (req)->handle_at)->type) {                        \
       case UV_TCP:                                                            \
-        uv_process_tcp_##method##_req((uv_tcp_t*) req->handle, req);          \
+        uv_process_tcp_##method##_req((uv_tcp_t*) ((req)->handle_at), req);   \
         break;                                                                \
                                                                               \
       case UV_NAMED_PIPE:                                                     \
-        uv_process_pipe_##method##_req((uv_pipe_t*) req->handle, req);        \
+        uv_process_pipe_##method##_req((uv_pipe_t*) ((req)->handle_at), req); \
         break;                                                                \
                                                                               \
       default:                                                                \
@@ -2243,35 +2202,35 @@ static void uv_process_reqs() {
   while (req = uv_remove_pending_req()) {
     switch (req->type) {
       case UV_READ:
-        DELEGATE_STREAM_REQ(req, read);
+        DELEGATE_STREAM_REQ(req, read, data);
         break;
 
       case UV_WRITE:
-        DELEGATE_STREAM_REQ(req, write);
+        DELEGATE_STREAM_REQ((uv_write_t*) req, write, handle);
         break;
 
       case UV_ACCEPT:
-        DELEGATE_STREAM_REQ(req, accept);
+        DELEGATE_STREAM_REQ(req, accept, data);
         break;
 
       case UV_CONNECT:
-        DELEGATE_STREAM_REQ(req, connect);
+        DELEGATE_STREAM_REQ((uv_connect_t*) req, connect, handle);
         break;
 
       case UV_WAKEUP:
-        uv_process_async_wakeup_req((uv_async_t*) req->handle, req);
+        uv_process_async_wakeup_req((uv_async_t*) req->data, req);
         break;
 
       case UV_ARES_EVENT_REQ:
-        uv_process_ares_event_req((uv_ares_action_t*) req->handle, req);
+        uv_process_ares_event_req((uv_ares_action_t*) req->data, req);
         break;
 
       case UV_ARES_CLEANUP_REQ:
-        uv_process_ares_cleanup_req((uv_ares_task_t*) req->handle, req);
+        uv_process_ares_cleanup_req((uv_ares_task_t*) req->data, req);
         break;
 
       case UV_GETADDRINFO_REQ:
-        uv_process_getaddrinfo_req((uv_getaddrinfo_t*) req->handle, req);
+        uv_process_getaddrinfo_req((uv_getaddrinfo_t*) req->data, req);
         break;
 
       default:
@@ -2505,8 +2464,9 @@ VOID CALLBACK uv_ares_socksignal_tp(void* parameter, BOOLEAN timerfired) {
     selhandle->write = (network_events.lNetworkEvents & (FD_WRITE | FD_CONNECT)) ? 1 : 0;
 
     uv_ares_req = &selhandle->ares_req;
-    uv_req_init(uv_ares_req, (uv_handle_t*)selhandle, NULL);
+    uv_req_init(uv_ares_req);
     uv_ares_req->type = UV_ARES_EVENT_REQ;
+    uv_ares_req->data = selhandle;
 
     /* post ares needs to called */
     if (!PostQueuedCompletionStatus(uv_iocp_,
@@ -2551,8 +2511,9 @@ void uv_ares_sockstate_cb(void *data, ares_socket_t sock, int read, int write) {
 
       /* Post request to cleanup the Task */
       uv_ares_req = &uv_handle_ares->ares_req;
-      uv_req_init(uv_ares_req, (uv_handle_t*)uv_handle_ares, NULL);
+      uv_req_init(uv_ares_req);
       uv_ares_req->type = UV_ARES_CLEANUP_REQ;
+      uv_ares_req->data = uv_handle_ares;
 
       /* post ares done with socket - finish cleanup when all threads done. */
       if (!PostQueuedCompletionStatus(uv_iocp_,
@@ -2981,7 +2942,8 @@ int uv_getaddrinfo(uv_getaddrinfo_t* handle,
   }
 
   /* init request for Post handling */
-  uv_req_init(&handle->getadddrinfo_req, (uv_handle_t*)handle, NULL);
+  uv_req_init(&handle->getadddrinfo_req);
+  handle->getadddrinfo_req.data = handle;
   handle->getadddrinfo_req.type = UV_GETADDRINFO_REQ;
 
   /* Ask thread to run. Treat this as a long operation */
@@ -3007,6 +2969,7 @@ int uv_pipe_init(uv_pipe_t* handle) {
 
   handle->type = UV_NAMED_PIPE;
   handle->reqs_pending = 0;
+  handle->pending_accepts = NULL;
 
   uv_counters()->pipe_init++;
 
@@ -3018,30 +2981,27 @@ int uv_pipe_init(uv_pipe_t* handle) {
 /* TODO: make this work with UTF8 name */
 int uv_pipe_bind(uv_pipe_t* handle, const char* name) {
   int i;
+  struct uv_pipe_accept_s* req;
 
   if (!name) {
+    uv_set_sys_error(WSAEINVAL);
     return -1;
   }
 
-  handle->connections = NULL;
-
-  /* Initialize accept requests. */
-  for (i = 0; i < COUNTOF(handle->accept_reqs); i++) {
-    handle->accept_reqs[i].flags = 0;
-    handle->accept_reqs[i].type = UV_ACCEPT;
-    handle->accept_reqs[i].handle = (uv_handle_t*)handle;
-    handle->accept_reqs[i].cb = NULL;
-    handle->accept_reqs[i].data = NULL;
-    uv_counters()->req_init++;
-  }
-
   /* Make our own copy of the pipe name */
-  handle->name = (char*)malloc(MAX_PIPENAME_LEN);
+  handle->name = strdup(name);
   if (!handle->name) {
     uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
   }
-  strcpy(handle->name, name);
-  handle->name[255] = '\0';
+
+  for (i = 0; i < COUNTOF(handle->accept_reqs); i++) {
+    req = &handle->accept_reqs[i];
+    uv_req_init((uv_req_t*) req);
+    req->type = UV_ACCEPT;
+    req->data = handle;
+    req->pipeHandle = INVALID_HANDLE_VALUE;
+    req->next_pending = NULL;
+  }
 
   handle->flags |= UV_HANDLE_PIPESERVER;
   return 0;
@@ -3050,9 +3010,7 @@ int uv_pipe_bind(uv_pipe_t* handle, const char* name) {
 
 /* Starts listening for connections for the given pipe. */
 int uv_pipe_listen(uv_pipe_t* handle, uv_connection_cb cb) {
-  int i, maxInstances, errno;
-  HANDLE pipeHandle;
-  uv_pipe_instance_t* pipeInstance;
+  int i, errno;
 
   if (handle->flags & UV_HANDLE_LISTENING ||
       handle->flags & UV_HANDLE_READING) {
@@ -3068,32 +3026,36 @@ int uv_pipe_listen(uv_pipe_t* handle, uv_connection_cb cb) {
   handle->flags |= UV_HANDLE_LISTENING;
   handle->connection_cb = cb;
 
-  uv_pipe_queue_accept(handle);
+  for (i = 0; i < COUNTOF(handle->accept_reqs); i++) {
+    uv_pipe_queue_accept(handle, &handle->accept_reqs[i]);
+  }
+
   return 0;
 }
 
 /* TODO: make this work with UTF8 name */
-int uv_pipe_connect(uv_req_t* req, const char* name) {
+/* TODO: run this in the thread pool */
+int uv_pipe_connect(uv_connect_t* req, uv_pipe_t* handle,
+    const char* name, uv_connect_cb cb) {
   int errno;
   DWORD mode;
-  uv_pipe_t* handle = (uv_pipe_t*)req->handle;
-
-  assert(!(req->flags & UV_REQ_PENDING));
 
+  uv_req_init((uv_req_t*) req);
   req->type = UV_CONNECT;
-  handle->connection = &handle->clientConnection;
-  handle->server = NULL;
+  req->handle = (uv_stream_t*) handle;
+  req->cb = cb;
+
   memset(&req->overlapped, 0, sizeof(req->overlapped));
 
-  handle->clientConnection.handle = CreateFile(name,
-                                               GENERIC_READ | GENERIC_WRITE,
-                                               0,
-                                               NULL,
-                                               OPEN_EXISTING,
-                                               FILE_FLAG_OVERLAPPED,
-                                               NULL);
+  handle->handle = CreateFile(name,
+                              GENERIC_READ | GENERIC_WRITE,
+                              0,
+                              NULL,
+                              OPEN_EXISTING,
+                              FILE_FLAG_OVERLAPPED,
+                              NULL);
 
-  if (handle->clientConnection.handle == INVALID_HANDLE_VALUE &&
+  if (handle->handle == INVALID_HANDLE_VALUE &&
       GetLastError() != ERROR_IO_PENDING) {
     errno = GetLastError();
     goto error;
@@ -3101,12 +3063,12 @@ int uv_pipe_connect(uv_req_t* req, const char* name) {
 
   mode = PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT;
 
-  if (!SetNamedPipeHandleState(handle->clientConnection.handle, &mode, NULL, NULL)) {
+  if (!SetNamedPipeHandleState(handle->handle, &mode, NULL, NULL)) {
     errno = GetLastError();
     goto error;
   }
 
-  if (CreateIoCompletionPort(handle->clientConnection.handle,
+  if (CreateIoCompletionPort(handle->handle,
                              uv_iocp_,
                              (ULONG_PTR)handle,
                              0) == NULL) {
@@ -3115,124 +3077,36 @@ int uv_pipe_connect(uv_req_t* req, const char* name) {
   }
 
   req->error = uv_ok_;
-  req->flags |= UV_REQ_PENDING;
-  handle->connection->state = UV_PIPEINSTANCE_ACTIVE;
-  uv_insert_pending_req(req);
+  uv_insert_pending_req((uv_req_t*) req);
   handle->reqs_pending++;
   return 0;
 
 error:
-  close_pipe(handle, NULL, NULL);
+  if (handle->handle != INVALID_HANDLE_VALUE) {
+    CloseHandle(handle->handle);
+  }
   req->error = uv_new_sys_error(errno);
-  uv_insert_pending_req(req);
+  uv_insert_pending_req((uv_req_t*) req);
   handle->reqs_pending++;
-  return 0;
+  return -1;
 }
 
 
 /* Cleans up uv_pipe_t (server or connection) and all resources associated with it */
 static void close_pipe(uv_pipe_t* handle, int* status, uv_err_t* err) {
-  uv_pipe_instance_t* connection, *next, *cur, **prev;
-  HANDLE pipeHandle;
   int i;
+  HANDLE pipeHandle;
 
   if (handle->flags & UV_HANDLE_PIPESERVER) {
-    if (handle->flags & UV_HANDLE_CONNECTION) {
-      /*
-       * The handle is for a connection instance on the pipe server.
-       * To clean-up, we call DisconnectNamedPipe, and then uv_pipe_queue_accept will cleanup the allocated uv_pipe_instance_t.
-       */
-
-      connection = handle->connection;
-      if (connection && connection->handle != INVALID_HANDLE_VALUE) {
-        /* Disconnect the connection intance and return it to pending state. */
-        if (DisconnectNamedPipe(connection->handle)) {
-          if (status) *status = 0;
-        } else {
-          if (status) *status = -1;
-          if (err) *err = uv_new_sys_error(GetLastError());
-        }
-
-        connection->state = UV_PIPEINSTANCE_DISCONNECTED;
-        connection->handle = NULL;
-
-        cur = handle->connections;
-        handle->connection = NULL;
-        prev = &handle->server->connections;        
-
-        /* Remove the connection from the list. */
-        while (connection) {
-          if (cur == connection) {
-            *prev = connection->next;
-            free(connection);
-            break;
-          } else {
-            prev = &connection->next;
-            connection = connection->next;
-          }
-        }
-
-        /* Queue accept now that the instance is in pending state. */
-        if (!(handle->server->flags & UV_HANDLE_CLOSING)) {
-          uv_pipe_queue_accept(handle->server);
-        }
-      }
-    } else {
-      /*
-       * The handle is for the pipe server.
-       * To clean-up we close every active and pending connection instance.
-       */
-
-      if (handle->name) {
-        free(handle->name);
-        handle->name = NULL;
+    for (i = 0; i < COUNTOF(handle->accept_reqs); i++) {
+      pipeHandle = handle->accept_reqs[i].pipeHandle;
+      if (pipeHandle != INVALID_HANDLE_VALUE) {
+        CloseHandle(pipeHandle);
       }
-
-      connection = handle->connections;
-      while (connection) {
-        pipeHandle = connection->handle;
-
-        if (pipeHandle) {
-          DisconnectNamedPipe(pipeHandle);
-          CloseHandle(pipeHandle);
-        }
-
-        next = connection->next;
-        free(connection);
-        connection = next;
-      }
-
-      handle->connections = NULL;
-
-      for (i = 0; i < COUNTOF(handle->accept_reqs); i++) {
-        if (handle->accept_reqs[i].flags & UV_REQ_PENDING) {
-          pipeHandle = handle->accept_reqs[i].data;
-          assert(pipeHandle);
-          DisconnectNamedPipe(pipeHandle);
-          CloseHandle(pipeHandle);
-          handle->accept_reqs[i].flags = 0;
-          handle->reqs_pending--;
-        }
-      }
-
-      if (status) *status = 0;
     }
+
   } else {
-    /*
-     * The handle is for a connection instance on the pipe client.
-     * To clean-up we close the pipe handle.
-     */
-    connection = handle->connection;
-    if (connection && connection->handle != INVALID_HANDLE_VALUE) {
-      if (CloseHandle(connection->handle)) {
-        connection->state = UV_PIPEINSTANCE_DISCONNECTED;
-        handle->connection = NULL;
-        if (status) *status = 0;
-      } else {
-        if (status) *status = -1;
-        if (err) *err = uv_new_sys_error(GetLastError());
-      }
-    }
+    CloseHandle(handle->handle);
   }
 
   handle->flags |= UV_HANDLE_SHUT;
index 7124a36..721e8c8 100644 (file)
@@ -34,8 +34,8 @@ typedef struct {
   int pongs;
   int state;
   uv_tcp_t tcp;
-  uv_req_t connect_req;
-  uv_req_t shutdown_req;
+  uv_connect_t connect_req;
+  uv_shutdown_t shutdown_req;
 } pinger_t;
 
 typedef struct buf_s {
@@ -90,7 +90,7 @@ static void pinger_close_cb(uv_handle_t* handle) {
 }
 
 
-static void pinger_write_cb(uv_req_t *req, int status) {
+static void pinger_write_cb(uv_write_t* req, int status) {
   ASSERT(status == 0);
 
   free(req);
@@ -98,22 +98,20 @@ static void pinger_write_cb(uv_req_t *req, int status) {
 
 
 static void pinger_write_ping(pinger_t* pinger) {
-  uv_req_t *req;
+  uv_write_t* req;
   uv_buf_t buf;
 
   buf.base = (char*)&PING;
   buf.len = strlen(PING);
 
-  req = (uv_req_t*)malloc(sizeof(*req));
-  uv_req_init(req, (uv_handle_t*)(&pinger->tcp), pinger_write_cb);
-
-  if (uv_write(req, &buf, 1)) {
+  req = malloc(sizeof *req);
+  if (uv_write(req, (uv_stream_t*) &pinger->tcp, &buf, 1, pinger_write_cb)) {
     FATAL("uv_write failed");
   }
 }
 
 
-static void pinger_shutdown_cb(uv_handle_t* handle, int status) {
+static void pinger_shutdown_cb(uv_shutdown_t* req, int status) {
   ASSERT(status == 0);
   pinger_shutdown_cb_called++;
 
@@ -151,8 +149,7 @@ static void pinger_read_cb(uv_stream_t* tcp, ssize_t nread, uv_buf_t buf) {
     if (pinger->state == 0) {
       pinger->pongs++;
       if (uv_now() - start_time > TIME) {
-        uv_req_init(&pinger->shutdown_req, (uv_handle_t*)tcp, pinger_shutdown_cb);
-        uv_shutdown(&pinger->shutdown_req);
+        uv_shutdown(&pinger->shutdown_req, (uv_stream_t*) tcp, pinger_shutdown_cb);
         break;
       } else {
         pinger_write_ping(pinger);
@@ -164,14 +161,14 @@ static void pinger_read_cb(uv_stream_t* tcp, ssize_t nread, uv_buf_t buf) {
 }
 
 
-static void pinger_connect_cb(uv_req_t *req, int status) {
+static void pinger_connect_cb(uv_connect_t* req, int status) {
   pinger_t *pinger = (pinger_t*)req->handle->data;
 
   ASSERT(status == 0);
 
   pinger_write_ping(pinger);
 
-  if (uv_read_start((uv_stream_t*)(req->handle), buf_alloc, pinger_read_cb)) {
+  if (uv_read_start(req->handle, buf_alloc, pinger_read_cb)) {
     FATAL("uv_read_start failed");
   }
 }
@@ -193,13 +190,9 @@ static void pinger_new() {
 
   pinger->tcp.data = pinger;
 
-  /* We are never doing multiple reads/connects at a time anyway. */
-  /* so these handles can be pre-initialized. */
-  uv_req_init(&pinger->connect_req, (uv_handle_t*)&pinger->tcp,
-      pinger_connect_cb);
-
   uv_tcp_bind(&pinger->tcp, client_addr);
-  r = uv_tcp_connect(&pinger->connect_req, server_addr);
+
+  r = uv_tcp_connect(&pinger->connect_req, &pinger->tcp, server_addr, pinger_connect_cb);
   ASSERT(!r);
 }
 
index ad3676d..d7524f7 100644 (file)
@@ -183,22 +183,22 @@ static void read_cb(uv_stream_t* stream, ssize_t bytes, uv_buf_t buf) {
 }
 
 
-static void write_cb(uv_req_t *req, int status) {
+static void write_cb(uv_write_t* req, int status) {
   uv_buf_t* buf = (uv_buf_t*) req->data;
 
   ASSERT(status == 0);
 
-  req_free(req);
+  req_free((uv_req_t*) req);
 
   nsent += sizeof write_buffer;
   nsent_total += sizeof write_buffer;
 
-  do_write((uv_stream_t*)req->handle);
+  do_write((uv_stream_t*) req->handle);
 }
 
 
 static void do_write(uv_stream_t* stream) {
-  uv_req_t* req;
+  uv_write_t* req;
   uv_buf_t buf;
   int r;
 
@@ -206,23 +206,21 @@ static void do_write(uv_stream_t* stream) {
   buf.len = sizeof write_buffer;
 
   while (stream->write_queue_size == 0) {
-    req = req_alloc();
-    uv_req_init(req, (uv_handle_t*)stream, write_cb);
-
-    r = uv_write(req, &buf, 1);
+    req = (uv_write_t*) req_alloc();
+    r = uv_write(req, stream, &buf, 1, write_cb);
     ASSERT(r == 0);
   }
 }
 
 
-static void connect_cb(uv_req_t* req, int status) {
+static void connect_cb(uv_connect_t* req, int status) {
   int i;
 
   if (status) LOG(uv_strerror(uv_last_error()));
   ASSERT(status == 0);
 
   write_sockets++;
-  req_free(req);
+  req_free((uv_req_t*) req);
 
   maybe_connect_some();
 
@@ -238,7 +236,7 @@ static void connect_cb(uv_req_t* req, int status) {
 
 
 static void maybe_connect_some() {
-  uv_req_t* req;
+  uv_connect_t* req;
   uv_tcp_t* tcp;
   uv_pipe_t* pipe;
   int r;
@@ -251,9 +249,8 @@ static void maybe_connect_some() {
       r = uv_tcp_init(tcp);
       ASSERT(r == 0);
 
-      req = req_alloc();
-      uv_req_init(req, (uv_handle_t*)tcp, connect_cb);
-      r = uv_tcp_connect(req, connect_addr);
+      req = (uv_connect_t*) req_alloc();
+      r = uv_tcp_connect(req, tcp, connect_addr, connect_cb);
       ASSERT(r == 0);
     } else {
       pipe = &pipe_write_handles[max_connect_socket++];
@@ -261,9 +258,8 @@ static void maybe_connect_some() {
       r = uv_pipe_init(pipe);
       ASSERT(r == 0);
 
-      req = req_alloc();
-      uv_req_init(req, (uv_handle_t*)pipe, connect_cb);
-      r = uv_pipe_connect(req, TEST_PIPENAME);
+      req = (uv_connect_t*) req_alloc();
+      r = uv_pipe_connect(req, pipe, TEST_PIPENAME, connect_cb);
       ASSERT(r == 0);
 
 #ifdef _WIN32
@@ -308,7 +304,7 @@ static void connection_cb(uv_handle_t* s, int status) {
  */
 
 typedef struct req_list_s {
-  uv_req_t uv_req;
+  union uv_any_req uv_req;
   struct req_list_s* next;
 } req_list_t;
 
index 53076e7..1c6b78f 100644 (file)
@@ -27,7 +27,7 @@
 
 
 typedef struct {
-  uv_req_t req;
+  uv_write_t req;
   uv_buf_t buf;
 } write_req_t;
 
@@ -51,7 +51,7 @@ static int server_closed;
 static uv_tcp_t server;
 
 
-static void after_write(uv_req_t* req, int status);
+static void after_write(uv_write_t* req, int status);
 static void after_read(uv_stream_t*, ssize_t nread, uv_buf_t buf);
 static void on_close(uv_handle_t* peer);
 static void on_server_close(uv_handle_t* handle);
@@ -67,7 +67,7 @@ unsigned char qrecord[] = {5, 'e', 'c', 'h', 'o', 's', 3, 's', 'r', 'v', 0, 0, 1
 unsigned char arecord[] = {0xc0, 0x0c, 0, 1, 0, 1, 0, 0, 5, 0xbd, 0, 4, 10, 0, 1, 1 };
 
 
-static void after_write(uv_req_t* req, int status) {
+static void after_write(uv_write_t* req, int status) {
   write_req_t* wr;
 
   if (status) {
@@ -84,8 +84,8 @@ static void after_write(uv_req_t* req, int status) {
 }
 
 
-static void after_shutdown(uv_req_t* req, int status) {
-  uv_close(req->handle, on_close);
+static void after_shutdown(uv_shutdown_t* req, int status) {
+  uv_close((uv_handle_t*) req->handle, on_close);
   free(req);
 }
 
@@ -116,7 +116,7 @@ static void addrsp(write_req_t* wr, char* hdr) {
 }
 
 static void process_req(uv_stream_t* handle, ssize_t nread, uv_buf_t buf) {
-  write_req_t *wr;
+  write_req_twr;
   dnshandle* dns = (dnshandle*)handle;
   char hdrbuf[DNSREC_LEN];
   int hdrbuf_remaining = DNSREC_LEN;
@@ -127,7 +127,6 @@ static void process_req(uv_stream_t* handle, ssize_t nread, uv_buf_t buf) {
   int usingprev = 0;
 
   wr = (write_req_t*) malloc(sizeof *wr);
-  uv_req_init(&wr->req, (uv_handle_t*)handle, after_write);
   wr->buf.base = (char*)malloc(WRITE_BUF_LEN);
   wr->buf.len = 0;
 
@@ -197,7 +196,7 @@ static void process_req(uv_stream_t* handle, ssize_t nread, uv_buf_t buf) {
 
   /* send write buffer */
   if (wr->buf.len > 0) {
-    if (uv_write(&wr->req, &wr->buf, 1)) {
+    if (uv_write((uv_write_t*) &wr->req, handle, &wr->buf, 1, after_write)) {
       FATAL("uv_write failed");
     }
   }
@@ -217,7 +216,7 @@ static void process_req(uv_stream_t* handle, ssize_t nread, uv_buf_t buf) {
 }
 
 static void after_read(uv_stream_t* handle, ssize_t nread, uv_buf_t buf) {
-  uv_req_t* req;
+  uv_shutdown_t* req;
 
   if (nread < 0) {
     /* Error or EOF */
@@ -227,9 +226,8 @@ static void after_read(uv_stream_t* handle, ssize_t nread, uv_buf_t buf) {
       free(buf.base);
     }
 
-    req = (uv_req_t*) malloc(sizeof *req);
-    uv_req_init(req, (uv_handle_t*)handle, after_shutdown);
-    uv_shutdown(req);
+    req = malloc(sizeof *req);
+    uv_shutdown(req, handle, after_shutdown);
 
     return;
   }
index 4dc0e20..e107dc5 100644 (file)
@@ -25,7 +25,7 @@
 #include <stdlib.h>
 
 typedef struct {
-  uv_req_t req;
+  uv_write_t req;
   uv_buf_t buf;
 } write_req_t;
 
@@ -35,14 +35,14 @@ static uv_tcp_t tcpServer;
 static uv_pipe_t pipeServer;
 static uv_handle_t* server;
 
-static void after_write(uv_req_t* req, int status);
+static void after_write(uv_write_t* req, int status);
 static void after_read(uv_stream_t*, ssize_t nread, uv_buf_t buf);
 static void on_close(uv_handle_t* peer);
 static void on_server_close(uv_handle_t* handle);
 static void on_connection(uv_handle_t*, int status);
 
 
-static void after_write(uv_req_t* req, int status) {
+static void after_write(uv_write_t* req, int status) {
   write_req_t* wr;
 
   if (status) {
@@ -59,8 +59,8 @@ static void after_write(uv_req_t* req, int status) {
 }
 
 
-static void after_shutdown(uv_req_t* req, int status) {
-  uv_close(req->handle, on_close);
+static void after_shutdown(uv_shutdown_t* req, int status) {
+  uv_close((uv_handle_t*)req->handle, on_close);
   free(req);
 }
 
@@ -68,7 +68,7 @@ static void after_shutdown(uv_req_t* req, int status) {
 static void after_read(uv_stream_t* handle, ssize_t nread, uv_buf_t buf) {
   int i;
   write_req_t *wr;
-  uv_req_t* req;
+  uv_shutdown_t* req;
 
   if (nread < 0) {
     /* Error or EOF */
@@ -78,9 +78,8 @@ static void after_read(uv_stream_t* handle, ssize_t nread, uv_buf_t buf) {
       free(buf.base);
     }
 
-    req = (uv_req_t*) malloc(sizeof *req);
-    uv_req_init(req, (uv_handle_t*)handle, (void *(*)(void *))after_shutdown);
-    uv_shutdown(req);
+    req = (uv_shutdown_t*) malloc(sizeof *req);
+    uv_shutdown(req, handle, after_shutdown);
 
     return;
   }
@@ -103,10 +102,9 @@ static void after_read(uv_stream_t* handle, ssize_t nread, uv_buf_t buf) {
 
   wr = (write_req_t*) malloc(sizeof *wr);
 
-  uv_req_init(&wr->req, (uv_handle_t*)handle, (void *(*)(void *))after_write);
   wr->buf.base = buf.base;
   wr->buf.len = nread;
-  if (uv_write(&wr->req, &wr->buf, 1)) {
+  if (uv_write(&wr->req, handle, &wr->buf, 1, after_write)) {
     FATAL("uv_write failed");
   }
 }
index 5b12c8b..4162b22 100644 (file)
@@ -32,7 +32,9 @@ static const char MESSAGE[] = "Failure is for the weak. Everyone dies alone.";
 
 static uv_tcp_t client;
 static uv_timer_t timer;
-static uv_req_t connect_req, write_req, shutdown_req;
+static uv_connect_t connect_req;
+static uv_write_t write_req;
+static uv_shutdown_t shutdown_req;
 
 static int nested = 0;
 static int close_cb_called = 0;
@@ -59,7 +61,7 @@ static void close_cb(uv_handle_t* handle) {
 }
 
 
-static void shutdown_cb(uv_req_t* req, int status) {
+static void shutdown_cb(uv_shutdown_t* req, int status) {
   ASSERT(status == 0);
   ASSERT(nested == 0 && "shutdown_cb must be called from a fresh stack");
 
@@ -97,11 +99,10 @@ static void read_cb(uv_stream_t* tcp, ssize_t nread, uv_buf_t buf) {
   /* from a fresh stack. */
   if (bytes_received == sizeof MESSAGE) {
     nested++;
-    uv_req_init(&shutdown_req, (uv_handle_t*)tcp, (void *(*)(void *))shutdown_cb);
 
     puts("Shutdown");
 
-    if (uv_shutdown(&shutdown_req)) {
+    if (uv_shutdown(&shutdown_req, (uv_stream_t*)tcp, shutdown_cb)) {
       FATAL("uv_shutdown failed");
     }
     nested--;
@@ -131,7 +132,7 @@ static void timer_cb(uv_timer_t* handle, int status) {
 }
 
 
-static void write_cb(uv_req_t* req, int status) {
+static void write_cb(uv_write_t* req, int status) {
   int r;
 
   ASSERT(status == 0);
@@ -154,7 +155,7 @@ static void write_cb(uv_req_t* req, int status) {
 }
 
 
-static void connect_cb(uv_req_t* req, int status) {
+static void connect_cb(uv_connect_t* req, int status) {
   uv_buf_t buf;
 
   puts("Connected. Write some data to echo server...");
@@ -167,9 +168,7 @@ static void connect_cb(uv_req_t* req, int status) {
   buf.base = (char*) &MESSAGE;
   buf.len = sizeof MESSAGE;
 
-  uv_req_init(&write_req, req->handle, (void *(*)(void *))write_cb);
-
-  if (uv_write(&write_req, &buf, 1)) {
+  if (uv_write(&write_req, (uv_stream_t*)req->handle, &buf, 1, write_cb)) {
     FATAL("uv_write failed");
   }
 
@@ -191,10 +190,8 @@ TEST_IMPL(callback_stack) {
   puts("Connecting...");
 
   nested++;
-  uv_req_init(&connect_req, (uv_handle_t*)&client,
-      (void *(*)(void *))connect_cb);
 
-  if (uv_tcp_connect(&connect_req, addr)) {
+  if (uv_tcp_connect(&connect_req, &client, addr, connect_cb)) {
     FATAL("uv_tcp_connect failed");
   }
   nested--;
index 9fc3f0b..1c2d212 100644 (file)
@@ -27,7 +27,7 @@
 
 
 static uv_tcp_t tcp;
-static uv_req_t req;
+static uv_connect_t req;
 static int connect_cb_calls;
 static int close_cb_calls;
 
@@ -66,18 +66,18 @@ static void timer_cb(uv_timer_t* handle, int status) {
 }
 
 
-static void on_connect_with_close(uv_req_t *req, int status) {
-  ASSERT(&tcp == (uv_tcp_t*) req->handle);
+static void on_connect_with_close(uv_connect_t *req, int status) {
+  ASSERT((uv_stream_t*) &tcp == req->handle);
   ASSERT(status == -1);
   ASSERT(uv_last_error().code == UV_ECONNREFUSED);
   connect_cb_calls++;
 
   ASSERT(close_cb_calls == 0);
-  uv_close(req->handle, on_close);
+  uv_close((uv_handle_t*)req->handle, on_close);
 }
 
 
-static void on_connect_without_close(uv_req_t *req, int status) {
+static void on_connect_without_close(uv_connect_t *req, int status) {
   ASSERT(status == -1);
   ASSERT(uv_last_error().code == UV_ECONNREFUSED);
   connect_cb_calls++;
@@ -103,10 +103,8 @@ void connection_fail(uv_connect_cb connect_cb) {
 
   /* We are never doing multiple reads/connects at a time anyway. */
   /* so these handles can be pre-initialized. */
-  uv_req_init(&req, (uv_handle_t*)&tcp, (void *(*)(void *))connect_cb);
-
   uv_tcp_bind(&tcp, client_addr);
-  r = uv_tcp_connect(&req, server_addr);
+  r = uv_tcp_connect(&req, &tcp, server_addr, connect_cb);
   ASSERT(!r);
 
   uv_run();
index 6f56518..30b63b9 100644 (file)
@@ -147,7 +147,7 @@ static void read_cb(uv_stream_t* tcp, ssize_t nread, uv_buf_t buf) {
 }
 
 
-static void connect_cb(uv_req_t* req, int status) {
+static void connect_cb(uv_connect_t* req, int status) {
   int r;
 
   ASSERT(req != NULL);
@@ -167,7 +167,7 @@ static void connect_cb(uv_req_t* req, int status) {
 static void client_connect() {
   struct sockaddr_in addr = uv_ip4_addr("127.0.0.1", TEST_PORT);
   uv_tcp_t* client = (uv_tcp_t*)malloc(sizeof *client);
-  uv_req_t* connect_req = (uv_req_t*)malloc(sizeof *connect_req);
+  uv_connect_t* connect_req = malloc(sizeof *connect_req);
   int r;
 
   ASSERT(client != NULL);
@@ -176,8 +176,7 @@ static void client_connect() {
   r = uv_tcp_init(client);
   ASSERT(r == 0);
 
-  uv_req_init(connect_req, (uv_handle_t*)client, (void *(*)(void *))connect_cb);
-  r = uv_tcp_connect(connect_req, addr);
+  r = uv_tcp_connect(connect_req, client, addr, connect_cb);
   ASSERT(r == 0);
 }
 
index 9d265d3..f533806 100644 (file)
@@ -29,7 +29,7 @@ static int getsocknamecount = 0;
 
 
 static uv_tcp_t tcp;
-static uv_req_t connect_req;
+static uv_connect_t connect_req;
 static uv_tcp_t tcpServer;
 
 
@@ -47,22 +47,23 @@ static void on_close(uv_handle_t* peer) {
 }
 
 
-static void after_shutdown(uv_req_t* req, int status) {
-  uv_close(req->handle, on_close);
+static void after_shutdown(uv_shutdown_t* req, int status) {
+  uv_close((uv_handle_t*) req->handle, on_close);
   free(req);
 }
 
 
 static void after_read(uv_stream_t* handle, ssize_t nread, uv_buf_t buf) {
-  uv_req_t* req;
+  uv_shutdown_t* req;
+  int r;
 
   if (buf.base) {
     free(buf.base);
   }
 
-  req = (uv_req_t*) malloc(sizeof *req);
-  uv_req_init(req, (uv_handle_t*)handle, (void *(*)(void *))after_shutdown);
-  uv_shutdown(req);
+  req = (uv_shutdown_t*) malloc(sizeof *req);
+  r = uv_shutdown(req, handle, after_shutdown);
+  ASSERT(r == 0);
 }
 
 
@@ -102,16 +103,18 @@ static void on_connection(uv_handle_t* server, int status) {
 }
 
 
-static void on_connect(void* req) {
+static void on_connect(uv_connect_t* req, int status) {
   struct sockaddr sockname;
   int namelen = sizeof(sockname);
-  int status;
+  int r;
 
-  status = uv_getsockname(&tcp, &sockname, &namelen);
-  if (status != 0) {
+  ASSERT(status == 0);
+
+  r = uv_getsockname(&tcp, &sockname, &namelen);
+  if (r != 0) {
     fprintf(stderr, "uv_getsockname error (connector) %d\n", uv_last_error().code);
   }
-  ASSERT(status == 0);
+  ASSERT(r == 0);
 
   getsocknamecount++;
 
@@ -162,9 +165,7 @@ static void tcp_connector() {
   tcp.data = &connect_req;
   ASSERT(!r);
 
-  uv_req_init(&connect_req, (uv_handle_t*)(&tcp), (void *(*)(void *))on_connect);
-
-  r = uv_tcp_connect(&connect_req, server_addr);
+  r = uv_tcp_connect(&connect_req, &tcp, server_addr, on_connect);
   ASSERT(!r);
 }
 
index 81cd93e..e4d4f1d 100644 (file)
@@ -43,8 +43,7 @@ typedef struct {
     uv_tcp_t tcp;
     uv_pipe_t pipe;
   };
-  uv_req_t connect_req;
-  uv_req_t read_req;
+  uv_connect_t connect_req;
   char read_buffer[BUFSIZE];
 } pinger_t;
 
@@ -70,25 +69,22 @@ static void pinger_on_close(uv_handle_t* handle) {
 }
 
 
-static void pinger_after_write(uv_req_t *req, int status) {
+static void pinger_after_write(uv_write_t *req, int status) {
   ASSERT(status == 0);
-
   free(req);
 }
 
 
 static void pinger_write_ping(pinger_t* pinger) {
-  uv_req_t *req;
+  uv_write_t *req;
   uv_buf_t buf;
 
   buf.base = (char*)&PING;
   buf.len = strlen(PING);
 
-  req = (uv_req_t*)malloc(sizeof(*req));
-  uv_req_init(req, (uv_handle_t*)(&pinger->tcp),
-      (void *(*)(void *))pinger_after_write);
+  req = malloc(sizeof(uv_write_t));
 
-  if (uv_write(req, &buf, 1)) {
+  if (uv_write(req, (uv_stream_t*)&pinger->tcp, &buf, 1, pinger_after_write)) {
     FATAL("uv_write failed");
   }
 
@@ -134,7 +130,7 @@ static void pinger_read_cb(uv_stream_t* stream, ssize_t nread, uv_buf_t buf) {
 }
 
 
-static void pinger_on_connect(uv_req_t *req, int status) {
+static void pinger_on_connect(uv_connect_t *req, int status) {
   pinger_t *pinger = (pinger_t*)req->handle->data;
 
   ASSERT(status == 0);
@@ -162,10 +158,8 @@ static void tcp_pinger_v6_new() {
 
   /* We are never doing multiple reads/connects at a time anyway. */
   /* so these handles can be pre-initialized. */
-  uv_req_init(&pinger->connect_req, (uv_handle_t*)(&pinger->tcp),
-      (void *(*)(void *))pinger_on_connect);
-
-  r = uv_tcp_connect6(&pinger->connect_req, server_addr);
+  r = uv_tcp_connect6(&pinger->connect_req, &pinger->tcp, server_addr,
+      pinger_on_connect);
   ASSERT(!r);
 }
 
@@ -186,10 +180,7 @@ static void tcp_pinger_new() {
 
   /* We are never doing multiple reads/connects at a time anyway. */
   /* so these handles can be pre-initialized. */
-  uv_req_init(&pinger->connect_req, (uv_handle_t*)(&pinger->tcp),
-      (void *(*)(void *))pinger_on_connect);
-
-  r = uv_tcp_connect(&pinger->connect_req, server_addr);
+  r = uv_tcp_connect(&pinger->connect_req, &pinger->tcp, server_addr, pinger_on_connect);
   ASSERT(!r);
 }
 
@@ -209,10 +200,8 @@ static void pipe_pinger_new() {
 
   /* We are never doing multiple reads/connects at a time anyway. */
   /* so these handles can be pre-initialized. */
-  uv_req_init(&pinger->connect_req, (uv_handle_t*)(&pinger->pipe),
-      (void *(*)(void *))pinger_on_connect);
 
-  r = uv_pipe_connect(&pinger->connect_req, TEST_PIPENAME);
+  r = uv_pipe_connect(&pinger->connect_req, &pinger->pipe, TEST_PIPENAME, pinger_on_connect);
   ASSERT(!r);
 }
 
index 8a960c9..67af495 100644 (file)
@@ -26,7 +26,9 @@
 
 static uv_timer_t timer;
 static uv_tcp_t tcp;
-static uv_req_t connect_req, write_req, shutdown_req;
+static uv_connect_t connect_req;
+static uv_write_t write_req;
+static uv_shutdown_t shutdown_req;
 static uv_buf_t qbuf;
 static int got_q;
 static int got_eof;
@@ -74,7 +76,7 @@ static void read_cb(uv_stream_t* t, ssize_t nread, uv_buf_t buf) {
 }
 
 
-static void shutdown_cb(uv_req_t *req, int status) {
+static void shutdown_cb(uv_shutdown_t *req, int status) {
   ASSERT(req == &shutdown_req);
 
   ASSERT(called_connect_cb == 1);
@@ -87,7 +89,7 @@ static void shutdown_cb(uv_req_t *req, int status) {
 }
 
 
-static void connect_cb(uv_req_t *req, int status) {
+static void connect_cb(uv_connect_t *req, int status) {
   ASSERT(status == 0);
   ASSERT(req == &connect_req);
 
@@ -98,12 +100,10 @@ static void connect_cb(uv_req_t *req, int status) {
    * Write the letter 'Q' to gracefully kill the echo-server. This will not
    * effect our connection.
    */
-  uv_req_init(&write_req, (uv_handle_t*)&tcp, NULL);
-  uv_write(&write_req, &qbuf, 1);
+  uv_write(&write_req, (uv_stream_t*) &tcp, &qbuf, 1, NULL);
 
   /* Shutdown our end of the connection.  */
-  uv_req_init(&shutdown_req, (uv_handle_t*)&tcp, (void *(*)(void *))shutdown_cb);
-  uv_shutdown(&shutdown_req);
+  uv_shutdown(&shutdown_req, (uv_stream_t*) &tcp, shutdown_cb);
 
   called_connect_cb++;
   ASSERT(called_shutdown_cb == 0);
@@ -165,8 +165,7 @@ TEST_IMPL(shutdown_eof) {
   r = uv_tcp_init(&tcp);
   ASSERT(!r);
 
-  uv_req_init(&connect_req, (uv_handle_t*) &tcp, (void *(*)(void *))connect_cb);
-  r = uv_tcp_connect(&connect_req, server_addr);
+  r = uv_tcp_connect(&connect_req, &tcp, server_addr, connect_cb);
   ASSERT(!r);
 
   uv_run();
index 4e305a9..73cf45b 100644 (file)
@@ -62,7 +62,7 @@ static void close_cb(uv_handle_t* handle) {
 }
 
 
-static void shutdown_cb(uv_req_t* req, int status) {
+static void shutdown_cb(uv_shutdown_t* req, int status) {
   uv_tcp_t* tcp;
 
   ASSERT(req);
@@ -104,7 +104,7 @@ static void read_cb(uv_stream_t* tcp, ssize_t nread, uv_buf_t buf) {
 }
 
 
-static void write_cb(uv_req_t* req, int status) {
+static void write_cb(uv_write_t* req, int status) {
   ASSERT(req != NULL);
 
   if (status) {
@@ -120,9 +120,11 @@ static void write_cb(uv_req_t* req, int status) {
 }
 
 
-static void connect_cb(uv_req_t* req, int status) {
+static void connect_cb(uv_connect_t* req, int status) {
   uv_buf_t send_bufs[CHUNKS_PER_WRITE];
   uv_tcp_t* tcp;
+  uv_write_t* write_req;
+  uv_shutdown_t* shutdown_req;
   int i, j, r;
 
   ASSERT(req != NULL);
@@ -141,26 +143,21 @@ static void connect_cb(uv_req_t* req, int status) {
       bytes_sent += CHUNK_SIZE;
     }
 
-    req = (uv_req_t*)malloc(sizeof *req);
-    ASSERT(req != NULL);
+    write_req = malloc(sizeof(uv_write_t));
+    ASSERT(write_req != NULL);
 
-    uv_req_init(req, (uv_handle_t*)tcp, (void *(*)(void *))write_cb);
-    r = uv_write(req, (uv_buf_t*)&send_bufs, CHUNKS_PER_WRITE);
+    r = uv_write(write_req, (uv_stream_t*) tcp, (uv_buf_t*)&send_bufs,
+        CHUNKS_PER_WRITE, write_cb);
     ASSERT(r == 0);
   }
 
   /* Shutdown on drain. FIXME: dealloc req? */
-  req = (uv_req_t*) malloc(sizeof(uv_req_t));
-  ASSERT(req != NULL);
-  uv_req_init(req, (uv_handle_t*)tcp, (void *(*)(void *))shutdown_cb);
-  r = uv_shutdown(req);
+  shutdown_req = malloc(sizeof(uv_shutdown_t));
+  ASSERT(shutdown_req != NULL);
+  r = uv_shutdown(shutdown_req, (uv_stream_t*)tcp, shutdown_cb);
   ASSERT(r == 0);
 
   /* Start reading */
-  req = (uv_req_t*)malloc(sizeof *req);
-  ASSERT(req != NULL);
-
-  uv_req_init(req, (uv_handle_t*)tcp, (void *(*)(void *))read_cb);
   r = uv_read_start((uv_stream_t*)tcp, alloc_cb, read_cb);
   ASSERT(r == 0);
 }
@@ -169,7 +166,7 @@ static void connect_cb(uv_req_t* req, int status) {
 TEST_IMPL(tcp_writealot) {
   struct sockaddr_in addr = uv_ip4_addr("127.0.0.1", TEST_PORT);
   uv_tcp_t* client = (uv_tcp_t*)malloc(sizeof *client);
-  uv_req_t* connect_req = (uv_req_t*)malloc(sizeof *connect_req);
+  uv_connect_t* connect_req = malloc(sizeof(uv_connect_t));
   int r;
 
   ASSERT(client != NULL);
@@ -184,8 +181,7 @@ TEST_IMPL(tcp_writealot) {
   r = uv_tcp_init(client);
   ASSERT(r == 0);
 
-  uv_req_init(connect_req, (uv_handle_t*)client, (void *(*)(void *))connect_cb);
-  r = uv_tcp_connect(connect_req, addr);
+  r = uv_tcp_connect(connect_req, client, addr, connect_cb);
   ASSERT(r == 0);
 
   uv_run();
index 1e530e8..b0e5dfe 100644 (file)
@@ -61,25 +61,37 @@ static Persistent<String> write_queue_size_sym;
 
 class TCPWrap;
 
+template <typename T>
 class ReqWrap {
  public:
-  ReqWrap(uv_handle_t* handle, void* callback) {
+  ReqWrap() {
     HandleScope scope;
     object_ = Persistent<Object>::New(Object::New());
-    uv_req_init(&req_, handle, (void* (*)(void*))callback);
-    req_.data = this;
   }
 
   ~ReqWrap() {
+    // Assert that someone has called Dispatched()
+    assert(req_.data == this);
     assert(!object_.IsEmpty());
     object_.Dispose();
     object_.Clear();
   }
 
+  // Call this after the req has been dispatched.
+  void Dispatched() {
+    req_.data = this;
+  }
+
   Persistent<Object> object_;
-  uv_req_t req_;
+  T req_;
 };
 
+
+typedef class ReqWrap<uv_shutdown_t> ShutdownWrap;
+typedef class ReqWrap<uv_write_t> WriteWrap;
+typedef class ReqWrap<uv_connect_t> ConnectWrap;
+
+
 class TCPWrap {
  public:
 
@@ -373,8 +385,8 @@ class TCPWrap {
     return scope.Close(Integer::New(r));
   }
 
-  static void AfterWrite(uv_req_t* req, int status) {
-    ReqWrap* req_wrap = (ReqWrap*) req->data;
+  static void AfterWrite(uv_write_t* req, int status) {
+    WriteWrap* req_wrap = (WriteWrap*) req->data;
     TCPWrap* wrap = (TCPWrap*) req->handle->data;
 
     HandleScope scope;
@@ -420,11 +432,7 @@ class TCPWrap {
       length = args[2]->IntegerValue();
     }
 
-    // I hate when people program C++ like it was C, and yet I do it too.
-    // I'm too lazy to come up with the perfect class hierarchy here. Let's
-    // just do some type munging.
-    ReqWrap* req_wrap = new ReqWrap((uv_handle_t*) &wrap->handle_,
-                                    (void*)AfterWrite);
+    WriteWrap* req_wrap = new WriteWrap();
 
     req_wrap->object_->SetHiddenValue(buffer_sym, buffer_obj);
 
@@ -432,7 +440,10 @@ class TCPWrap {
     buf.base = Buffer::Data(buffer_obj) + offset;
     buf.len = length;
 
-    int r = uv_write(&req_wrap->req_, &buf, 1);
+    int r = uv_write(&req_wrap->req_, (uv_stream_t*)&wrap->handle_, &buf, 1,
+        AfterWrite);
+
+    req_wrap->Dispatched();
 
     wrap->UpdateWriteQueueSize();
 
@@ -445,8 +456,8 @@ class TCPWrap {
     }
   }
 
-  static void AfterConnect(uv_req_t* req, int status) {
-    ReqWrap* req_wrap = (ReqWrap*) req->data;
+  static void AfterConnect(uv_connect_t* req, int status) {
+    ConnectWrap* req_wrap = (ConnectWrap*) req->data;
     TCPWrap* wrap = (TCPWrap*) req->handle->data;
 
     HandleScope scope;
@@ -482,10 +493,12 @@ class TCPWrap {
     // I hate when people program C++ like it was C, and yet I do it too.
     // I'm too lazy to come up with the perfect class hierarchy here. Let's
     // just do some type munging.
-    ReqWrap* req_wrap = new ReqWrap((uv_handle_t*) &wrap->handle_,
-                                    (void*)AfterConnect);
+    ConnectWrap* req_wrap = new ConnectWrap();
+    
+    int r = uv_tcp_connect(&req_wrap->req_, &wrap->handle_, address,
+        AfterConnect);
 
-    int r = uv_tcp_connect(&req_wrap->req_, address);
+    req_wrap->Dispatched();
 
     if (r) {
       SetErrno(uv_last_error().code);
@@ -506,13 +519,12 @@ class TCPWrap {
 
     struct sockaddr_in6 address = uv_ip6_addr(*ip_address, port);
 
-    // I hate when people program C++ like it was C, and yet I do it too.
-    // I'm too lazy to come up with the perfect class hierarchy here. Let's
-    // just do some type munging.
-    ReqWrap* req_wrap = new ReqWrap((uv_handle_t*) &wrap->handle_,
-                                    (void*)AfterConnect);
+    ConnectWrap* req_wrap = new ConnectWrap();
 
-    int r = uv_tcp_connect6(&req_wrap->req_, address);
+    int r = uv_tcp_connect6(&req_wrap->req_, &wrap->handle_, address,
+        AfterConnect);
+
+    req_wrap->Dispatched();
 
     if (r) {
       SetErrno(uv_last_error().code);
@@ -523,8 +535,8 @@ class TCPWrap {
     }
   }
 
-  static void AfterShutdown(uv_req_t* req, int status) {
-    ReqWrap* req_wrap = (ReqWrap*) req->data;
+  static void AfterShutdown(uv_shutdown_t* req, int status) {
+    ReqWrap<uv_shutdown_t>* req_wrap = (ReqWrap<uv_shutdown_t>*) req->data;
     TCPWrap* wrap = (TCPWrap*) req->handle->data;
 
     // The request object should still be there.
@@ -552,10 +564,12 @@ class TCPWrap {
 
     UNWRAP
 
-    ReqWrap* req_wrap = new ReqWrap((uv_handle_t*) &wrap->handle_,
-                                    (void*)AfterShutdown);
+    ShutdownWrap* req_wrap = new ShutdownWrap();
+
+    int r = uv_shutdown(&req_wrap->req_, (uv_stream_t*) &wrap->handle_,
+        AfterShutdown);
 
-    int r = uv_shutdown(&req_wrap->req_);
+    req_wrap->Dispatched();
 
     if (r) {
       SetErrno(uv_last_error().code);
@@ -569,7 +583,6 @@ class TCPWrap {
   uv_tcp_t handle_;
   Persistent<Object> object_;
   size_t slab_offset_;
-  friend class ReqWrap;
 };