Send and receive file descriptors through net.Stream.
authorPeter Griess <pg@std.in>
Thu, 3 Jun 2010 01:16:53 +0000 (18:16 -0700)
committerRyan Dahl <ry@tinyclouds.org>
Thu, 3 Jun 2010 01:16:53 +0000 (18:16 -0700)
a) create a layer of indirection in net.Stream to allow swapping in
different read/write implementations and

b) emit an 'fd' event when file descriptors are received over a UNIX pipe,
as finally as a tangential benefit

c) remove a bunch of conditionals from the primary codepaths for
ease-of-reading.

lib/net.js
src/node_net.cc
test/fixtures/recvfd.js [new file with mode: 0644]
test/simple/test-sendfd.js [new file with mode: 0644]

index ce0961a92ebf4347231063bd612532f96140e768..c8eb6701c4157232503b25a9c432bad2048a27c5 100644 (file)
@@ -42,6 +42,8 @@ var setKeepAlive= binding.setKeepAlive;
 var socketError = binding.socketError;
 var getsockname = binding.getsockname;
 var errnoException = binding.errnoException;
+var sendMsg     = binding.sendMsg;
+var recvMsg     = binding.recvMsg;
 var EINPROGRESS = binding.EINPROGRESS;
 var ENOENT      = binding.ENOENT;
 var EMFILE      = binding.EMFILE;
@@ -271,6 +273,182 @@ function _doFlush () {
   }
 }
 
