var socketError = binding.socketError;
var getsockname = binding.getsockname;
var errnoException = binding.errnoException;
+var sendMsg = binding.sendMsg;
+var recvMsg = binding.recvMsg;
var EINPROGRESS = binding.EINPROGRESS;
var ENOENT = binding.ENOENT;
var EMFILE = binding.EMFILE;
}
}
+function setImplmentationMethods (self) {
+ function noData(buf, off, len) {
+ return !buf ||
+ (off != undefined && off >= buf.length) ||
+ (len == 0);
+ };
+
+ if (self.type == 'unix') {
+ self._writeImpl = function(buf, off, len, fd, flags) {
+ // Detect and disallow zero-byte writes wth an attached file
+ // descriptor. This is an implementation limitation of sendmsg(2).
+ if (fd && noData(buf, off, len)) {
+ throw new Error('File descriptors can only be written with data');
+ }
+
+ return sendMsg(self.fd, buf, off, len, fd, flags);
+ };
+
+ self._readImpl = function(buf, off, len, calledByIOWatcher) {
+ var bytesRead = recvMsg(self.fd, buf, off, len);
+
+ // Do not emit this in the same stack, otherwise we risk corrupting
+ // our buffer pool which is full of read data, but has not had
+ // had its pointers updated just yet.
+ if (recvMsg.fd !== null) {
+ process.nextTick(function() {
+ self.emit('fd', recvMsg.fd);
+ });
+ }
+
+ return bytesRead;
+ };
+ } else {
+ self._writeImpl = function(buf, off, len, fd, flags) {
+ // XXX: TLS support requires that 0-byte writes get processed
+ // by the kernel for some reason. Otherwise, we'd just
+ // fast-path return here.
+
+ return write(self.fd, buf, off, len, fd, flags);
+ };
+
+ self._readImpl = function(buf, off, len, calledByIOWatcher) {
+ return read(self.fd, buf, off, len);
+ };
+ }
+
+ self._shutdownImpl = function() {
+ shutdown(self.fd, 'write')
+ };
+
+ if (self.secure) {
+ var oldWrite = self._writeImpl;
+ self._writeImpl = function(buf, off, len, fd, flags) {
+ assert(buf);
+ assert(self.secure);
+
+ var bytesWritten = self.secureStream.writeInject(buf, off, len);
+
+ if (!securePool) {
+ allocNewSecurePool();
+ }
+
+ var secureLen = self.secureStream.writeExtract(
+ securePool, 0, securePool.length
+ );
+
+ if (secureLen == -1) {
+ // Check our read again for secure handshake
+ self._readWatcher.callback();
+ } else {
+ oldWrite(securePool, 0, secureLen, fd, flags);
+ }
+
+ if (!self.secureEstablished && self.secureStream.isInitFinished()) {
+ self.secureEstablished = true;
+
+ if (self._events && self._events['secure']) {
+ self.emit('secure');
+ }
+ }
+
+ return bytesWritten;
+ };
+
+ var oldRead = self._readImpl;
+ self._readImpl = function(buf, off, len, calledByIOWatcher) {
+ assert(self.secure);
+
+ var bytesRead = 0;
+ var secureBytesRead = null;
+
+ if (!securePool) {
+ allocNewSecurePool();
+ }
+
+ if (calledByIOWatcher) {
+ secureBytesRead = oldRead(securePool, 0, securePool.length);
+ self.secureStream.readInject(securePool, 0, secureBytesRead);
+ }
+
+ var chunkBytes;
+ do {
+ chunkBytes = self.secureStream.readExtract(
+ pool,
+ pool.used + bytesRead,
+ pool.length - pool.used - bytesRead
+ );
+
+ bytesRead += chunkBytes;
+ } while ((chunkBytes > 0) && (pool.used + bytesRead < pool.length));
+
+ if (bytesRead == 0 && !calledByIOWatcher) {
+ return -1;
+ }
+
+ if (self.secureStream.readPending()) {
+ process.nextTick(function () {
+ if(self._readWatcher)
+ self._readWatcher.callback();
+ });
+ }
+
+ if (!self.secureEstablished) {
+ if (self.secureStream.isInitFinished()) {
+ self.secureEstablished = true;
+ if (self._events && self._events['secure']) {
+ self.emit('secure');
+ }
+ }
+ }
+
+ if (calledByIOWatcher && secureBytesRead === null && !self.server) {
+ // Client needs to write as part of handshake
+ self._writeWatcher.start();
+ return -1;
+ }
+
+ if (bytesRead == 0 && secureBytesRead > 0) {
+ // Deal with SSL handshake
+ if (self.server) {
+ self._checkForSecureHandshake();
+ } else {
+ if (self.secureEstablised) {
+ self.flush();
+ } else {
+ self._checkForSecureHandshake();
+ }
+ }
+
+ return -1;
+ }
+
+ return bytesRead;
+ };
+
+ var oldShutdown = self._shutdownImpl;
+ self._shutdownImpl = function() {
+ self.secureStream.shutdown();
+
+ if (!securePool) {
+ allocNewSecurePool();
+ }
+
+ var secureLen = self.secureStream.writeExtract(
+ securePool, 0, securePool.length
+ );
+
+ try {
+ oldWrite(securePool, 0, secureLen);
+ } catch (e) { }
+
+ oldShutdown();
+ };
+ }
+};
+
function initStream (self) {
self._readWatcher = ioWatchers.alloc();
self._readWatcher.callback = function () {
//debug('pool.used ' + pool.used);
var bytesRead;
- var secureBytesRead;
try {
- if (self.secure) {
- if (!securePool) allocNewSecurePool();
- var calledByNextTick = (arguments.length == 0); // IOWatcher always passes arguments
- if (!calledByNextTick) {
- secureBytesRead = read(self.fd, securePool, 0, securePool.length);
- self.secureStream.readInject(securePool, 0, secureBytesRead);
- }
- var chunkBytes;
- bytesRead = 0;
- do {
- chunkBytes = self.secureStream.readExtract(pool,
- pool.used + bytesRead,
- pool.length - pool.used - bytesRead);
- bytesRead += chunkBytes;
- } while ((chunkBytes > 0) && (pool.used + bytesRead < pool.length));
- if (bytesRead == 0 && calledByNextTick)
- return;
- if (self.secureStream.readPending()) {
- process.nextTick(function () {
- if(self._readWatcher)
- self._readWatcher.callback();
- });
- }
- if (!self.secureEstablished) {
- if (self.secureStream.isInitFinished()) {
- self.secureEstablished = true;
- if (self._events && self._events['secure']) self.emit('secure');
- }
- }
- if (secureBytesRead === null && !self.server) {
- // Client needs to write as part of handshake
- self._writeWatcher.start();
- return;
- }
- } else {
- bytesRead = read(self.fd,
- pool,
- pool.used,
- pool.length - pool.used);
- }
+ bytesRead = self._readImpl(pool, pool.used, pool.length - pool.used, (arguments.length > 0));
} catch (e) {
self.destroy(e);
return;
}
- //debug('bytesRead ' + bytesRead + '\n');
+ // Note that some _readImpl() implementations return -1 bytes
+ // read as an indication not to do any processing on the result
+ // (but not an error).
- if (self.secure && bytesRead == 0 && secureBytesRead > 0) {
- // Deal with SSL handshake
- if (self.server) {
- self._checkForSecureHandshake();
- } else {
- if (self.secureEstablised) {
- self.flush();
- } else {
- self._checkForSecureHandshake();
- }
- }
- } else if (bytesRead === 0) {
+ if (bytesRead === 0) {
self.readable = false;
self._readWatcher.stop();
// Queue of buffers and string that need to be written to socket.
self._writeQueue = [];
self._writeQueueEncoding = [];
+ self._writeQueueFD = [];
self._writeWatcher = ioWatchers.alloc();
self._writeWatcher.socket = self;
self.writable = false;
}
-function Stream (fd) {
+function Stream (fd, type) {
events.EventEmitter.call(this);
this.fd = null;
+ this.type = null;
this.secure = false;
if (parseInt(fd) >= 0) {
- this.open(fd);
+ this.open(fd, type);
+ } else {
+ setImplmentationMethods(this);
}
};
sys.inherits(Stream, events.EventEmitter);
}
this.secureStream = new SecureStream(this.credentials.context, this.server ? 1 : 0, this.credentials.shouldVerify ? 1 : 0);
+ setImplmentationMethods(this);
+
if (!this.server) {
// If client, trigger handshake
this._checkForSecureHandshake();
Stream.prototype._checkForSecureHandshake = function() {
+ if (!this.writable) {
+ return;
+ }
+
// Do an empty write to see if we need to write out as part of handshake
if (!emptyBuffer) allocEmptyBuffer();
this.write(emptyBuffer);
}
-Stream.prototype.open = function (fd) {
+Stream.prototype.open = function (fd, type) {
initStream(this);
this.fd = fd;
-
+ this.type = type || null;
this.readable = true;
+ setImplmentationMethods(this);
+
this._writeWatcher.set(this.fd, false, true);
this.writable = true;
}
// Returns true if all the data was flushed to socket. Returns false if
// something was queued. If data was queued, then the "drain" event will
// signal when it has been finally flushed to socket.
-Stream.prototype.write = function (data, encoding) {
+//
+// XXX: Caller cannot close the given fd until the stream has drained.
+Stream.prototype.write = function (data, encoding, fd) {
if (this._writeQueue && this._writeQueue.length) {
// Slow. There is already a write queue, so let's append to it.
if (this._writeQueueLast() === END_OF_FILE) {
this._writeQueue.push(data);
this._writeQueueEncoding.push(encoding);
}
+
+ if (fd != undefined) {
+ this._writeQueueFD.push(fd);
+ }
+
return false;
} else {
// Fast.
// The most common case. There is no write queue. Just push the data
// directly to the socket.
- return this._writeOut(data, encoding);
+ return this._writeOut(data, encoding, fd);
}
};
-
-Stream.prototype._shutdownSecure = function () {
- this.secureStream.shutdown();
- if (!securePool) allocNewSecurePool();
- var secureLen = this.secureStream.writeExtract(securePool, 0, securePool.length);
- try {
- var secureBytesWritten = write(this.fd, securePool, 0, secureLen);
- } catch (e) {
- }
-}
-
// Directly writes the data to socket.
//
// Steps:
// 2. Write data to socket. Return true if flushed.
// 3. Slice out remaining
// 4. Unshift remaining onto _writeQueue. Return false.
-Stream.prototype._writeOut = function (data, encoding) {
+Stream.prototype._writeOut = function (data, encoding, fd) {
if (!this.writable) {
- if (this.secure) return false;
- else throw new Error('Stream is not writable');
+ throw new Error('Stream is not writable');
}
- if (!this.secure && data.length == 0) return true;
var buffer, off, len;
var bytesWritten, charsWritten;
charsWritten = bytesWritten;
}
- if (encoding) assert(bytesWritten > 0);
+ if (encoding && data.length > 0) {
+ assert(bytesWritten > 0);
+ }
buffer = pool;
len = bytesWritten;
}
try {
- if (this.secure) {
- if (!buffer) return false;
- bytesWritten = this.secureStream.writeInject(buffer, off, len);
- if (!securePool) allocNewSecurePool();
- var secureLen = this.secureStream.writeExtract(securePool, 0, securePool.length);
- if (secureLen==-1) {
- // Check our read again for secure handshake
- this._readWatcher.callback();
- secureBytesWritten = 0;
- } else {
- var secureBytesWritten = write(this.fd, securePool, 0, secureLen);
- }
- if (!this.secureEstablished && this.secureStream.isInitFinished()) {
- this.secureEstablished = true;
- try {
- if (this._events && this._events['secure']) this.emit('secure');
- } catch (e) {
- this.destroy(e);
- return;
- }
- }
- } else {
- bytesWritten = write(this.fd, buffer, off, len);
- }
+ bytesWritten = this._writeImpl(buffer, off, len, fd, 0);
} catch (e) {
this.destroy(e);
return false;
this._writeQueue.unshift(leftOver);
this._writeQueueEncoding.unshift(null);
+ // If didn't successfully write any bytes, enqueue our fd and try again
+ if (!bytesWritten) {
+ this._writeQueueFD.unshift(fd);
+ }
+
return false;
}
while (this._writeQueue && this._writeQueue.length) {
var data = this._writeQueue.shift();
var encoding = this._writeQueueEncoding.shift();
+ var fd = this._writeQueueFD.shift();
if (data === END_OF_FILE) {
this._shutdown();
return true;
}
- var flushed = this._writeOut(data,encoding);
+ var flushed = this._writeOut(data,encoding,fd);
if (!flushed) return false;
}
if (this._writeWatcher) this._writeWatcher.stop();
// UNIX
self.fd = socket('unix');
self.type = 'unix';
+
+ setImplmentationMethods(this);
doConnect(self, arguments[0]);
}
};
// readable and writable
this.writable = false;
- if (this.secure) {
- this._shutdownSecure();
- }
try {
- shutdown(this.fd, 'write')
+ this._shutdownImpl();
} catch (e) {
this.destroy(e);
- return;
}
} else {
// writable but not readable
}
if (!peerInfo) return;
- var s = new Stream(peerInfo.fd);
+ var s = new Stream(peerInfo.fd, self.type);
s.remoteAddress = peerInfo.address;
s.remotePort = peerInfo.port;
s.type = self.type;
self.emit("close");
}
};
+
+// vim:ts=2 sw=2
String::New("Length is extends beyond buffer")));
}
- int received_fd;
-
struct iovec iov[1];
iov[0].iov_base = (char*)buffer->data() + off;
iov[0].iov_len = len;
// that the wrapper can pick up. Since we're single threaded, this is not
// a problem - just make sure to copy out that variable before the next
// call to recvmsg().
+ //
+ // XXX: Some implementations can send multiple file descriptors in a
+ // single message. We should be using CMSG_NXTHDR() to walk the
+ // chain to get at them all. This would require changing the
+ // API to hand these back up the caller, is a pain.
+
+ int received_fd = -1;
+ for (struct cmsghdr *cmsg = CMSG_FIRSTHDR(&msg);
+ msg.msg_controllen > 0 && cmsg != NULL;
+ cmsg = CMSG_NXTHDR(&msg, cmsg)) {
+ if (cmsg->cmsg_type == SCM_RIGHTS) {
+ if (received_fd != -1) {
+ fprintf(stderr, "ignoring extra FD received: %d\n", received_fd);
+ }
- struct cmsghdr *cmsg = CMSG_FIRSTHDR(&msg);
- if (cmsg && cmsg->cmsg_type == SCM_RIGHTS) {
- received_fd = *(int *) CMSG_DATA(cmsg);
- recv_msg_template->GetFunction()->Set(fd_symbol, Integer::New(received_fd));
- } else {
- recv_msg_template->GetFunction()->Set(fd_symbol, Null());
+ received_fd = *(int *) CMSG_DATA(cmsg);
+ } else {
+ fprintf(stderr, "ignoring non-SCM_RIGHTS ancillary data: %d\n",
+ cmsg->cmsg_type
+ );
+ }
}
+ recv_msg_template->GetFunction()->Set(
+ fd_symbol,
+ (received_fd != -1) ?
+ Integer::New(received_fd) :
+ Null()
+ );
+
return scope.Close(Integer::New(bytes_read));
}
}
-// var bytesWritten = t.sendFD(self.fd)
-// returns null on EAGAIN or EINTR, raises an exception on all other errors
-static Handle<Value> SendFD(const Arguments& args) {
+// var bytes = sendmsg(fd, buf, off, len, fd, flags);
+//
+// Write a buffer with optional offset and length to the given file
+// descriptor. Note that we refuse to send 0 bytes.
+//
+// The 'fd' parameter is a numerical file descriptor, or the undefined value
+// to send none.
+//
+// The 'flags' parameter is a number representing a bitmask of MSG_* values.
+// This is passed directly to sendmsg().
+//
+// Returns null on EAGAIN or EINTR, raises an exception on all other errors
+static Handle<Value> SendMsg(const Arguments& args) {
HandleScope scope;
+ struct iovec iov;
+
if (args.Length() < 2) {
return ThrowException(Exception::TypeError(
String::New("Takes 2 parameters")));
}
+ // The first argument should be a file descriptor
FD_ARG(args[0])
- // TODO: make sure fd is a unix domain socket?
-
- if (!args[1]->IsInt32()) {
+ // Grab the actul data to be written, stuffing it into iov
+ if (!Buffer::HasInstance(args[1])) {
return ThrowException(Exception::TypeError(
- String::New("FD to send is not an integer")));
+ String::New("Expected either a string or a buffer")));
}
- int fd_to_send = args[1]->Int32Value();
+ Buffer *buf = ObjectWrap::Unwrap<Buffer>(args[1]->ToObject());
+
+ size_t offset = 0;
+ if (args.Length() >= 3 && !args[2]->IsUndefined()) {
+ if (!args[2]->IsUint32()) {
+ return ThrowException(Exception::TypeError(
+ String::New("Expected unsigned integer for offset")));
+ }
+
+ offset = args[2]->Uint32Value();
+ if (offset >= buf->length()) {
+ return ThrowException(Exception::Error(
+ String::New("Offset into buffer too large")));
+ }
+ }
+
+ size_t length = buf->length() - offset;
+ if (args.Length() >= 4 && !args[3]->IsUndefined()) {
+ if (!args[3]->IsUint32()) {
+ return ThrowException(Exception::TypeError(
+ String::New("Expected unsigned integer for length")));
+ }
+
+ length = args[3]->Uint32Value();
+ if (offset + length > buf->length()) {
+ return ThrowException(Exception::Error(
+ String::New("offset + length beyond buffer length")));
+ }
+ }
+
+ iov.iov_base = buf->data() + offset;
+ iov.iov_len = length;
+
+ int fd_to_send = -1;
+ if (args.Length() >= 5 && !args[4]->IsUndefined()) {
+ if (!args[4]->IsUint32()) {
+ return ThrowException(Exception::TypeError(
+ String::New("Expected unsigned integer for a file descriptor")));
+ }
+
+ fd_to_send = args[4]->Uint32Value();
+ }
+
+ int flags = 0;
+ if (args.Length() >= 6 && !args[5]->IsUndefined()) {
+ if (!args[5]->IsUint32()) {
+ return ThrowException(Exception::TypeError(
+ String::New("Expected unsigned integer for a flags argument")));
+ }
+
+ flags = args[5]->Uint32Value();
+ }
struct msghdr msg;
- struct iovec iov[1];
- char control_msg[CMSG_SPACE(sizeof(fd_to_send))];
- struct cmsghdr *cmsg;
- static char dummy = 'd'; // Need to send at least a byte of data in the message
+ char scratch[64];
- iov[0].iov_base = &dummy;
- iov[0].iov_len = 1;
- msg.msg_iov = iov;
+ msg.msg_iov = &iov;
msg.msg_iovlen = 1;
msg.msg_name = NULL;
msg.msg_namelen = 0;
msg.msg_flags = 0;
- msg.msg_control = (void *) control_msg;
- msg.msg_controllen = CMSG_LEN(sizeof(fd_to_send));
- cmsg = CMSG_FIRSTHDR(&msg);
- cmsg->cmsg_level = SOL_SOCKET;
- cmsg->cmsg_type = SCM_RIGHTS;
- cmsg->cmsg_len = msg.msg_controllen;
- *(int*) CMSG_DATA(cmsg) = fd_to_send;
+ msg.msg_control = NULL;
+ msg.msg_controllen = 0;
+
+ if (fd_to_send >= 0) {
+ struct cmsghdr *cmsg;
+
+ msg.msg_control = (void *) scratch;
+ msg.msg_controllen = CMSG_LEN(sizeof(fd_to_send));
+
+ cmsg = CMSG_FIRSTHDR(&msg);
+ cmsg->cmsg_level = SOL_SOCKET;
+ cmsg->cmsg_type = SCM_RIGHTS;
+ cmsg->cmsg_len = msg.msg_controllen;
+ *(int*) CMSG_DATA(cmsg) = fd_to_send;
+ }
- ssize_t written = sendmsg(fd, &msg, 0);
+ ssize_t written = sendmsg(fd, &msg, flags);
if (written < 0) {
if (errno == EAGAIN || errno == EINTR) return Null();
NODE_SET_METHOD(target, "write", Write);
NODE_SET_METHOD(target, "read", Read);
- NODE_SET_METHOD(target, "sendFD", SendFD);
+ NODE_SET_METHOD(target, "sendMsg", SendMsg);
recv_msg_template =
Persistent<FunctionTemplate>::New(FunctionTemplate::New(RecvMsg));
--- /dev/null
+// Test sending and receiving a file descriptor.
+//
+// This test is pretty complex. It ends up spawning test/fixtures/recvfd.js
+// as a child to test desired behavior. What happens is
+//
+// 1. Create an in-memory pipe via pipe(2). These two file descriptors
+// are not visible to any other process, and so make a good test-case
+// for sharing.
+// 2. Create a a UNIX socket at SOCK_PATH. When a client connects to this
+// path, they are sent the write end of the pipe from above.
+// 3. The client is sent n JSON representations of the DATA variable, each
+// with a different ordinal. We send these delimited by '\n' strings
+// so that the receiving end can avoid any coalescing that hapepns
+// due to the stream nature of the socket (e.g. '{}{}' is not a valid
+// JSON string).
+// 4. The child process receives file descriptors and JSON blobs and,
+// whenever it has at least one of each, writes a modified JSON blob
+// to the FD. The blob is modified to include the child's process ID.
+// 5. Once the child process has sent n responses, it closes the write end
+// of the pipe, which signals to the parent that there is no more data
+// coming.
+// 6. The parent listens to the read end of the pipe, accumulating JSON
+// blobs (again, delimited by '\n') and verifying that a) the 'pid'
+// attribute belongs to the child and b) the 'ord' field has not been
+// seen in a response yet. This is intended to ensure that all blobs
+// sent out have been relayed back to us.
+
+require('../common');
+
+var buffer = require('buffer');
+var child_process = require('child_process');
+var fs = require('fs');
+var net = require('net');
+var netBinding = process.binding('net');
+var path = require('path');
+var sys = require('sys');
+
+var DATA = {
+ 'ppid' : process.pid,
+ 'ord' : 0
+};
+
+var SOCK_PATH = path.join(
+ __dirname,
+ '..',
+ path.basename(__filename, '.js') + '.sock'
+);
+
+var logChild = function(d) {
+ if (typeof d == 'object') {
+ d = d.toString();
+ }
+
+ d.split('\n').forEach(function(l) {
+ if (l.length > 0) {
+ sys.debug('CHILD: ' + l);
+ }
+ });
+};
+
+// Create a pipe
+//
+// We establish a listener on the read end of the pipe so that we can
+// validate any data sent back by the child. We send the write end of the
+// pipe to the child and close it off in our process.
+var pipeFDs = netBinding.pipe();
+assert.equal(pipeFDs.length, 2);
+
+var seenOrdinals = [];
+
+var pipeReadStream = new net.Stream();
+pipeReadStream.addListener('data', function(data) {
+ data.toString('utf8').trim().split('\n').forEach(function(d) {
+ var rd = JSON.parse(d);
+
+ assert.equal(rd.pid, cpp);
+ assert.equal(seenOrdinals.indexOf(rd.ord), -1);
+
+ seenOrdinals.unshift(rd.ord);
+ });
+});
+pipeReadStream.open(pipeFDs[0]);
+pipeReadStream.resume();
+
+// Create a UNIX socket at SOCK_PATH and send DATA and the write end
+// of the pipe to whoever connects.
+//
+// We send two messages here, both with the same pipe FD: one string, and
+// one buffer. We want to make sure that both datatypes are handled
+// correctly.
+var srv = net.createServer(function(s) {
+ var str = JSON.stringify(DATA) + '\n';
+
+ DATA.ord = DATA.ord + 1;
+ var buf = new buffer.Buffer(str.length);
+ buf.write(JSON.stringify(DATA) + '\n', 'utf8');
+
+ s.write(str, 'utf8', pipeFDs[1]);
+ if (s.write(buf, undefined, pipeFDs[1])) {
+ netBinding.close(pipeFDs[1]);
+ } else {
+ s.addListener('drain', function() {
+ netBinding.close(pipeFDs[1]);
+ });
+ }
+});
+srv.listen(SOCK_PATH);
+
+// Spawn a child running test/fixtures/recvfd.js
+var cp = child_process.spawn(process.argv[0],
+ [path.join(fixturesDir, 'recvfd.js'), SOCK_PATH]);
+
+cp.stdout.addListener('data', logChild);
+cp.stderr.addListener('data', logChild);
+
+// When the child exits, clean up and validate its exit status
+var cpp = cp.pid;
+cp.addListener('exit', function(code, signal) {
+ srv.close();
+ // fs.unlinkSync(SOCK_PATH);
+
+ assert.equal(code, 0);
+ assert.equal(seenOrdinals.length, 2);
+});
+
+// vim:ts=2 sw=2 et