Upgrade libuv to 4eff34da4
author     Ryan Dahl <ry@tinyclouds.org>
           Mon, 18 Jul 2011 23:26:37 +0000 (16:26 -0700)
committer  Ryan Dahl <ry@tinyclouds.org>
           Mon, 18 Jul 2011 23:26:37 +0000 (16:26 -0700)
deps/uv/include/uv-unix.h
deps/uv/include/uv-win.h
deps/uv/src/uv-unix.c
deps/uv/src/uv-win.c
deps/uv/test/benchmark-pump.c
deps/uv/test/echo-server.c
deps/uv/test/task.h

diff --git a/deps/uv/include/uv-unix.h b/deps/uv/include/uv-unix.h
index 5a92f12..cad2ed4 100644
@@ -67,25 +67,27 @@ typedef struct {
 
 #define UV_STREAM_PRIVATE_FIELDS \
   uv_read_cb read_cb; \
-  uv_alloc_cb alloc_cb;
-
-
-/* UV_TCP */
-#define UV_TCP_PRIVATE_FIELDS \
-  int delayed_error; \
-  uv_connection_cb connection_cb; \
-  int accepted_fd; \
+  uv_alloc_cb alloc_cb; \
   uv_connect_t *connect_req; \
   uv_shutdown_t *shutdown_req; \
   ev_io read_watcher; \
   ev_io write_watcher; \
   ngx_queue_t write_queue; \
-  ngx_queue_t write_completed_queue;
+  ngx_queue_t write_completed_queue; \
+  int delayed_error; \
+  uv_connection_cb connection_cb; \
+  int accepted_fd;
+
+
+/* UV_TCP */
+#define UV_TCP_PRIVATE_FIELDS
 
 
 /* UV_NAMED_PIPE */
 #define UV_PIPE_PRIVATE_TYPEDEF
-#define UV_PIPE_PRIVATE_FIELDS
+#define UV_PIPE_PRIVATE_FIELDS \
+  UV_TCP_PRIVATE_FIELDS \
+  const char* pipe_fname; /* strdup'ed */
 
 
 /* UV_PREPARE */
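
The change above moves the connection and write-queue state out of
UV_TCP_PRIVATE_FIELDS and into UV_STREAM_PRIVATE_FIELDS, so TCP handles and
named pipes share one stream layout. A minimal sketch (not the actual libuv
headers) of how these field macros compose:

    /* Each handle struct pastes the shared stream fields plus its own,
     * so a uv_pipe_t can be cast to uv_stream_t and reuse the generic
     * read/write machinery. */
    #define STREAM_FIELDS \
      int fd;             \
      void* write_queue;

    #define PIPE_FIELDS   \
      STREAM_FIELDS       \
      const char* pipe_fname;

    struct pipe_sketch {
      int type;           /* the common uv_handle_t fields come first */
      PIPE_FIELDS
    };
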
diff --git a/deps/uv/include/uv-win.h b/deps/uv/include/uv-win.h
index d9f65ba..12588e9 100644
@@ -96,7 +96,6 @@ typedef struct uv_buf_t {
   struct uv_req_s accept_req;             \
 
 #define uv_pipe_server_fields             \
-    char* name;                           \
     uv_pipe_accept_t accept_reqs[4];      \
     uv_pipe_accept_t* pending_accepts;
 
@@ -104,6 +103,7 @@ typedef struct uv_buf_t {
   HANDLE handle;
 
 #define UV_PIPE_PRIVATE_FIELDS            \
+  char* name;                             \
   union {                                 \
     struct { uv_pipe_server_fields };     \
     struct { uv_pipe_connection_fields }; \
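
Here the name field moves out of the server-only uv_pipe_server_fields and
into the shared UV_PIPE_PRIVATE_FIELDS, because the client connect path in
uv-win.c below now also stores the pipe name (for the ERROR_PIPE_BUSY
retry). A reduced, hypothetical sketch of the resulting layout:

    /* Server-only and connection-only state overlap in an anonymous
     * union; anything needed by both roles, like the name, must live
     * outside it. */
    typedef struct {
      char* name;                      /* shared by both roles */
      union {
        struct { void* accept_reqs; }; /* server fields */
        struct { void* handle; };      /* connection fields */
      };
    } pipe_layout_sketch;
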
diff --git a/deps/uv/src/uv-unix.c b/deps/uv/src/uv-unix.c
index 1dc20d8..f35cc6f 100644
@@ -22,6 +22,8 @@
 #include "uv-common.h"
 #include "uv-eio.h"
 
+#define _GNU_SOURCE /* O_CLOEXEC */
+
 #include <stddef.h> /* NULL */
 #include <stdio.h> /* printf */
 #include <stdlib.h>
 #include <errno.h>
 #include <assert.h>
 #include <unistd.h>
+#include <sys/types.h>
+#include <sys/stat.h>
 #include <fcntl.h>
 #include <sys/socket.h>
+#include <sys/un.h>
 #include <netinet/in.h>
 #include <arpa/inet.h>
 #include <limits.h> /* PATH_MAX */
@@ -60,11 +65,30 @@ static struct uv_ares_data_s ares_data;
 
 
 void uv__req_init(uv_req_t*);
-void uv__tcp_io(EV_P_ ev_io* watcher, int revents);
 void uv__next(EV_P_ ev_idle* watcher, int revents);
-static void uv__tcp_connect(uv_tcp_t*);
-int uv_tcp_open(uv_tcp_t*, int fd);
+static int uv__stream_open(uv_stream_t*, int fd);
 static void uv__finish_close(uv_handle_t* handle);
+static uv_err_t uv_err_new(uv_handle_t* handle, int sys_error);
+
+static uv_write_t* uv__write(uv_stream_t* stream);
+static void uv__read(uv_stream_t* stream);
+static void uv__stream_connect(uv_stream_t*);
+static void uv__stream_io(EV_P_ ev_io* watcher, int revents);
+static void uv__pipe_accept(EV_P_ ev_io* watcher, int revents);
+
+#ifndef __GNUC__
+#define __attribute__(a)
+#endif
+
+/* Unused on systems that support O_CLOEXEC, SOCK_CLOEXEC, etc. */
+static int uv__cloexec(int fd, int set) __attribute__((unused));
+static int uv__nonblock(int fd, int set) __attribute__((unused));
+
+static int uv__socket(int domain, int type, int protocol);
+static int uv__accept(int sockfd, struct sockaddr* saddr, socklen_t len);
+
+size_t uv__strlcpy(char* dst, const char* src, size_t size);
+
 
 /* flags */
 enum {
@@ -161,6 +185,7 @@ static uv_err_t uv_err_new(uv_handle_t* handle, int sys_error) {
 
 int uv_close(uv_handle_t* handle, uv_close_cb close_cb) {
   uv_tcp_t* tcp;
+  uv_pipe_t* pipe;
   uv_async_t* async;
   uv_timer_t* timer;
 
@@ -199,6 +224,22 @@ int uv_close(uv_handle_t* handle, uv_close_cb close_cb) {
       ev_timer_stop(EV_DEFAULT_ &timer->timer_watcher);
       break;
 
+    case UV_NAMED_PIPE:
+      pipe = (uv_pipe_t*)handle;
+      if (pipe->pipe_fname) {
+        /*
+         * Unlink the file system entity before closing the file descriptor.
+         * Doing it the other way around introduces a race where our process
+         * unlinks a socket with the same name that's just been created by
+         * another thread or process.
+         */
+        unlink(pipe->pipe_fname);
+        free((void*)pipe->pipe_fname);
+      }
+      uv_read_stop((uv_stream_t*)pipe);
+      ev_io_stop(EV_DEFAULT_ &pipe->write_watcher);
+      break;
+
     default:
       assert(0);
       return -1;
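
The ordering in the new UV_NAMED_PIPE branch is deliberate: the name is
removed from the file system while this process still owns the descriptor.
Condensed to its essence:

    /* Correct: unlink first, then close. Closing first opens a window
     * in which another process binds a fresh socket at the same path,
     * and our unlink() would then delete *their* socket. */
    unlink(pipe_fname);
    close(fd);
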
@@ -258,10 +299,10 @@ int uv_tcp_init(uv_tcp_t* tcp) {
   ngx_queue_init(&tcp->write_completed_queue);
   tcp->write_queue_size = 0;
 
-  ev_init(&tcp->read_watcher, uv__tcp_io);
+  ev_init(&tcp->read_watcher, uv__stream_io);
   tcp->read_watcher.data = tcp;
 
-  ev_init(&tcp->write_watcher, uv__tcp_io);
+  ev_init(&tcp->write_watcher, uv__stream_io);
   tcp->write_watcher.data = tcp;
 
   assert(ngx_queue_empty(&tcp->write_queue));
@@ -273,40 +314,42 @@ int uv_tcp_init(uv_tcp_t* tcp) {
 
 
 int uv__bind(uv_tcp_t* tcp, int domain, struct sockaddr* addr, int addrsize) {
-  int r;
+  int saved_errno;
+  int status;
+  int fd;
 
-  if (tcp->fd <= 0) {
-    int fd = socket(domain, SOCK_STREAM, 0);
+  saved_errno = errno;
+  status = -1;
 
-    if (fd < 0) {
+  if (tcp->fd <= 0) {
+    if ((fd = uv__socket(domain, SOCK_STREAM, 0)) == -1) {
       uv_err_new((uv_handle_t*)tcp, errno);
-      return -1;
+      goto out;
     }
 
-    if (uv_tcp_open(tcp, fd)) {
+    if (uv__stream_open((uv_stream_t*)tcp, fd)) {
+      status = -2;
       close(fd);
-      return -2;
+      goto out;
     }
   }
 
   assert(tcp->fd >= 0);
 
-  r = bind(tcp->fd, addr, addrsize);
   tcp->delayed_error = 0;
-
-  if (r) {
-    switch (errno) {
-      case EADDRINUSE:
-        tcp->delayed_error = errno;
-        return 0;
-
-      default:
-        uv_err_new((uv_handle_t*)tcp, errno);
-        return -1;
+  if (bind(tcp->fd, addr, addrsize) == -1) {
+    if (errno == EADDRINUSE) {
+      tcp->delayed_error = errno;
+    } else {
+      uv_err_new((uv_handle_t*)tcp, errno);
+      goto out;
     }
   }
+  status = 0;
 
-  return 0;
+out:
+  errno = saved_errno;
+  return status;
 }
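
uv__bind introduces the error-handling idiom used throughout the rest of
this patch: save errno on entry, funnel every exit through a single label,
and restore errno on the way out so the library never clobbers the caller's
errno. A self-contained sketch of the pattern (do_work is a hypothetical
stand-in):

    #include <errno.h>

    static int do_work(void) { return 0; }  /* hypothetical */

    int op(void) {
      int saved_errno = errno;
      int status = -1;

      if (do_work() == -1)
        goto out;

      status = 0;
    out:
      errno = saved_errno;  /* report failure via status, not errno */
      return status;
    }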
 
 
@@ -330,32 +373,27 @@ int uv_tcp_bind6(uv_tcp_t* tcp, struct sockaddr_in6 addr) {
 }
 
 
-int uv_tcp_open(uv_tcp_t* tcp, int fd) {
-  int yes;
-  int r;
+static int uv__stream_open(uv_stream_t* stream, int fd) {
+  socklen_t yes;
 
   assert(fd >= 0);
-  tcp->fd = fd;
+  stream->fd = fd;
 
-  /* Set non-blocking. */
+  /* Reuse the port address if applicable. */
   yes = 1;
-  r = fcntl(fd, F_SETFL, O_NONBLOCK);
-  assert(r == 0);
-
-  /* Reuse the port address. */
-  r = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int));
-  assert(r == 0);
+  if (stream->type == UV_TCP
+      && setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)) == -1) {
+    uv_err_new((uv_handle_t*)stream, errno);
+    return -1;
+  }
 
   /* Associate the fd with each ev_io watcher. */
-  ev_io_set(&tcp->read_watcher, fd, EV_READ);
-  ev_io_set(&tcp->write_watcher, fd, EV_WRITE);
+  ev_io_set(&stream->read_watcher, fd, EV_READ);
+  ev_io_set(&stream->write_watcher, fd, EV_WRITE);
 
-  /* These should have been set up by uv_tcp_init. */
-  assert(tcp->next_watcher.data == tcp);
-  assert(tcp->write_watcher.data == tcp);
-  assert(tcp->read_watcher.data == tcp);
-  assert(tcp->read_watcher.cb == uv__tcp_io);
-  assert(tcp->write_watcher.cb == uv__tcp_io);
+  /* These should have been set up by uv_tcp_init or uv_pipe_init. */
+  assert(stream->read_watcher.cb == uv__stream_io);
+  assert(stream->write_watcher.cb == uv__stream_io);
 
   return 0;
 }
@@ -365,22 +403,22 @@ void uv__server_io(EV_P_ ev_io* watcher, int revents) {
   int fd;
   struct sockaddr_storage addr;
   socklen_t addrlen = sizeof(struct sockaddr_storage);
-  uv_tcp_t* tcp = watcher->data;
+  uv_stream_t* stream = watcher->data;
 
-  assert(watcher == &tcp->read_watcher ||
-         watcher == &tcp->write_watcher);
+  assert(watcher == &stream->read_watcher ||
+         watcher == &stream->write_watcher);
   assert(revents == EV_READ);
 
-  assert(!uv_flag_is_set((uv_handle_t*)tcp, UV_CLOSING));
+  assert(!uv_flag_is_set((uv_handle_t*)stream, UV_CLOSING));
 
-  if (tcp->accepted_fd >= 0) {
-    ev_io_stop(EV_DEFAULT_ &tcp->read_watcher);
+  if (stream->accepted_fd >= 0) {
+    ev_io_stop(EV_DEFAULT_ &stream->read_watcher);
     return;
   }
 
   while (1) {
-    assert(tcp->accepted_fd < 0);
-    fd = accept(tcp->fd, (struct sockaddr*)&addr, &addrlen);
+    assert(stream->accepted_fd < 0);
+    fd = accept(stream->fd, (struct sockaddr*)&addr, &addrlen);
 
     if (fd < 0) {
       if (errno == EAGAIN) {
@@ -390,16 +428,16 @@ void uv__server_io(EV_P_ ev_io* watcher, int revents) {
         /* TODO special trick. unlock reserved socket, accept, close. */
         return;
       } else {
-        uv_err_new((uv_handle_t*)tcp, errno);
-        tcp->connection_cb((uv_handle_t*)tcp, -1);
+        uv_err_new((uv_handle_t*)stream, errno);
+        stream->connection_cb((uv_handle_t*)stream, -1);
       }
 
     } else {
-      tcp->accepted_fd = fd;
-      tcp->connection_cb((uv_handle_t*)tcp, 0);
-      if (tcp->accepted_fd >= 0) {
+      stream->accepted_fd = fd;
+      stream->connection_cb((uv_handle_t*)stream, 0);
+      if (stream->accepted_fd >= 0) {
        /* The user hasn't yet called uv_accept(). */
-        ev_io_stop(EV_DEFAULT_ &tcp->read_watcher);
+        ev_io_stop(EV_DEFAULT_ &stream->read_watcher);
         return;
       }
     }
@@ -408,24 +446,36 @@ void uv__server_io(EV_P_ ev_io* watcher, int revents) {
 
 
 int uv_accept(uv_handle_t* server, uv_stream_t* client) {
-  uv_tcp_t* tcpServer = (uv_tcp_t*)server;
-  uv_tcp_t* tcpClient = (uv_tcp_t*)client;
+  uv_stream_t* streamServer;
+  uv_stream_t* streamClient;
+  int saved_errno;
+  int status;
+
+  saved_errno = errno;
+  status = -1;
+
+  streamServer = (uv_stream_t*)server;
+  streamClient = (uv_stream_t*)client;
 
-  if (tcpServer->accepted_fd < 0) {
+  if (streamServer->accepted_fd < 0) {
     uv_err_new(server, EAGAIN);
-    return -1;
+    goto out;
   }
 
-  if (uv_tcp_open(tcpClient, tcpServer->accepted_fd)) {
-    /* Ignore error for now */
-    tcpServer->accepted_fd = -1;
-    close(tcpServer->accepted_fd);
-    return -1;
-  } else {
-    tcpServer->accepted_fd = -1;
-    ev_io_start(EV_DEFAULT_ &tcpServer->read_watcher);
-    return 0;
+  if (uv__stream_open(streamClient, streamServer->accepted_fd)) {
+    /* TODO handle error */
+    streamServer->accepted_fd = -1;
+    close(streamServer->accepted_fd);
+    goto out;
   }
+
+  ev_io_start(EV_DEFAULT_ &streamServer->read_watcher);
+  streamServer->accepted_fd = -1;
+  status = 0;
+
+out:
+  errno = saved_errno;
+  return status;
 }
 
 
@@ -457,33 +507,11 @@ int uv_tcp_listen(uv_tcp_t* tcp, int backlog, uv_connection_cb cb) {
 
 
 void uv__finish_close(uv_handle_t* handle) {
-  uv_tcp_t* tcp;
-
   assert(uv_flag_is_set(handle, UV_CLOSING));
   assert(!uv_flag_is_set(handle, UV_CLOSED));
   uv_flag_set(handle, UV_CLOSED);
 
   switch (handle->type) {
-    case UV_TCP:
-      /* XXX Is it necessary to stop these watchers here? weren't they
-       * supposed to be stopped in uv_close()?
-       */
-      tcp = (uv_tcp_t*)handle;
-      ev_io_stop(EV_DEFAULT_ &tcp->write_watcher);
-      ev_io_stop(EV_DEFAULT_ &tcp->read_watcher);
-
-      assert(!ev_is_active(&tcp->read_watcher));
-      assert(!ev_is_active(&tcp->write_watcher));
-
-      close(tcp->fd);
-      tcp->fd = -1;
-
-      if (tcp->accepted_fd >= 0) {
-        close(tcp->accepted_fd);
-        tcp->accepted_fd = -1;
-      }
-      break;
-
     case UV_PREPARE:
       assert(!ev_is_active(&((uv_prepare_t*)handle)->prepare_watcher));
       break;
@@ -504,6 +532,26 @@ void uv__finish_close(uv_handle_t* handle) {
       assert(!ev_is_active(&((uv_timer_t*)handle)->timer_watcher));
       break;
 
+    case UV_NAMED_PIPE:
+    case UV_TCP:
+    {
+      uv_stream_t* stream;
+
+      stream = (uv_stream_t*)handle;
+
+      assert(!ev_is_active(&stream->read_watcher));
+      assert(!ev_is_active(&stream->write_watcher));
+
+      close(stream->fd);
+      stream->fd = -1;
+
+      if (stream->accepted_fd >= 0) {
+        close(stream->accepted_fd);
+        stream->accepted_fd = -1;
+      }
+      break;
+    }
+
     default:
       assert(0);
       break;
@@ -519,15 +567,15 @@ void uv__finish_close(uv_handle_t* handle) {
 }
 
 
-uv_write_t* uv_write_queue_head(uv_tcp_t* tcp) {
+uv_write_t* uv_write_queue_head(uv_stream_t* stream) {
   ngx_queue_t* q;
   uv_write_t* req;
 
-  if (ngx_queue_empty(&tcp->write_queue)) {
+  if (ngx_queue_empty(&stream->write_queue)) {
     return NULL;
   }
 
-  q = ngx_queue_head(&tcp->write_queue);
+  q = ngx_queue_head(&stream->write_queue);
   if (!q) {
     return NULL;
   }
@@ -552,31 +600,31 @@ void uv__next(EV_P_ ev_idle* watcher, int revents) {
 }
 
 
-static void uv__drain(uv_tcp_t* tcp) {
+static void uv__drain(uv_stream_t* stream) {
   uv_shutdown_t* req;
 
-  assert(!uv_write_queue_head(tcp));
-  assert(tcp->write_queue_size == 0);
+  assert(!uv_write_queue_head(stream));
+  assert(stream->write_queue_size == 0);
 
-  ev_io_stop(EV_DEFAULT_ &tcp->write_watcher);
+  ev_io_stop(EV_DEFAULT_ &stream->write_watcher);
 
   /* Shutdown? */
-  if (uv_flag_is_set((uv_handle_t*)tcp, UV_SHUTTING) &&
-      !uv_flag_is_set((uv_handle_t*)tcp, UV_CLOSING) &&
-      !uv_flag_is_set((uv_handle_t*)tcp, UV_SHUT)) {
-    assert(tcp->shutdown_req);
+  if (uv_flag_is_set((uv_handle_t*)stream, UV_SHUTTING) &&
+      !uv_flag_is_set((uv_handle_t*)stream, UV_CLOSING) &&
+      !uv_flag_is_set((uv_handle_t*)stream, UV_SHUT)) {
+    assert(stream->shutdown_req);
 
-    req = tcp->shutdown_req;
+    req = stream->shutdown_req;
 
-    if (shutdown(tcp->fd, SHUT_WR)) {
+    if (shutdown(stream->fd, SHUT_WR)) {
       /* Error. Report it. User should call uv_close(). */
-      uv_err_new((uv_handle_t*)tcp, errno);
+      uv_err_new((uv_handle_t*)stream, errno);
       if (req->cb) {
         req->cb(req, -1);
       }
     } else {
-      uv_err_new((uv_handle_t*)tcp, 0);
-      uv_flag_set((uv_handle_t*)tcp, UV_SHUT);
+      uv_err_new((uv_handle_t*)stream, 0);
+      uv_flag_set((uv_handle_t*)stream, UV_SHUT);
       if (req->cb) {
         req->cb(req, 0);
       }
@@ -588,24 +636,24 @@ static void uv__drain(uv_tcp_t* tcp) {
 /* On success returns NULL. On error returns a pointer to the write request
  * which had the error.
  */
-static uv_write_t* uv__write(uv_tcp_t* tcp) {
+static uv_write_t* uv__write(uv_stream_t* stream) {
   uv_write_t* req;
   struct iovec* iov;
   int iovcnt;
   ssize_t n;
 
-  assert(tcp->fd >= 0);
+  assert(stream->fd >= 0);
 
   /* TODO: should probably while(1) here until EAGAIN */
 
   /* Get the request at the head of the queue. */
-  req = uv_write_queue_head(tcp);
+  req = uv_write_queue_head(stream);
   if (!req) {
-    assert(tcp->write_queue_size == 0);
+    assert(stream->write_queue_size == 0);
     return NULL;
   }
 
-  assert(req->handle == (uv_stream_t*)tcp);
+  assert(req->handle == stream);
 
   /* Cast to iovec. We had to have our own uv_buf_t instead of iovec
    * because Windows's WSABUF is not an iovec.
@@ -619,16 +667,16 @@ static uv_write_t* uv__write(uv_tcp_t* tcp) {
    */
 
   if (iovcnt == 1) {
-    n = write(tcp->fd, iov[0].iov_base, iov[0].iov_len);
+    n = write(stream->fd, iov[0].iov_base, iov[0].iov_len);
   }
   else {
-    n = writev(tcp->fd, iov, iovcnt);
+    n = writev(stream->fd, iov, iovcnt);
   }
 
   if (n < 0) {
     if (errno != EAGAIN) {
       /* Error */
-      uv_err_new((uv_handle_t*)tcp, errno);
+      uv_err_new((uv_handle_t*)stream, errno);
       return req;
     }
   } else {
@@ -644,7 +692,7 @@ static uv_write_t* uv__write(uv_tcp_t* tcp) {
       if (n < len) {
         buf->base += n;
         buf->len -= n;
-        tcp->write_queue_size -= n;
+        stream->write_queue_size -= n;
         n = 0;
 
         /* There is more to write. Break and ensure the watcher is pending. */
@@ -657,8 +705,8 @@ static uv_write_t* uv__write(uv_tcp_t* tcp) {
         assert(n >= len);
         n -= len;
 
-        assert(tcp->write_queue_size >= len);
-        tcp->write_queue_size -= len;
+        assert(stream->write_queue_size >= len);
+        stream->write_queue_size -= len;
 
         if (req->write_index == req->bufcnt) {
           /* Then we're done! */
@@ -675,8 +723,8 @@ static uv_write_t* uv__write(uv_tcp_t* tcp) {
            * callback called in the near future.
            * TODO: start trying to write the next request.
            */
-          ngx_queue_insert_tail(&tcp->write_completed_queue, &req->queue);
-          ev_feed_event(EV_DEFAULT_ &tcp->write_watcher, EV_WRITE);
+          ngx_queue_insert_tail(&stream->write_completed_queue, &req->queue);
+          ev_feed_event(EV_DEFAULT_ &stream->write_watcher, EV_WRITE);
           return NULL;
         }
       }
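
The bookkeeping in uv__write above advances through the request's buffer
array after a short write: whole buffers are consumed first, and a
partially written buffer is adjusted in place. A standalone sketch of that
technique (iov_advance is a hypothetical helper, not part of the patch):

    #include <stddef.h>
    #include <sys/uio.h>

    /* Advance iov[] past n written bytes; returns the index of the
     * first buffer that still has data left to write. */
    static int iov_advance(struct iovec* iov, int iovcnt, size_t n) {
      int i;
      for (i = 0; i < iovcnt && n > 0; i++) {
        if (n < iov[i].iov_len) {
          iov[i].iov_base = (char*)iov[i].iov_base + n;
          iov[i].iov_len -= n;   /* partially written: adjust in place */
          break;
        }
        n -= iov[i].iov_len;     /* fully written: consume and move on */
      }
      return i;
    }
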
@@ -687,20 +735,20 @@ static uv_write_t* uv__write(uv_tcp_t* tcp) {
   assert(n == 0 || n == -1);
 
   /* We're not done. */
-  ev_io_start(EV_DEFAULT_ &tcp->write_watcher);
+  ev_io_start(EV_DEFAULT_ &stream->write_watcher);
 
   return NULL;
 }
 
 
-static void uv__write_callbacks(uv_tcp_t* tcp) {
+static void uv__write_callbacks(uv_stream_t* stream) {
   int callbacks_made = 0;
   ngx_queue_t* q;
   uv_write_t* req;
 
-  while (!ngx_queue_empty(&tcp->write_completed_queue)) {
+  while (!ngx_queue_empty(&stream->write_completed_queue)) {
     /* Pop a req off write_completed_queue. */
-    q = ngx_queue_head(&tcp->write_completed_queue);
+    q = ngx_queue_head(&stream->write_completed_queue);
     assert(q);
     req = ngx_queue_data(q, struct uv_write_s, queue);
     ngx_queue_remove(q);
@@ -713,16 +761,16 @@ static void uv__write_callbacks(uv_tcp_t* tcp) {
     callbacks_made++;
   }
 
-  assert(ngx_queue_empty(&tcp->write_completed_queue));
+  assert(ngx_queue_empty(&stream->write_completed_queue));
 
   /* Write queue drained. */
-  if (!uv_write_queue_head(tcp)) {
-    uv__drain(tcp);
+  if (!uv_write_queue_head(stream)) {
+    uv__drain(stream);
   }
 }
 
 
-void uv__read(uv_tcp_t* tcp) {
+static void uv__read(uv_stream_t* stream) {
   uv_buf_t buf;
   struct iovec* iov;
   ssize_t nread;
@@ -730,43 +778,43 @@ void uv__read(uv_tcp_t* tcp) {
   /* XXX: Maybe instead of having UV_READING we just test if
    * tcp->read_cb is NULL or not?
    */
-  while (tcp->read_cb && uv_flag_is_set((uv_handle_t*)tcp, UV_READING)) {
-    assert(tcp->alloc_cb);
-    buf = tcp->alloc_cb((uv_stream_t*)tcp, 64 * 1024);
+  while (stream->read_cb && uv_flag_is_set((uv_handle_t*)stream, UV_READING)) {
+    assert(stream->alloc_cb);
+    buf = stream->alloc_cb(stream, 64 * 1024);
 
     assert(buf.len > 0);
     assert(buf.base);
 
     iov = (struct iovec*) &buf;
 
-    nread = read(tcp->fd, buf.base, buf.len);
+    nread = read(stream->fd, buf.base, buf.len);
 
     if (nread < 0) {
       /* Error */
       if (errno == EAGAIN) {
         /* Wait for the next one. */
-        if (uv_flag_is_set((uv_handle_t*)tcp, UV_READING)) {
-          ev_io_start(EV_DEFAULT_UC_ &tcp->read_watcher);
+        if (uv_flag_is_set((uv_handle_t*)stream, UV_READING)) {
+          ev_io_start(EV_DEFAULT_UC_ &stream->read_watcher);
         }
-        uv_err_new((uv_handle_t*)tcp, EAGAIN);
-        tcp->read_cb((uv_stream_t*)tcp, 0, buf);
+        uv_err_new((uv_handle_t*)stream, EAGAIN);
+        stream->read_cb(stream, 0, buf);
         return;
       } else {
         /* Error. User should call uv_close(). */
-        uv_err_new((uv_handle_t*)tcp, errno);
-        tcp->read_cb((uv_stream_t*)tcp, -1, buf);
-        assert(!ev_is_active(&tcp->read_watcher));
+        uv_err_new((uv_handle_t*)stream, errno);
+        stream->read_cb(stream, -1, buf);
+        assert(!ev_is_active(&stream->read_watcher));
         return;
       }
     } else if (nread == 0) {
       /* EOF */
-      uv_err_new_artificial((uv_handle_t*)tcp, UV_EOF);
-      ev_io_stop(EV_DEFAULT_UC_ &tcp->read_watcher);
-      tcp->read_cb((uv_stream_t*)tcp, -1, buf);
+      uv_err_new_artificial((uv_handle_t*)stream, UV_EOF);
+      ev_io_stop(EV_DEFAULT_UC_ &stream->read_watcher);
+      stream->read_cb(stream, -1, buf);
       return;
     } else {
       /* Successful read */
-      tcp->read_cb((uv_stream_t*)tcp, nread, buf);
+      stream->read_cb(stream, nread, buf);
     }
   }
 }
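
uv__read drives a simple callback contract: alloc_cb supplies a buffer, and
read_cb is invoked with nread > 0 for data, nread == 0 for a harmless
EAGAIN, and nread == -1 for EOF or a real error (distinguished through
uv_last_error()). A minimal sketch of a conforming pair of callbacks,
assuming the declarations in this tree's uv.h:

    #include <stdlib.h>
    #include "uv.h"

    static uv_buf_t my_alloc(uv_stream_t* stream, size_t suggested_size) {
      uv_buf_t buf;
      buf.base = malloc(suggested_size);
      buf.len = suggested_size;
      return buf;
    }

    static void my_read(uv_stream_t* stream, ssize_t nread, uv_buf_t buf) {
      if (nread < 0) {
        uv_close((uv_handle_t*)stream, NULL);  /* EOF or error */
      } else if (nread > 0) {
        /* consume nread bytes from buf.base here */
      }
      /* nread == 0: spurious wakeup (EAGAIN); nothing to do */
      free(buf.base);
    }
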
@@ -774,8 +822,8 @@ void uv__read(uv_tcp_t* tcp) {
 
 int uv_shutdown(uv_shutdown_t* req, uv_stream_t* handle, uv_shutdown_cb cb) {
   uv_tcp_t* tcp = (uv_tcp_t*)handle;
-  assert(handle->type == UV_TCP &&
-      "uv_shutdown (unix) only supports uv_tcp_t right now");
+  assert((handle->type == UV_TCP || handle->type == UV_NAMED_PIPE)
+      && "uv_shutdown (unix) only supports TCP and named pipes right now");
   assert(tcp->fd >= 0);
 
   /* Initialize request */
@@ -800,31 +848,32 @@ int uv_shutdown(uv_shutdown_t* req, uv_stream_t* handle, uv_shutdown_cb cb) {
 }
 
 
-void uv__tcp_io(EV_P_ ev_io* watcher, int revents) {
-  uv_tcp_t* tcp = watcher->data;
+static void uv__stream_io(EV_P_ ev_io* watcher, int revents) {
+  uv_stream_t* stream = watcher->data;
 
-  assert(tcp->type == UV_TCP);
-  assert(watcher == &tcp->read_watcher ||
-         watcher == &tcp->write_watcher);
-  assert(tcp->fd >= 0);
-  assert(!uv_flag_is_set((uv_handle_t*)tcp, UV_CLOSING));
+  assert(stream->type == UV_TCP ||
+         stream->type == UV_NAMED_PIPE);
+  assert(watcher == &stream->read_watcher ||
+         watcher == &stream->write_watcher);
+  assert(stream->fd >= 0);
+  assert(!uv_flag_is_set((uv_handle_t*)stream, UV_CLOSING));
 
-  if (tcp->connect_req) {
-    uv__tcp_connect(tcp);
+  if (stream->connect_req) {
+    uv__stream_connect(stream);
   } else {
     if (revents & EV_READ) {
-      uv__read(tcp);
+      uv__read((uv_stream_t*)stream);
     }
 
     if (revents & EV_WRITE) {
-      uv_write_t* req = uv__write(tcp);
+      uv_write_t* req = uv__write(stream);
       if (req) {
         /* Error. Notify the user. */
         if (req->cb) {
           req->cb(req, -1);
         }
       } else {
-        uv__write_callbacks(tcp);
+        uv__write_callbacks(stream);
       }
     }
   }
@@ -836,32 +885,32 @@ void uv__tcp_io(EV_P_ ev_io* watcher, int revents) {
  * In order to determine if we've errored out or succeeded, we must call
  * getsockopt.
  */
-static void uv__tcp_connect(uv_tcp_t* tcp) {
+static void uv__stream_connect(uv_stream_t* stream) {
   int error;
-  uv_connect_t* req = tcp->connect_req;
+  uv_connect_t* req = stream->connect_req;
   socklen_t errorsize = sizeof(int);
 
-  assert(tcp->type == UV_TCP);
-  assert(tcp->fd >= 0);
+  assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE);
+  assert(stream->fd >= 0);
   assert(req);
 
-  if (tcp->delayed_error) {
+  if (stream->delayed_error) {
     /* To smooth over the differences between unixes errors that
      * were reported synchronously on the first connect can be delayed
      * until the next tick--which is now.
      */
-    error = tcp->delayed_error;
-    tcp->delayed_error = 0;
+    error = stream->delayed_error;
+    stream->delayed_error = 0;
   } else {
     /* Normal situation: we need to get the socket error from the kernel. */
-    getsockopt(tcp->fd, SOL_SOCKET, SO_ERROR, &error, &errorsize);
+    getsockopt(stream->fd, SOL_SOCKET, SO_ERROR, &error, &errorsize);
   }
 
   if (!error) {
-    ev_io_start(EV_DEFAULT_ &tcp->read_watcher);
+    ev_io_start(EV_DEFAULT_ &stream->read_watcher);
 
     /* Successful connection */
-    tcp->connect_req = NULL;
+    stream->connect_req = NULL;
     if (req->cb) {
       req->cb(req, 0);
     }
@@ -871,9 +920,9 @@ static void uv__tcp_connect(uv_tcp_t* tcp) {
     return;
   } else {
     /* Error */
-    uv_err_new((uv_handle_t*)tcp, error);
+    uv_err_new((uv_handle_t*)stream, error);
 
-    tcp->connect_req = NULL;
+    stream->connect_req = NULL;
     if (req->cb) {
       req->cb(req, -1);
     }
@@ -881,45 +930,52 @@ static void uv__tcp_connect(uv_tcp_t* tcp) {
 }
 
 
-static int uv__connect(uv_connect_t* req, uv_tcp_t* tcp, struct sockaddr* addr,
-    socklen_t addrlen, uv_connect_cb cb) {
+static int uv__connect(uv_connect_t* req,
+                       uv_stream_t* stream,
+                       struct sockaddr* addr,
+                       socklen_t addrlen,
+                       uv_connect_cb cb) {
+
+  int sockfd;
   int r;
 
-  if (tcp->fd <= 0) {
-    int fd = socket(addr->sa_family, SOCK_STREAM, 0);
-
-    if (fd < 0) {
-      uv_err_new((uv_handle_t*)tcp, errno);
+  if (stream->fd <= 0) {
+    if ((sockfd = uv__socket(addr->sa_family, SOCK_STREAM, 0)) == -1) {
+      uv_err_new((uv_handle_t*)stream, errno);
       return -1;
     }
 
-    if (uv_tcp_open(tcp, fd)) {
-      close(fd);
+    if (uv__stream_open(stream, sockfd)) {
+      close(sockfd);
       return -2;
     }
   }
 
   uv__req_init((uv_req_t*)req);
   req->cb = cb;
-  req->handle = (uv_stream_t*)tcp;
+  req->handle = stream;
   req->type = UV_CONNECT;
   ngx_queue_init(&req->queue);
 
-  if (tcp->connect_req) {
-    uv_err_new((uv_handle_t*)tcp, EALREADY);
+  if (stream->connect_req) {
+    uv_err_new((uv_handle_t*)stream, EALREADY);
     return -1;
   }
 
-  if (tcp->type != UV_TCP) {
-    uv_err_new((uv_handle_t*)tcp, ENOTSOCK);
+  if (stream->type != UV_TCP) {
+    uv_err_new((uv_handle_t*)stream, ENOTSOCK);
     return -1;
   }
 
-  tcp->connect_req = req;
+  stream->connect_req = req;
 
-  r = connect(tcp->fd, addr, addrlen);
+  r = connect(stream->fd, addr, addrlen);
 
-  tcp->delayed_error = 0;
+  stream->delayed_error = 0;
 
   if (r != 0 && errno != EINPROGRESS) {
     switch (errno) {
@@ -928,41 +984,87 @@ static int uv__connect(uv_connect_t* req, uv_tcp_t* tcp, struct sockaddr* addr,
        * wait.
        */
       case ECONNREFUSED:
-        tcp->delayed_error = errno;
+        stream->delayed_error = errno;
         break;
 
       default:
-        uv_err_new((uv_handle_t*)tcp, errno);
+        uv_err_new((uv_handle_t*)stream, errno);
         return -1;
     }
   }
 
-  assert(tcp->write_watcher.data == tcp);
-  ev_io_start(EV_DEFAULT_ &tcp->write_watcher);
+  assert(stream->write_watcher.data == stream);
+  ev_io_start(EV_DEFAULT_ &stream->write_watcher);
 
-  if (tcp->delayed_error) {
-    ev_feed_event(EV_DEFAULT_ &tcp->write_watcher, EV_WRITE);
+  if (stream->delayed_error) {
+    ev_feed_event(EV_DEFAULT_ &stream->write_watcher, EV_WRITE);
   }
 
   return 0;
 }
 
 
-int uv_tcp_connect(uv_connect_t* req, uv_tcp_t* handle,
-    struct sockaddr_in address, uv_connect_cb cb) {
-  assert(handle->type == UV_TCP);
-  assert(address.sin_family == AF_INET);
-  return uv__connect(req, handle, (struct sockaddr*) &address,
-      sizeof(struct sockaddr_in), cb);
+int uv_tcp_connect(uv_connect_t* req,
+                   uv_tcp_t* handle,
+                   struct sockaddr_in address,
+                   uv_connect_cb cb) {
+  int saved_errno;
+  int status;
+
+  saved_errno = errno;
+  status = -1;
+
+  if (handle->type != UV_TCP) {
+    uv_err_new((uv_handle_t*)handle, EINVAL);
+    goto out;
+  }
+
+  if (address.sin_family != AF_INET) {
+    uv_err_new((uv_handle_t*)handle, EINVAL);
+    goto out;
+  }
+
+  status = uv__connect(req,
+                       (uv_stream_t*)handle,
+                       (struct sockaddr*)&address,
+                       sizeof address,
+                       cb);
+
+out:
+  errno = saved_errno;
+  return status;
 }
 
 
-int uv_tcp_connect6(uv_connect_t* req, uv_tcp_t* handle,
-    struct sockaddr_in6 address, uv_connect_cb cb) {
-  assert(handle->type == UV_TCP);
-  assert(address.sin6_family == AF_INET6);
-  return uv__connect(req, handle, (struct sockaddr*) &address,
-      sizeof(struct sockaddr_in6), cb);
+int uv_tcp_connect6(uv_connect_t* req,
+                    uv_tcp_t* handle,
+                    struct sockaddr_in6 address,
+                    uv_connect_cb cb) {
+  int saved_errno;
+  int status;
+
+  saved_errno = errno;
+  status = -1;
+
+  if (handle->type != UV_TCP) {
+    uv_err_new((uv_handle_t*)handle, EINVAL);
+    goto out;
+  }
+
+  if (address.sin6_family != AF_INET6) {
+    uv_err_new((uv_handle_t*)handle, EINVAL);
+    goto out;
+  }
+
+  status = uv__connect(req,
+                       (uv_stream_t*)handle,
+                       (struct sockaddr*)&address,
+                       sizeof address,
+                       cb);
+
+out:
+  errno = saved_errno;
+  return status;
 }
 
 
@@ -1004,8 +1106,10 @@ static size_t uv__buf_count(uv_buf_t bufs[], int bufcnt) {
  */
 int uv_write(uv_write_t* req, uv_stream_t* handle, uv_buf_t bufs[], int bufcnt,
     uv_write_cb cb) {
+  uv_stream_t* stream;
   int empty_queue;
-  uv_tcp_t* tcp = (uv_tcp_t*)handle;
+
+  stream = (uv_stream_t*)handle;
 
   /* Initialize the req */
   uv__req_init((uv_req_t*) req);
@@ -1013,11 +1117,11 @@ int uv_write(uv_write_t* req, uv_stream_t* handle, uv_buf_t bufs[], int bufcnt,
   req->handle = handle;
   ngx_queue_init(&req->queue);
 
-  assert(handle->type == UV_TCP &&
-      "uv_write (unix) does not yet support other types of streams");
+  assert((handle->type == UV_TCP || handle->type == UV_NAMED_PIPE)
+      && "uv_write (unix) does not yet support other types of streams");
 
-  empty_queue = (tcp->write_queue_size == 0);
-  assert(tcp->fd >= 0);
+  empty_queue = (stream->write_queue_size == 0);
+  assert(stream->fd >= 0);
 
   ngx_queue_init(&req->queue);
   req->type = UV_WRITE;
@@ -1038,22 +1142,22 @@ int uv_write(uv_write_t* req, uv_stream_t* handle, uv_buf_t bufs[], int bufcnt,
    */
 
   req->write_index = 0;
-  tcp->write_queue_size += uv__buf_count(bufs, bufcnt);
+  stream->write_queue_size += uv__buf_count(bufs, bufcnt);
 
   /* Append the request to write_queue. */
-  ngx_queue_insert_tail(&tcp->write_queue, &req->queue);
+  ngx_queue_insert_tail(&stream->write_queue, &req->queue);
 
-  assert(!ngx_queue_empty(&tcp->write_queue));
-  assert(tcp->write_watcher.cb == uv__tcp_io);
-  assert(tcp->write_watcher.data == tcp);
-  assert(tcp->write_watcher.fd == tcp->fd);
+  assert(!ngx_queue_empty(&stream->write_queue));
+  assert(stream->write_watcher.cb == uv__stream_io);
+  assert(stream->write_watcher.data == stream);
+  assert(stream->write_watcher.fd == stream->fd);
 
   /* If the queue was empty when this function began, we should attempt to
    * do the write immediately. Otherwise start the write_watcher and wait
    * for the fd to become writable.
    */
   if (empty_queue) {
-    if (uv__write(tcp)) {
+    if (uv__write(stream)) {
       /* Error. uv_last_error has been set. */
       return -1;
     }
@@ -1063,13 +1167,13 @@ int uv_write(uv_write_t* req, uv_stream_t* handle, uv_buf_t bufs[], int bufcnt,
    * means we need to make the callback. The callback can only be done on a
    * fresh stack so we feed the event loop in order to service it.
    */
-  if (ngx_queue_empty(&tcp->write_queue)) {
-    ev_feed_event(EV_DEFAULT_ &tcp->write_watcher, EV_WRITE);
+  if (ngx_queue_empty(&stream->write_queue)) {
+    ev_feed_event(EV_DEFAULT_ &stream->write_watcher, EV_WRITE);
   } else {
     /* Otherwise there is data to write - so we should wait for the file
      * descriptor to become writable.
      */
-    ev_io_start(EV_DEFAULT_ &tcp->write_watcher);
+    ev_io_start(EV_DEFAULT_ &stream->write_watcher);
   }
 
   return 0;
@@ -1097,28 +1201,27 @@ int64_t uv_now() {
 
 
 int uv_read_start(uv_stream_t* stream, uv_alloc_cb alloc_cb, uv_read_cb read_cb) {
-  uv_tcp_t* tcp = (uv_tcp_t*)stream;
+  assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE);
 
   /* The UV_READING flag is independent of the state of the tcp - it just
    * expresses the desired state of the user.
    */
-  uv_flag_set((uv_handle_t*)tcp, UV_READING);
+  uv_flag_set((uv_handle_t*)stream, UV_READING);
 
   /* TODO: try to do the read inline? */
   /* TODO: keep track of tcp state. If we've gotten a EOF then we should
    * not start the IO watcher.
    */
-  assert(tcp->fd >= 0);
+  assert(stream->fd >= 0);
   assert(alloc_cb);
 
-  tcp->read_cb = read_cb;
-  tcp->alloc_cb = alloc_cb;
+  stream->read_cb = read_cb;
+  stream->alloc_cb = alloc_cb;
 
   /* These should have been set by uv_tcp_init. */
-  assert(tcp->read_watcher.data == tcp);
-  assert(tcp->read_watcher.cb == uv__tcp_io);
+  assert(stream->read_watcher.cb == uv__stream_io);
 
-  ev_io_start(EV_DEFAULT_UC_ &tcp->read_watcher);
+  ev_io_start(EV_DEFAULT_UC_ &stream->read_watcher);
   return 0;
 }
 
@@ -1635,21 +1738,306 @@ int uv_getaddrinfo(uv_getaddrinfo_t* handle,
 
 
 int uv_pipe_init(uv_pipe_t* handle) {
-  assert(0 && "implement me");
+  memset(handle, 0, sizeof(*handle));
+
+  uv__handle_init((uv_handle_t*)handle, UV_NAMED_PIPE);
+  uv_counters()->pipe_init++;
+
+  handle->type = UV_NAMED_PIPE;
+  handle->pipe_fname = NULL; /* Only set by listener. */
+
+  ev_init(&handle->write_watcher, uv__stream_io);
+  ev_init(&handle->read_watcher, uv__stream_io);
+  handle->write_watcher.data = handle;
+  handle->read_watcher.data = handle;
+  handle->fd = -1;
+
+  ngx_queue_init(&handle->write_completed_queue);
+  ngx_queue_init(&handle->write_queue);
+
+  return 0;
 }
 
 
 int uv_pipe_bind(uv_pipe_t* handle, const char* name) {
-  assert(0 && "implement me");
+  struct sockaddr_un sun;
+  int saved_errno;
+  int sockfd;
+  int status;
+  int bound;
+
+  saved_errno = errno;
+  sockfd = -1;
+  status = -1;
+  bound = 0;
+
+  /* Make a copy of the file name, it outlives this function's scope. */
+  if ((name = (const char*)strdup(name)) == NULL) {
+    uv_err_new((uv_handle_t*)handle, ENOMEM);
+    goto out;
+  }
+
+  if ((sockfd = uv__socket(AF_UNIX, SOCK_STREAM, 0)) == -1) {
+    uv_err_new((uv_handle_t*)handle, errno);
+    goto out;
+  }
+
+  memset(&sun, 0, sizeof sun);
+  uv__strlcpy(sun.sun_path, name, sizeof(sun.sun_path));
+  sun.sun_family = AF_UNIX;
+
+  if (bind(sockfd, (struct sockaddr*)&sun, sizeof sun) == -1) {
+#ifdef DONT_RACE_ME_BRO
+    /*
+     * Try to bind the socket. Note that we explicitly don't act
+     * on EADDRINUSE. Unlinking and trying to bind again opens
+     * a window for races with other threads and processes.
+     */
+    uv_err_new((uv_handle_t*)handle, errno);
+    goto out;
+#else
+    /*
+     * Try to re-purpose the socket. This is a potential race window.
+     */
+    if (errno != EADDRINUSE
+        || unlink(name) == -1
+        || bind(sockfd, (struct sockaddr*)&sun, sizeof sun) == -1) {
+      uv_err_new((uv_handle_t*)handle, errno);
+      goto out;
+    }
+#endif
+  }
+  bound = 1;
+
+  /* Success. */
+  handle->pipe_fname = name; /* Is a strdup'ed copy. */
+  handle->fd = sockfd;
+  status = 0;
+
+out:
+  /* Clean up on error. */
+  if (status) {
+    if (bound) {
+      /* unlink() before close() to avoid races. */
+      unlink(name);
+    }
+    if (sockfd != -1) {
+      close(sockfd);
+    }
+    free((void*)name);
+  }
+
+  errno = saved_errno;
+  return status;
 }
 
 
 int uv_pipe_listen(uv_pipe_t* handle, uv_connection_cb cb) {
-  assert(0 && "implement me");
+  int saved_errno;
+  int status;
+
+  saved_errno = errno;
+
+  if ((status = listen(handle->fd, SOMAXCONN)) == -1) {
+    uv_err_new((uv_handle_t*)handle, errno);
+  } else {
+    handle->connection_cb = cb;
+    ev_io_init(&handle->read_watcher, uv__pipe_accept, handle->fd, EV_READ);
+    ev_io_start(EV_DEFAULT_ &handle->read_watcher);
+  }
+
+  errno = saved_errno;
+  return status;
+}
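
With bind and listen in place, a Unix pipe server now mirrors the TCP flow.
A usage sketch matching the echo-server test further down:

    uv_pipe_t server;

    uv_pipe_init(&server);
    if (uv_pipe_bind(&server, "/tmp/uv-test-sock")) {
      /* uv_last_error() has the details */
    }
    if (uv_pipe_listen(&server, on_connection)) {
      /* listen(2) failed */
    }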
+
+
+int uv_pipe_connect(uv_connect_t* req,
+                    uv_pipe_t* handle,
+                    const char* name,
+                    uv_connect_cb cb) {
+  struct sockaddr_un sun;
+  int saved_errno;
+  int sockfd;
+  int status;
+
+  saved_errno = errno;
+  sockfd = -1;
+  status = -1;
+
+  if ((sockfd = uv__socket(AF_UNIX, SOCK_STREAM, 0)) == -1) {
+    uv_err_new((uv_handle_t*)handle, errno);
+    goto out;
+  }
+
+  memset(&sun, 0, sizeof sun);
+  uv__strlcpy(sun.sun_path, name, sizeof(sun.sun_path));
+  sun.sun_family = AF_UNIX;
+
+  /* We don't check for EINPROGRESS. Think about it: the socket
+   * is either there or not.
+   */
+  if (connect(sockfd, (struct sockaddr*)&sun, sizeof sun) == -1) {
+    uv_err_new((uv_handle_t*)handle, errno);
+    close(sockfd);
+    goto out;
+  }
+
+  handle->fd = sockfd;
+  ev_io_init(&handle->read_watcher, uv__stream_io, sockfd, EV_READ);
+  ev_io_init(&handle->write_watcher, uv__stream_io, sockfd, EV_WRITE);
+  ev_io_start(EV_DEFAULT_ &handle->read_watcher);
+  ev_io_start(EV_DEFAULT_ &handle->write_watcher);
+
+  status = 0;
+
+out:
+  uv__req_init((uv_req_t*)req);
+  req->handle = (uv_stream_t*)handle;
+  req->type = UV_CONNECT;
+  req->cb = cb;
+  ngx_queue_init(&req->queue);
+
+  if (cb) {
+    cb(req, status);
+  }
+
+  /* Mimic the Windows pipe implementation, always
+   * return 0 and let the callback handle errors.
+   */
+  errno = saved_errno;
+  return 0;
 }
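
Note the asymmetry with the TCP connectors: to mimic the Windows pipe
implementation, uv_pipe_connect() always returns 0 and reports success or
failure through the callback's status argument. A usage sketch:

    static void connect_cb(uv_connect_t* req, int status) {
      if (status) {
        /* inspect uv_last_error(), then uv_close() the handle */
      }
    }

    uv_pipe_t client;
    uv_connect_t req;

    uv_pipe_init(&client);
    uv_pipe_connect(&req, &client, "/tmp/uv-test-sock", connect_cb);
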
 
 
-int uv_pipe_connect(uv_connect_t* req, uv_pipe_t* handle,
-    const char* name, uv_connect_cb cb) {
-  assert(0 && "implement me");
+/* TODO merge with uv__server_io()? */
+static void uv__pipe_accept(EV_P_ ev_io* watcher, int revents) {
+  struct sockaddr_un sun;
+  uv_pipe_t* pipe;
+  int saved_errno;
+  int sockfd;
+
+  saved_errno = errno;
+  pipe = watcher->data;
+
+  assert(pipe->type == UV_NAMED_PIPE);
+  assert(pipe->pipe_fname != NULL);
+
+  sockfd = uv__accept(pipe->fd, (struct sockaddr *)&sun, sizeof sun);
+  if (sockfd == -1) {
+    if (errno == EAGAIN || errno == EWOULDBLOCK) {
+      assert(0 && "EAGAIN on uv__accept(pipefd)");
+    } else {
+      uv_err_new((uv_handle_t*)pipe, errno);
+    }
+  } else {
+    pipe->accepted_fd = sockfd;
+    pipe->connection_cb((uv_handle_t*)pipe, 0);
+    if (pipe->accepted_fd == sockfd) {
+      /* The user hasn't yet called uv_accept(). */
+      ev_io_stop(EV_DEFAULT_ &pipe->read_watcher);
+    }
+  }
+
+  errno = saved_errno;
+}
+
+
+/* Open a socket in non-blocking close-on-exec mode, atomically if possible. */
+static int uv__socket(int domain, int type, int protocol) {
+#if defined(SOCK_NONBLOCK) && defined(SOCK_CLOEXEC)
+  return socket(domain, type | SOCK_NONBLOCK | SOCK_CLOEXEC, protocol);
+#else
+  int sockfd;
+
+  if ((sockfd = socket(domain, type, protocol)) == -1) {
+    return -1;
+  }
+
+  if (uv__nonblock(sockfd, 1) == -1 || uv__cloexec(sockfd, 1) == -1) {
+    close(sockfd);
+    return -1;
+  }
+
+  return sockfd;
+#endif
+}
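
The point of the atomic flags is the window they close: between a plain
socket() call and the fcntl() that sets FD_CLOEXEC, another thread can
fork() and exec(), leaking a blocking, inheritable descriptor into the
child process. Illustrated:

    int fd = socket(AF_INET, SOCK_STREAM, 0);
    /* a concurrent fork()+exec() here inherits a raw, blocking fd */
    fcntl(fd, F_SETFD, FD_CLOEXEC);  /* too late for that child */

SOCK_CLOEXEC and SOCK_NONBLOCK apply the flags inside the socket(2) call
itself, so no such window exists; the #else branch above is the best that
can be done on kernels that predate them.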
+
+
+static int uv__accept(int sockfd, struct sockaddr* saddr, socklen_t slen) {
+#if defined(SOCK_NONBLOCK) && defined(SOCK_CLOEXEC)
+  return accept4(sockfd, saddr, &slen, SOCK_NONBLOCK | SOCK_CLOEXEC);
+#else
+  int peerfd;
+
+  if ((peerfd = accept(sockfd, saddr, &slen)) == -1) {
+    return -1;
+  }
+
+  if (uv__cloexec(peerfd, 1) == -1 || uv__nonblock(peerfd, 1) == -1) {
+    close(peerfd);
+    return -1;
+  }
+
+  return peerfd;
+#endif
+}
+
+
+static int uv__nonblock(int fd, int set) {
+  int flags;
+
+  if ((flags = fcntl(fd, F_GETFL)) == -1) {
+    return -1;
+  }
+
+  if (set) {
+    flags |= O_NONBLOCK;
+  } else {
+    flags &= ~O_NONBLOCK;
+  }
+
+  if (fcntl(fd, F_SETFL, flags) == -1) {
+    return -1;
+  }
+
+  return 0;
+}
+
+
+static int uv__cloexec(int fd, int set) {
+  int flags;
+
+  if ((flags = fcntl(fd, F_GETFD)) == -1) {
+    return -1;
+  }
+
+  if (set) {
+    flags |= FD_CLOEXEC;
+  } else {
+    flags &= ~FD_CLOEXEC;
+  }
+
+  if (fcntl(fd, F_SETFD, flags) == -1) {
+    return -1;
+  }
+
+  return 0;
+}
+
+
+/* TODO move to uv-common.c? */
+size_t uv__strlcpy(char* dst, const char* src, size_t size) {
+  const char *org;
+
+  if (size == 0) {
+    return 0;
+  }
+
+  org = src;
+  while (--size > 0) {
+    if ((*dst++ = *src++) == '\0') {
+      return src - org - 1; /* bytes copied, excluding the NUL */
+    }
+  }
+  *dst = '\0';
+
+  return src - org;
 }
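
Like BSD strlcpy(3), the helper always NUL-terminates and never writes more
than size bytes, which is what uv_pipe_bind() relies on when copying the
name into the fixed-size sun_path field. For example:

    char buf[8];
    size_t n = uv__strlcpy(buf, "/tmp/uv-test-sock", sizeof buf);
    /* buf now holds "/tmp/uv" (7 chars plus the NUL); n == 7 */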
diff --git a/deps/uv/src/uv-win.c b/deps/uv/src/uv-win.c
index 111d99c..8241340 100644
@@ -222,6 +222,7 @@ static char uv_zero_[] = "";
 /* mark if IPv6 sockets are supported */
 static BOOL uv_allow_ipv6 = FALSE;
 
+
 /*
  * Subclass of uv_handle_t. Used for integration of c-ares.
  */
@@ -374,6 +375,7 @@ static uv_err_code uv_translate_sys_error(int sys_errno) {
     case ERROR_NO_UNICODE_TRANSLATION:      return UV_ECHARSET;
     case ERROR_BROKEN_PIPE:                 return UV_EOF;
     case ERROR_PIPE_BUSY:                   return UV_EBUSY;
+    case ERROR_SEM_TIMEOUT:                 return UV_ETIMEDOUT;
     default:                                return UV_UNKNOWN;
   }
 }
@@ -517,6 +519,7 @@ void uv_init() {
 static void uv_req_init(uv_req_t* req) {
   uv_counters()->req_init++;
   req->type = UV_UNKNOWN_REQ;
+  req->error = uv_ok_;
 }
 
 
@@ -1028,10 +1031,14 @@ static void uv_pipe_queue_accept(uv_pipe_t* handle, uv_pipe_accept_t* req) {
   /* Prepare the overlapped structure. */
   memset(&(req->overlapped), 0, sizeof(req->overlapped));
 
-  if (!ConnectNamedPipe(pipeHandle, &req->overlapped) &&
-      GetLastError() != ERROR_IO_PENDING && GetLastError() != ERROR_PIPE_CONNECTED) {
-    /* Make this req pending reporting an error. */
-    req->error = uv_new_sys_error(GetLastError());
+  if (!ConnectNamedPipe(pipeHandle, &req->overlapped) && GetLastError() != ERROR_IO_PENDING) {
+    if (GetLastError() == ERROR_PIPE_CONNECTED) {
+      req->pipeHandle = pipeHandle;
+      req->error = uv_ok_;
+    } else {
+      /* Make this req pending reporting an error. */
+      req->error = uv_new_sys_error(GetLastError());
+    }
     uv_insert_pending_req((uv_req_t*) req);
     handle->reqs_pending++;
     return;
@@ -2314,9 +2321,7 @@ static void uv_poll() {
     /* Package was dequeued */
     req = uv_overlapped_to_req(overlapped);
 
-    if (success) {
-      req->error = uv_ok_;
-    } else {
+    if (!success) {
       req->error = uv_new_sys_error(GetLastError());
     }
 
@@ -2970,6 +2975,7 @@ int uv_pipe_init(uv_pipe_t* handle) {
   handle->type = UV_NAMED_PIPE;
   handle->reqs_pending = 0;
   handle->pending_accepts = NULL;
+  handle->name = NULL;
 
   uv_counters()->pipe_init++;
 
@@ -3033,61 +3039,131 @@ int uv_pipe_listen(uv_pipe_t* handle, uv_connection_cb cb) {
   return 0;
 }
 
+
+static int uv_set_pipe_handle(uv_pipe_t* handle, HANDLE pipeHandle) {
+  DWORD mode = PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT;
+
+  if (!SetNamedPipeHandleState(pipeHandle, &mode, NULL, NULL)) {
+    return -1;
+  }
+
+  if (CreateIoCompletionPort(pipeHandle,
+                             uv_iocp_,
+                             (ULONG_PTR)handle,
+                             0) == NULL) {
+    return -1;
+  }
+  
+  return 0;
+}
+
+
+static DWORD WINAPI pipe_connect_thread_proc(void* parameter) {
+  HANDLE pipeHandle = INVALID_HANDLE_VALUE;
+  int errno;
+  uv_pipe_t* handle;
+  uv_connect_t* req;
+
+  req = (uv_connect_t*)parameter;
+  assert(req);
+  handle = (uv_pipe_t*)req->handle;
+  assert(handle);
+  
+  /* We're here because CreateFile on a pipe returned ERROR_PIPE_BUSY.
+   * We wait for the pipe to become available with WaitNamedPipe.
+   */
+  while (WaitNamedPipe(handle->name, 30000)) {
+    /* The pipe is now available, try to connect. */
+    pipeHandle = CreateFile(handle->name,
+                            GENERIC_READ | GENERIC_WRITE,
+                            0,
+                            NULL,
+                            OPEN_EXISTING,
+                            FILE_FLAG_OVERLAPPED,
+                            NULL);
+
+    if (pipeHandle != INVALID_HANDLE_VALUE) {
+      break;
+    }
+  } 
+
+  if (pipeHandle != INVALID_HANDLE_VALUE && !uv_set_pipe_handle(handle, pipeHandle)) {
+    handle->handle = pipeHandle;
+    req->error = uv_ok_;
+  } else {
+    req->error = uv_new_sys_error(GetLastError());
+  }
+
+  memset(&req->overlapped, 0, sizeof(req->overlapped));
+  /* Post completed */
+  if (!PostQueuedCompletionStatus(uv_iocp_,
+                                  0,
+                                  0,
+                                  &req->overlapped)) {
+    uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus");
+  }   
+
+  return 0;
+}
+
+
 /* TODO: make this work with UTF8 name */
-/* TODO: run this in the thread pool */
 int uv_pipe_connect(uv_connect_t* req, uv_pipe_t* handle,
     const char* name, uv_connect_cb cb) {
   int errno;
-  DWORD mode;
+  HANDLE pipeHandle;
+
+  handle->handle = INVALID_HANDLE_VALUE;
 
   uv_req_init((uv_req_t*) req);
   req->type = UV_CONNECT;
   req->handle = (uv_stream_t*) handle;
   req->cb = cb;
 
-  memset(&req->overlapped, 0, sizeof(req->overlapped));
-
-  handle->handle = CreateFile(name,
-                              GENERIC_READ | GENERIC_WRITE,
-                              0,
-                              NULL,
-                              OPEN_EXISTING,
-                              FILE_FLAG_OVERLAPPED,
-                              NULL);
+  pipeHandle = CreateFile(name,
+                          GENERIC_READ | GENERIC_WRITE,
+                          0,
+                          NULL,
+                          OPEN_EXISTING,
+                          FILE_FLAG_OVERLAPPED,
+                          NULL);
 
-  if (handle->handle == INVALID_HANDLE_VALUE &&
-      GetLastError() != ERROR_IO_PENDING) {
-    errno = GetLastError();
-    goto error;
-  }
+  if (pipeHandle == INVALID_HANDLE_VALUE) {
+    if (GetLastError() == ERROR_PIPE_BUSY) {
+      /* Wait for the server to make a pipe instance available. */
+      handle->name = strdup(name);
+      if (!handle->name) {
+        uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
+      }
 
-  mode = PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT;
+      if (!QueueUserWorkItem(&pipe_connect_thread_proc, req, WT_EXECUTELONGFUNCTION)) {
+        errno = GetLastError();
+        goto error;
+      }
 
-  if (!SetNamedPipeHandleState(handle->handle, &mode, NULL, NULL)) {
+      return 0;
+    }
+  
     errno = GetLastError();
     goto error;
   }
-
-  if (CreateIoCompletionPort(handle->handle,
-                             uv_iocp_,
-                             (ULONG_PTR)handle,
-                             0) == NULL) {
+  
+  if (uv_set_pipe_handle((uv_pipe_t*)req->handle, pipeHandle)) {
     errno = GetLastError();
     goto error;
   }
 
+  handle->handle = pipeHandle;
+
   req->error = uv_ok_;
   uv_insert_pending_req((uv_req_t*) req);
   handle->reqs_pending++;
   return 0;
 
 error:
-  if (handle->handle != INVALID_HANDLE_VALUE) {
-    CloseHandle(handle->handle);
+  if (pipeHandle != INVALID_HANDLE_VALUE) {
+    CloseHandle(pipeHandle);
   }
-  req->error = uv_new_sys_error(errno);
-  uv_insert_pending_req((uv_req_t*) req);
-  handle->reqs_pending++;
+  uv_set_sys_error(errno);
   return -1;
 }
 
@@ -3097,6 +3173,11 @@ static void close_pipe(uv_pipe_t* handle, int* status, uv_err_t* err) {
   int i;
   HANDLE pipeHandle;
 
+  if (handle->name) {
+    free(handle->name);
+    handle->name = NULL;
+  }
+
   if (handle->flags & UV_HANDLE_PIPESERVER) {
     for (i = 0; i < COUNTOF(handle->accept_reqs); i++) {
       pipeHandle = handle->accept_reqs[i].pipeHandle;
@@ -3105,7 +3186,7 @@ static void close_pipe(uv_pipe_t* handle, int* status, uv_err_t* err) {
       }
     }
 
-  } else {
+  } else if (handle->handle != INVALID_HANDLE_VALUE) {
     CloseHandle(handle->handle);
   }
 
diff --git a/deps/uv/test/benchmark-pump.c b/deps/uv/test/benchmark-pump.c
index d7524f7..1732e84 100644
@@ -261,13 +261,6 @@ static void maybe_connect_some() {
       req = (uv_connect_t*) req_alloc();
       r = uv_pipe_connect(req, pipe, TEST_PIPENAME, connect_cb);
       ASSERT(r == 0);
-
-#ifdef _WIN32
-      /* HACK: This is temporary to give the pipes server enough time to create new handles.
-       * This will go away once uv_pipe_connect can deal with UV_EBUSY.
-       */
-      Sleep(1);
-#endif
     }
   }
 }
diff --git a/deps/uv/test/echo-server.c b/deps/uv/test/echo-server.c
index e107dc5..992c88b 100644
@@ -124,7 +124,7 @@ static uv_buf_t echo_alloc(uv_stream_t* handle, size_t suggested_size) {
 
 
 static void on_connection(uv_handle_t* server, int status) {
-  uv_handle_t* handle;
+  uv_stream_t* stream;
   int r;
 
   if (status != 0) {
@@ -132,25 +132,31 @@ static void on_connection(uv_handle_t* server, int status) {
   }
   ASSERT(status == 0);
 
-  if (serverType == TCP) {
-    handle = (uv_handle_t*) malloc(sizeof(uv_tcp_t));
-    ASSERT(handle != NULL);
-
-    uv_tcp_init((uv_tcp_t*)handle);
-  } else {
-    handle = (uv_handle_t*) malloc(sizeof(uv_pipe_t));
-    ASSERT(handle != NULL);
-
-    uv_pipe_init((uv_pipe_t*)handle);
+  switch (serverType) {
+  case TCP:
+    stream = malloc(sizeof(uv_tcp_t));
+    ASSERT(stream != NULL);
+    uv_tcp_init((uv_tcp_t*)stream);
+    break;
+
+  case PIPE:
+    stream = malloc(sizeof(uv_pipe_t));
+    ASSERT(stream != NULL);
+    uv_pipe_init((uv_pipe_t*)stream);
+    break;
+
+  default:
+    ASSERT(0 && "Bad serverType");
+    abort();
   }
 
   /* associate server with stream */
-  handle->data = server;
+  stream->data = server;
 
-  r = uv_accept(server, (uv_stream_t*)handle);
+  r = uv_accept(server, stream);
   ASSERT(r == 0);
 
-  r = uv_read_start((uv_stream_t*)handle, echo_alloc, after_read);
+  r = uv_read_start(stream, echo_alloc, after_read);
   ASSERT(r == 0);
 }
 
@@ -233,22 +239,19 @@ static int pipe_echo_start(char* pipeName) {
 
   r = uv_pipe_init(&pipeServer);
   if (r) {
-    /* TODO: Error codes */
-    fprintf(stderr, "Pipe creation error\n");
+    fprintf(stderr, "uv_pipe_init: %s\n", uv_strerror(uv_last_error()));
     return 1;
   }
 
   r = uv_pipe_bind(&pipeServer, pipeName);
   if (r) {
-    /* TODO: Error codes */
-    fprintf(stderr, "create error\n");
+    fprintf(stderr, "uv_pipe_bind: %s\n", uv_strerror(uv_last_error()));
     return 1;
   }
 
   r = uv_pipe_listen(&pipeServer, on_connection);
   if (r) {
-    /* TODO: Error codes */
-    fprintf(stderr, "Listen error on IPv6\n");
+    fprintf(stderr, "uv_pipe_listen: %s\n", uv_strerror(uv_last_error()));
     return 1;
   }
 
diff --git a/deps/uv/test/task.h b/deps/uv/test/task.h
index d47c209..2c0febd 100644
@@ -33,8 +33,7 @@
 #ifdef _WIN32
 # define TEST_PIPENAME "\\\\.\\pipe\\uv-test"
 #else
-# /* TODO: define unix pipe name */
-# define TEST_PIPENAME ""
+# define TEST_PIPENAME "/tmp/uv-test-sock"
 #endif
 
 typedef enum {