+function setImplmentationMethods (self) {
+  function noData(buf, off, len) {
+    return !buf ||
+           (off != undefined && off >= buf.length) ||
+           (len == 0);
+  };
+
+  if (self.type == 'unix') {
+    self._writeImpl = function(buf, off, len, fd, flags) {
+      // Detect and disallow zero-byte writes wth an attached file
+      // descriptor. This is an implementation limitation of sendmsg(2).
+      if (fd && noData(buf, off, len)) {
+        throw new Error('File descriptors can only be written with data');
+      }
+
+      return sendMsg(self.fd, buf, off, len, fd, flags);
+    };
+
+    self._readImpl = function(buf, off, len, calledByIOWatcher) {
+      var bytesRead = recvMsg(self.fd, buf, off, len);
+
+      // Do not emit this in the same stack, otherwise we risk corrupting
+      // our buffer pool which is full of read data, but has not had
+      // had its pointers updated just yet.
+      if (recvMsg.fd !== null) {
+        process.nextTick(function() {
+          self.emit('fd', recvMsg.fd);
+        });
+      }
+
+      return bytesRead;
+    };
+  } else {
+    self._writeImpl = function(buf, off, len, fd, flags) {
+      // XXX: TLS support requires that 0-byte writes get processed
+      //      by the kernel for some reason. Otherwise, we'd just
+      //      fast-path return here.
+
+      return write(self.fd, buf, off, len, fd, flags);
+    };
+
+    self._readImpl = function(buf, off, len, calledByIOWatcher) {
+      return read(self.fd, buf, off, len);
+    };
+  }
+
+  self._shutdownImpl = function() {
+    shutdown(self.fd, 'write')
+  };
+
+  if (self.secure) {
+    var oldWrite = self._writeImpl;
+    self._writeImpl = function(buf, off, len, fd, flags) {
+      assert(buf);
+      assert(self.secure);
+
+      var bytesWritten = self.secureStream.writeInject(buf, off, len);
+
+      if (!securePool) {
+        allocNewSecurePool();
+      }
+
+      var secureLen = self.secureStream.writeExtract(
+        securePool, 0, securePool.length
+      );
+
+      if (secureLen == -1) {
+        // Check our read again for secure handshake
+        self._readWatcher.callback();
+      } else {
+        oldWrite(securePool, 0, secureLen, fd, flags);
+      }
+
+      if (!self.secureEstablished && self.secureStream.isInitFinished()) {
+        self.secureEstablished = true;
+
+        if (self._events && self._events['secure']) {
+          self.emit('secure');
+        }
+      }
+
+      return bytesWritten;
+    };
+    
+    var oldRead = self._readImpl;
+    self._readImpl = function(buf, off, len, calledByIOWatcher) {
+      assert(self.secure);
+
+      var bytesRead = 0;
+      var secureBytesRead = null;
+
+      if (!securePool) {
+        allocNewSecurePool();
+      }
+
+      if (calledByIOWatcher) {
+        secureBytesRead = oldRead(securePool, 0, securePool.length);
+        self.secureStream.readInject(securePool, 0, secureBytesRead);
+      }
+
+      var chunkBytes;
+      do {
+        chunkBytes = self.secureStream.readExtract(
+          pool,
+          pool.used + bytesRead,
+          pool.length - pool.used - bytesRead
+        );
+
+        bytesRead += chunkBytes;
+      } while ((chunkBytes > 0) && (pool.used + bytesRead < pool.length));
+
+      if (bytesRead == 0 && !calledByIOWatcher) {
+        return -1;
+      }
+
+      if (self.secureStream.readPending()) {
+        process.nextTick(function () {
+          if(self._readWatcher)
+            self._readWatcher.callback();
+        });
+      }
+
+      if (!self.secureEstablished) {
+        if (self.secureStream.isInitFinished()) {
+          self.secureEstablished = true;
+          if (self._events && self._events['secure']) {
+            self.emit('secure');
+          }
+        }
+      }
+
+      if (calledByIOWatcher && secureBytesRead === null && !self.server) {
+        // Client needs to write as part of handshake
+        self._writeWatcher.start();
+        return -1;
+      }
+
+      if (bytesRead == 0 && secureBytesRead > 0) {
+        // Deal with SSL handshake
+        if (self.server) {
+          self._checkForSecureHandshake();
+        } else {
+          if (self.secureEstablised) {
+            self.flush();
+          } else {
+            self._checkForSecureHandshake();
+          }
+        }
+
+        return -1;
+      }
+
+      return bytesRead;
+    };
+
+    var oldShutdown = self._shutdownImpl;
+    self._shutdownImpl = function() {
+      self.secureStream.shutdown();
+
+      if (!securePool) {
+        allocNewSecurePool();
+      }
+
+      var secureLen = self.secureStream.writeExtract(
+        securePool, 0, securePool.length
+      );
+
+      try {
+        oldWrite(securePool, 0, secureLen);
+      } catch (e) { }
+
+      oldShutdown();
+    };
+  }
+};
+
 function initStream (self) {
   self._readWatcher = ioWatchers.alloc();
   self._readWatcher.callback = function () {
@@ -285,68 +463,19 @@ function initStream (self) {
 
     //debug('pool.used ' + pool.used);
     var bytesRead;
-    var secureBytesRead;
 
     try {
-      if (self.secure) {
-        if (!securePool) allocNewSecurePool();
-        var calledByNextTick = (arguments.length == 0); // IOWatcher always passes arguments
-        if (!calledByNextTick) {
-          secureBytesRead = read(self.fd, securePool, 0, securePool.length);
-          self.secureStream.readInject(securePool, 0, secureBytesRead);
-        }
-        var chunkBytes;
-        bytesRead = 0;
-        do {
-          chunkBytes = self.secureStream.readExtract(pool,
-                                                     pool.used + bytesRead,
-                                                     pool.length - pool.used - bytesRead);
-          bytesRead += chunkBytes;
-        } while ((chunkBytes > 0) && (pool.used + bytesRead < pool.length));
-        if (bytesRead == 0 && calledByNextTick)
-          return;
-        if (self.secureStream.readPending()) {
-          process.nextTick(function () {
-            if(self._readWatcher)
-              self._readWatcher.callback();
-          });
-        }
-        if (!self.secureEstablished) {
-          if (self.secureStream.isInitFinished()) {
-            self.secureEstablished = true;
-            if (self._events && self._events['secure']) self.emit('secure');
-          }
-        }
-        if (secureBytesRead === null && !self.server) {
-          // Client needs to write as part of handshake
-          self._writeWatcher.start();
-          return;
-        }
-      } else {
-        bytesRead = read(self.fd,
-                       pool,
-                       pool.used,
-                       pool.length - pool.used);
-      }
+      bytesRead = self._readImpl(pool, pool.used, pool.length - pool.used, (arguments.length > 0));
     } catch (e) {
       self.destroy(e);
       return;
     }
 
-    //debug('bytesRead ' + bytesRead + '\n');
+    // Note that some _readImpl() implementations return -1 bytes
+    // read as an indication not to do any processing on the result
+    // (but not an error).
 
-    if (self.secure && bytesRead == 0 && secureBytesRead > 0) {
-      // Deal with SSL handshake
-      if (self.server) {
-        self._checkForSecureHandshake();
-      } else {
-        if (self.secureEstablised) {
-          self.flush();
-        } else {
-          self._checkForSecureHandshake();
-        }
-      }
-    } else if (bytesRead === 0) {
+    if (bytesRead === 0) {
       self.readable = false;
       self._readWatcher.stop();
 
@@ -384,6 +513,7 @@ function initStream (self) {
   // Queue of buffers and string that need to be written to socket.
   self._writeQueue = [];
   self._writeQueueEncoding = [];
+  self._writeQueueFD = [];
 
   self._writeWatcher = ioWatchers.alloc();
   self._writeWatcher.socket = self;
@@ -391,14 +521,17 @@ function initStream (self) {
   self.writable = false;
 }
 
-function Stream (fd) {
+function Stream (fd, type) {
   events.EventEmitter.call(this);
 
   this.fd = null;
+  this.type = null;
   this.secure = false;
 
   if (parseInt(fd) >= 0) {
-    this.open(fd);
+    this.open(fd, type);
+  } else {
+    setImplmentationMethods(this);
   }
 };
 sys.inherits(Stream, events.EventEmitter);
@@ -423,6 +556,8 @@ Stream.prototype.setSecure = function(credentials) {
   }
   this.secureStream = new SecureStream(this.credentials.context, this.server ? 1 : 0, this.credentials.shouldVerify ? 1 : 0);
 
+  setImplmentationMethods(this);
+
   if (!this.server) {
     // If client, trigger handshake
     this._checkForSecureHandshake();
@@ -439,6 +574,10 @@ Stream.prototype.verifyPeer = function() {
 
 
 Stream.prototype._checkForSecureHandshake = function() {
+  if (!this.writable) {
+    return;
+  }
+
   // Do an empty write to see if we need to write out as part of handshake
   if (!emptyBuffer) allocEmptyBuffer();
   this.write(emptyBuffer);
@@ -461,13 +600,15 @@ Stream.prototype.getCipher = function() {
 }
 
 
-Stream.prototype.open = function (fd) {
+Stream.prototype.open = function (fd, type) {
   initStream(this);
 
   this.fd = fd;
-
+  this.type = type || null;
   this.readable = true;
 
+  setImplmentationMethods(this);
+
   this._writeWatcher.set(this.fd, false, true);
   this.writable = true;
 }
@@ -504,7 +645,9 @@ Object.defineProperty(Stream.prototype, 'readyState', {
 // Returns true if all the data was flushed to socket. Returns false if
 // something was queued. If data was queued, then the "drain" event will
 // signal when it has been finally flushed to socket.
-Stream.prototype.write = function (data, encoding) {
+//
+// XXX: Caller cannot close the given fd until the stream has drained.
+Stream.prototype.write = function (data, encoding, fd) {
   if (this._writeQueue && this._writeQueue.length) {
     // Slow. There is already a write queue, so let's append to it.
     if (this._writeQueueLast() === END_OF_FILE) {
@@ -519,26 +662,20 @@ Stream.prototype.write = function (data, encoding) {
       this._writeQueue.push(data);
       this._writeQueueEncoding.push(encoding);
     }
+
+    if (fd != undefined) {
+      this._writeQueueFD.push(fd);
+    }
+
     return false;
   } else {
     // Fast.
     // The most common case. There is no write queue. Just push the data
     // directly to the socket.
-    return this._writeOut(data, encoding);
+    return this._writeOut(data, encoding, fd);
   }
 };
 
-
-Stream.prototype._shutdownSecure = function () {
-  this.secureStream.shutdown();
-  if (!securePool) allocNewSecurePool();
-  var secureLen = this.secureStream.writeExtract(securePool, 0, securePool.length);
-  try {
-    var secureBytesWritten = write(this.fd, securePool, 0, secureLen);
-  } catch (e) {
-  }
-}
-
 // Directly writes the data to socket.
 //
 // Steps:
@@ -547,12 +684,10 @@ Stream.prototype._shutdownSecure = function () {
 //   2. Write data to socket. Return true if flushed.
 //   3. Slice out remaining
 //   4. Unshift remaining onto _writeQueue. Return false.
-Stream.prototype._writeOut = function (data, encoding) {
+Stream.prototype._writeOut = function (data, encoding, fd) {
   if (!this.writable) {
-    if (this.secure) return false;
-    else throw new Error('Stream is not writable');
+    throw new Error('Stream is not writable');
   }
-  if (!this.secure && data.length == 0) return true;
 
   var buffer, off, len;
   var bytesWritten, charsWritten;
@@ -581,7 +716,9 @@ Stream.prototype._writeOut = function (data, encoding) {
       charsWritten = bytesWritten;
     }
 
-    if (encoding) assert(bytesWritten > 0);
+    if (encoding && data.length > 0) {
+      assert(bytesWritten > 0);
+    }
 
     buffer = pool;
     len = bytesWritten;
@@ -602,30 +739,7 @@ Stream.prototype._writeOut = function (data, encoding) {
   }
 
   try {
-    if (this.secure) {
-      if (!buffer) return false;
-      bytesWritten = this.secureStream.writeInject(buffer, off, len);
-      if (!securePool) allocNewSecurePool();
-      var secureLen = this.secureStream.writeExtract(securePool, 0, securePool.length);
-      if (secureLen==-1) {
-        // Check our read again for secure handshake
-        this._readWatcher.callback();
-        secureBytesWritten = 0;
-      } else {
-        var secureBytesWritten = write(this.fd, securePool, 0, secureLen);
-      }
-      if (!this.secureEstablished && this.secureStream.isInitFinished()) {
-        this.secureEstablished = true;
-        try {
-          if (this._events && this._events['secure']) this.emit('secure');
-        } catch (e) {
-          this.destroy(e);
-          return;
-        }
-      }
-    } else {
-      bytesWritten = write(this.fd, buffer, off, len);
-    }
+    bytesWritten = this._writeImpl(buffer, off, len, fd, 0);
   } catch (e) {
     this.destroy(e);
     return false;
@@ -665,6 +779,11 @@ Stream.prototype._writeOut = function (data, encoding) {
   this._writeQueue.unshift(leftOver);
   this._writeQueueEncoding.unshift(null);
 
+  // If didn't successfully write any bytes, enqueue our fd and try again
+  if (!bytesWritten) {
+    this._writeQueueFD.unshift(fd);
+  }
+
   return false;
 }
 
@@ -675,13 +794,14 @@ Stream.prototype.flush = function () {
   while (this._writeQueue && this._writeQueue.length) {
     var data = this._writeQueue.shift();
     var encoding = this._writeQueueEncoding.shift();
+    var fd = this._writeQueueFD.shift();
 
     if (data === END_OF_FILE) {
       this._shutdown();
       return true;
     }
 
-    var flushed = this._writeOut(data,encoding);
+    var flushed = this._writeOut(data,encoding,fd);
     if (!flushed) return false;
   }
   if (this._writeWatcher) this._writeWatcher.stop();
@@ -788,6 +908,8 @@ Stream.prototype.connect = function () {
     // UNIX
     self.fd = socket('unix');
     self.type = 'unix';
+
+    setImplmentationMethods(this);
     doConnect(self, arguments[0]);
   }
 };
@@ -890,14 +1012,10 @@ Stream.prototype._shutdown = function () {
       // readable and writable
       this.writable = false;
 
-      if (this.secure) {
-        this._shutdownSecure();
-      }
       try {
-        shutdown(this.fd, 'write')
+        this._shutdownImpl();
       } catch (e) {
         this.destroy(e);
-        return;
       }
     } else {
       // writable but not readable
@@ -951,7 +1069,7 @@ function Server (listener) {
       }
       if (!peerInfo) return;
 
-      var s = new Stream(peerInfo.fd);
+      var s = new Stream(peerInfo.fd, self.type);
       s.remoteAddress = peerInfo.address;
       s.remotePort = peerInfo.port;
       s.type = self.type;
@@ -1078,3 +1196,5 @@ Server.prototype.close = function () {
     self.emit("close");
   }
 };
+
+// vim:ts=2 sw=2
index 57d0d97e98971a03525ebe0f3fbaffe326c2197f..e414c876bb1f8baa6d97b1971e6fb9cbb0e6d8b3 100644 (file)
@@ -554,8 +554,6 @@ static Handle<Value> RecvMsg(const Arguments& args) {
           String::New("Length is extends beyond buffer")));
   }
 
-  int received_fd;
-
   struct iovec iov[1];
   iov[0].iov_base = (char*)buffer->data() + off;
   iov[0].iov_len = len;
@@ -583,15 +581,36 @@ static Handle<Value> RecvMsg(const Arguments& args) {
   // that the wrapper can pick up. Since we're single threaded, this is not
   // a problem - just make sure to copy out that variable before the next
   // call to recvmsg().
+  //
+  // XXX: Some implementations can send multiple file descriptors in a
+  //      single message. We should be using CMSG_NXTHDR() to walk the
+  //      chain to get at them all. This would require changing the 
+  //      API to hand these back up the caller, is a pain.
+
+  int received_fd = -1;
+  for (struct cmsghdr *cmsg = CMSG_FIRSTHDR(&msg);
+       msg.msg_controllen > 0 && cmsg != NULL;
+       cmsg = CMSG_NXTHDR(&msg, cmsg)) {
+    if (cmsg->cmsg_type == SCM_RIGHTS) {
+      if (received_fd != -1) {
+        fprintf(stderr, "ignoring extra FD received: %d\n", received_fd);
+      }
 
-  struct cmsghdr *cmsg = CMSG_FIRSTHDR(&msg);
-  if (cmsg && cmsg->cmsg_type == SCM_RIGHTS) {
-    received_fd = *(int *) CMSG_DATA(cmsg);
-    recv_msg_template->GetFunction()->Set(fd_symbol, Integer::New(received_fd));
-  } else {
-    recv_msg_template->GetFunction()->Set(fd_symbol, Null());
+      received_fd = *(int *) CMSG_DATA(cmsg);
+    } else {
+      fprintf(stderr, "ignoring non-SCM_RIGHTS ancillary data: %d\n",
+        cmsg->cmsg_type
+      );
+    }
   }
 
+  recv_msg_template->GetFunction()->Set(
+    fd_symbol,
+    (received_fd != -1) ?
+      Integer::New(received_fd) :
+      Null()
+  );
+
   return scope.Close(Integer::New(bytes_read));
 }
 
@@ -640,49 +659,115 @@ static Handle<Value> Write(const Arguments& args) {
 }
 
 
-// var bytesWritten = t.sendFD(self.fd)
-//  returns null on EAGAIN or EINTR, raises an exception on all other errors
-static Handle<Value> SendFD(const Arguments& args) {
+// var bytes = sendmsg(fd, buf, off, len, fd, flags);
+//
+// Write a buffer with optional offset and length to the given file
+// descriptor. Note that we refuse to send 0 bytes.
+//
+// The 'fd' parameter is a numerical file descriptor, or the undefined value
+// to send none.
+//
+// The 'flags' parameter is a number representing a bitmask of MSG_* values.
+// This is passed directly to sendmsg().
+//
+// Returns null on EAGAIN or EINTR, raises an exception on all other errors
+static Handle<Value> SendMsg(const Arguments& args) {
   HandleScope scope;
 
+  struct iovec iov;
+
   if (args.Length() < 2) {
     return ThrowException(Exception::TypeError(
           String::New("Takes 2 parameters")));
   }
 
+  // The first argument should be a file descriptor
   FD_ARG(args[0])
 
-  // TODO: make sure fd is a unix domain socket?
-
-  if (!args[1]->IsInt32()) {
+  // Grab the actul data to be written, stuffing it into iov
+  if (!Buffer::HasInstance(args[1])) {
     return ThrowException(Exception::TypeError(
-          String::New("FD to send is not an integer")));
+      String::New("Expected either a string or a buffer")));
   }
 
-  int fd_to_send = args[1]->Int32Value();
+  Buffer *buf = ObjectWrap::Unwrap<Buffer>(args[1]->ToObject());
+
+  size_t offset = 0;
+  if (args.Length() >= 3 && !args[2]->IsUndefined()) {
+    if (!args[2]->IsUint32()) {
+      return ThrowException(Exception::TypeError(
+        String::New("Expected unsigned integer for offset")));
+    }
+
+    offset = args[2]->Uint32Value();
+    if (offset >= buf->length()) {
+      return ThrowException(Exception::Error(
+        String::New("Offset into buffer too large")));
+    }
+  }
+
+  size_t length = buf->length() - offset;
+  if (args.Length() >= 4 && !args[3]->IsUndefined()) {
+    if (!args[3]->IsUint32()) {
+      return ThrowException(Exception::TypeError(
+        String::New("Expected unsigned integer for length")));
+    }
+
+    length = args[3]->Uint32Value();
+    if (offset + length > buf->length()) {
+      return ThrowException(Exception::Error(
+        String::New("offset + length beyond buffer length")));
+    }
+  }
+
+  iov.iov_base = buf->data() + offset;
+  iov.iov_len = length;
+
+  int fd_to_send = -1;
+  if (args.Length() >= 5 && !args[4]->IsUndefined()) {
+    if (!args[4]->IsUint32()) {
+      return ThrowException(Exception::TypeError(
+        String::New("Expected unsigned integer for a file descriptor")));
+    }
+
+    fd_to_send = args[4]->Uint32Value();
+  }
+
+  int flags = 0;
+  if (args.Length() >= 6 && !args[5]->IsUndefined()) {
+    if (!args[5]->IsUint32()) {
+      return ThrowException(Exception::TypeError(
+        String::New("Expected unsigned integer for a flags argument")));
+    }
+
+    flags = args[5]->Uint32Value();
+  }
 
   struct msghdr msg;
-  struct iovec iov[1];
-  char control_msg[CMSG_SPACE(sizeof(fd_to_send))];
-  struct cmsghdr *cmsg;
-  static char dummy = 'd'; // Need to send at least a byte of data in the message
+  char scratch[64];
 
-  iov[0].iov_base = &dummy;
-  iov[0].iov_len = 1;
-  msg.msg_iov = iov;
+  msg.msg_iov = &iov;
   msg.msg_iovlen = 1;
   msg.msg_name = NULL;
   msg.msg_namelen = 0;
   msg.msg_flags = 0;
-  msg.msg_control = (void *) control_msg;
-  msg.msg_controllen = CMSG_LEN(sizeof(fd_to_send));
-  cmsg = CMSG_FIRSTHDR(&msg);
-  cmsg->cmsg_level = SOL_SOCKET;
-  cmsg->cmsg_type = SCM_RIGHTS;
-  cmsg->cmsg_len = msg.msg_controllen;
-  *(int*) CMSG_DATA(cmsg) = fd_to_send;
+  msg.msg_control = NULL;
+  msg.msg_controllen = 0;
+
+  if (fd_to_send >= 0) {
+    struct cmsghdr *cmsg;
+
+    msg.msg_control = (void *) scratch;
+    msg.msg_controllen = CMSG_LEN(sizeof(fd_to_send));
+
+    cmsg = CMSG_FIRSTHDR(&msg);
+    cmsg->cmsg_level = SOL_SOCKET;
+    cmsg->cmsg_type = SCM_RIGHTS;
+    cmsg->cmsg_len = msg.msg_controllen;
+    *(int*) CMSG_DATA(cmsg) = fd_to_send;
+  }
 
-  ssize_t written = sendmsg(fd, &msg, 0);
+  ssize_t written = sendmsg(fd, &msg, flags);
 
   if (written < 0) {
     if (errno == EAGAIN || errno == EINTR) return Null();
@@ -805,7 +890,7 @@ void InitNet(Handle<Object> target) {
   NODE_SET_METHOD(target, "write", Write);
   NODE_SET_METHOD(target, "read", Read);
 
-  NODE_SET_METHOD(target, "sendFD", SendFD);
+  NODE_SET_METHOD(target, "sendMsg", SendMsg);
 
   recv_msg_template =
       Persistent<FunctionTemplate>::New(FunctionTemplate::New(RecvMsg));
diff --git a/test/fixtures/recvfd.js b/test/fixtures/recvfd.js
new file mode 100644 (file)
index 0000000..27e8fc6
--- /dev/null
@@ -0,0 +1,57 @@
+// See test/simple/test-sendfd.js for a complete description of what this
+// script is doing and how it fits into the test as a whole.
+
+var net = require('net');
+var sys = require('sys');
+
+var receivedData = [];
+var receivedFDs = [];
+var numSentMessages = 0;
+
+function processData(s) {
+  if (receivedData.length == 0 || receivedFDs.length == 0) {
+    return;
+  }
+
+  var fd = receivedFDs.shift();
+  var d = receivedData.shift();
+
+  // Augment our received object before sending it back across the pipe.
+  d.pid = process.pid;
+
+  // Create a stream around the FD that we received and send a serialized
+  // version of our modified object back. Clean up when we're done.
+  var pipeStream = new net.Stream(fd);
+
+  var drainFunc = function() {
+    pipeStream.destroy();
+
+    if (++numSentMessages == 2) {
+      s.destroy();
+    }
+  };
+
+  pipeStream.addListener('drain', drainFunc);
+  pipeStream.resume();
+
+  if (pipeStream.write(JSON.stringify(d) + '\n')) {
+    drainFunc();
+  }
+};
+
+// Create a UNIX socket to the path defined by argv[2] and read a file
+// descriptor and misc data from it.
+var s = new net.Stream();
+s.addListener('fd', function(fd) {
+  receivedFDs.unshift(fd);
+  processData(s);
+});
+s.addListener('data', function(data) {
+  data.toString('utf8').trim().split('\n').forEach(function(d) {
+    receivedData.unshift(JSON.parse(d));
+  });
+  processData(s);
+});
+s.connect(process.argv[2]);
+
+// vim:ts=2 sw=2 et
diff --git a/test/simple/test-sendfd.js b/test/simple/test-sendfd.js
new file mode 100644 (file)
index 0000000..9c5d731
--- /dev/null
@@ -0,0 +1,126 @@
+// Test sending and receiving a file descriptor.
+//
+// This test is pretty complex. It ends up spawning test/fixtures/recvfd.js
+// as a child to test desired behavior. What happens is
+//
+//  1. Create an in-memory pipe via pipe(2). These two file descriptors
+//     are not visible to any other process, and so make a good test-case
+//     for sharing.
+//  2. Create a a UNIX socket at SOCK_PATH. When a client connects to this
+//     path, they are sent the write end of the pipe from above.
+//  3. The client is sent n JSON representations of the DATA variable, each
+//     with a different ordinal. We send these delimited by '\n' strings
+//     so that the receiving end can avoid any coalescing that hapepns
+//     due to the stream nature of the socket (e.g. '{}{}' is not a valid
+//     JSON string).
+//  4. The child process receives file descriptors and JSON blobs and,
+//     whenever it has at least one of each, writes a modified JSON blob
+//     to the FD. The blob is modified to include the child's process ID.
+//  5. Once the child process has sent n responses, it closes the write end
+//     of the pipe, which signals to the parent that there is no more data
+//     coming.
+//  6. The parent listens to the read end of the pipe, accumulating JSON
+//     blobs (again, delimited by '\n') and verifying that a) the 'pid'
+//     attribute belongs to the child and b) the 'ord' field has not been
+//     seen in a response yet. This is intended to ensure that all blobs
+//     sent out have been relayed back to us.
+
+require('../common');
+
+var buffer = require('buffer');
+var child_process = require('child_process');
+var fs = require('fs');
+var net = require('net');
+var netBinding = process.binding('net');
+var path = require('path');
+var sys = require('sys');
+
+var DATA = {
+  'ppid' : process.pid,
+  'ord' : 0
+};
+
+var SOCK_PATH = path.join(
+  __dirname,
+  '..',
+  path.basename(__filename, '.js') + '.sock'
+);
+
+var logChild = function(d) {
+  if (typeof d == 'object') {
+    d = d.toString();
+  }
+
+  d.split('\n').forEach(function(l) {
+    if (l.length > 0) {
+      sys.debug('CHILD: ' + l);
+    }
+  });
+};
+
+// Create a pipe
+// 
+// We establish a listener on the read end of the pipe so that we can
+// validate any data sent back by the child. We send the write end of the
+// pipe to the child and close it off in our process.
+var pipeFDs = netBinding.pipe();
+assert.equal(pipeFDs.length, 2);
+
+var seenOrdinals = [];
+
+var pipeReadStream = new net.Stream();
+pipeReadStream.addListener('data', function(data) {
+  data.toString('utf8').trim().split('\n').forEach(function(d) {
+    var rd = JSON.parse(d);
+
+    assert.equal(rd.pid, cpp);
+    assert.equal(seenOrdinals.indexOf(rd.ord), -1);
+
+    seenOrdinals.unshift(rd.ord);
+  });
+});
+pipeReadStream.open(pipeFDs[0]);
+pipeReadStream.resume();
+
+// Create a UNIX socket at SOCK_PATH and send DATA and the write end
+// of the pipe to whoever connects.
+//
+// We send two messages here, both with the same pipe FD: one string, and
+// one buffer. We want to make sure that both datatypes are handled
+// correctly.
+var srv = net.createServer(function(s) {
+  var str = JSON.stringify(DATA) + '\n';
+
+  DATA.ord = DATA.ord + 1;
+  var buf = new buffer.Buffer(str.length);
+  buf.write(JSON.stringify(DATA) + '\n', 'utf8');
+
+  s.write(str, 'utf8', pipeFDs[1]);
+  if (s.write(buf, undefined, pipeFDs[1])) {
+    netBinding.close(pipeFDs[1]);
+  } else {
+    s.addListener('drain', function() {
+      netBinding.close(pipeFDs[1]);
+    });
+  }
+});
+srv.listen(SOCK_PATH);
+
+// Spawn a child running test/fixtures/recvfd.js
+var cp = child_process.spawn(process.argv[0],
+                             [path.join(fixturesDir, 'recvfd.js'), SOCK_PATH]);
+
+cp.stdout.addListener('data', logChild);
+cp.stderr.addListener('data', logChild);
+
+// When the child exits, clean up and validate its exit status
+var cpp = cp.pid;
+cp.addListener('exit', function(code, signal) {
+  srv.close();
+  // fs.unlinkSync(SOCK_PATH);
+
+  assert.equal(code, 0);
+  assert.equal(seenOrdinals.length, 2);
+});
+
+// vim:ts=2 sw=2 et