child_process.fork() support sending native hander object, this patch add support for sending
net.Server and net.Socket object by converting the object to a native handle object and back
to a useful object again.
Note when sending a Socket there was emitted by a net Server object, the server.connections
property becomes null, because it is no longer possible to known when it is destroyed.
### Event: 'message'
* `message` {Object} a parsed JSON object or primitive value
-* `sendHandle` {Handle object} a handle object
+* `sendHandle` {Handle object} a Socket or Server object
Messages send by `.send(message, [sendHandle])` are obtained using the
`message` event.
* `message` {Object}
* `sendHandle` {Handle object}
-When useing `child_process.fork()` an you can write to the child using
+When using `child_process.fork()` you can write to the child using
`child.send(message, [sendHandle])` and messages are received by
a `'message'` event on the child.
Messages containing the prefix are emitted in the `internalMessage` event, you
should by all means avoid using this feature, it is subject to change without notice.
-The `sendHandle` option to `child.send()` is for sending a handle object to
-another process. The child will receive the object as its second argument to
-the `message` event.
+The `sendHandle` option to `child.send()` is for sending a TCP server or
+socket object to another process. The child will receive the object as its
+second argument to the `message` event.
+
+**send server object**
+
+Here is an example of sending a server:
+
+ var child = require('child_process').fork('child.js');
+
+ // Open up the server object and send the handle.
+ var server = require('net').createServer();
+ server.on('connection', function (socket) {
+ socket.end('handled by parent');
+ });
+ server.listen(1337, function() {
+ child.send('server', server);
+ });
+
+And the child would the recive the server object as:
+
+ process.on('message', function(m, server) {
+ if (m === 'server') {
+ server.on('connection', function (socket) {
+ socket.end('handled by child');
+ });
+ }
+ });
+
+Note that the server is now shared between the parent and child, this means
+that some connections will be handled by the parent and some by the child.
+
+**send socket object**
+
+Here is an example of sending a socket. It will spawn two childs and handle
+connections with the remote address `74.125.127.100` as VIP by sending the
+socket to a "special" child process. Other sockets will go to a "normal" process.
+
+ var normal = require('child_process').fork('child.js', ['normal']);
+ var special = require('child_process').fork('child.js', ['special']);
+
+ // Open up the server and send sockets to child
+ var server = require('net').createServer();
+ server.on('connection', function (socket) {
+
+ // if this is a VIP
+ if (socket.remoteAddress === '74.125.127.100') {
+ special.send('socket', socket);
+ return;
+ }
+ // just the usual dudes
+ normal.send('socket', socket);
+ });
+ server.listen(1337);
+
+The `child.js` could look like this:
+
+ process.on('message', function(m, socket) {
+ if (m === 'socket') {
+ socket.end('You where handled as a ' + process.argv[2] + ' person');
+ }
+ });
+
+Note that once a single socket has been sent to a child the parent can no
+longer keep track of when the socket is destroyed. To indicate this condition
+the `.connections` property becomes `null`.
+It is also recomended not to use `.maxConnections` in this condition.
### child.disconnect()
This is a special case of the `spawn()` functionality for spawning Node
processes. In addition to having all the methods in a normal ChildProcess
-instance, the returned object has a communication channel built-in. Se
+instance, the returned object has a communication channel built-in. See
`child.send(message, [sendHandle])` for details.
By default the spawned Node process will have the stdout, stderr associated
Set this property to reject connections when the server's connection count gets
high.
+It is not recommended to use this option once a socket has been sent to a child
+with `child_process.fork()`.
+
### server.connections
The number of concurrent connections on the server.
+This becomes `null` when sending a socket to a child with `child_process.fork()`.
`net.Server` is an `EventEmitter` with the following events:
}
+// this object contain function to convert TCP objects to native handle objects
+// and back again.
+var handleConversion = {
+ 'net.Native': {
+ simultaneousAccepts: true,
+
+ send: function(message, handle) {
+ return handle;
+ },
+
+ got: function(message, handle, emit) {
+ emit(handle);
+ }
+ },
+
+ 'net.Server': {
+ simultaneousAccepts: true,
+
+ send: function(message, server) {
+ return server._handle;
+ },
+
+ got: function(message, handle, emit) {
+ var self = this;
+
+ var server = new net.Server();
+ server.listen(handle, function() {
+ emit(server);
+ });
+ }
+ },
+
+ 'net.Socket': {
+ send: function(message, socket) {
+ // pause socket so no data is lost, will be resumed later
+ socket.pause();
+
+ // if the socket wsa created by net.Server
+ if (socket.server) {
+ // the slave should keep track of the socket
+ message.key = socket.server._connectionKey;
+
+ var firstTime = !this._channel.sockets.send[message.key];
+
+ // add socket to connections list
+ var socketList = getSocketList('send', this, message.key);
+ socketList.add(socket);
+
+ // the server should no longer expose a .connection property
+ // and when asked to close it should query the socket status from slaves
+ if (firstTime) {
+ socket.server._setupSlave(socketList);
+ }
+ }
+
+ // remove handle from socket object, it will be closed when the socket
+ // has been send
+ var handle = socket._handle;
+ handle.onread = function() {};
+ socket._handle = null;
+
+ return handle;
+ },
+
+ got: function(message, handle, emit) {
+ var socket = new net.Socket({handle: handle});
+ socket.readable = socket.writable = true;
+ socket.pause();
+
+ // if the socket was created by net.Server we will track the socket
+ if (message.key) {
+
+ // add socket to connections list
+ var socketList = getSocketList('got', this, message.key);
+ socketList.add(socket);
+ }
+
+ emit(socket);
+ socket.resume();
+ }
+ }
+};
+
+// This object keep track of the socket there are sended
+function SocketListSend(slave, key) {
+ var self = this;
+
+ this.key = key;
+ this.list = [];
+ this.slave = slave;
+
+ slave.once('disconnect', function() {
+ self.flush();
+ });
+
+ this.slave.on('internalMessage', function(msg) {
+ if (msg.cmd !== 'NODE_SOCKET_CLOSED' || msg.key !== self.key) return;
+ self.flush();
+ });
+}
+util.inherits(SocketListSend, EventEmitter);
+
+SocketListSend.prototype.add = function(socket) {
+ this.list.push(socket);
+};
+
+SocketListSend.prototype.flush = function() {
+ var list = this.list;
+ this.list = [];
+
+ list.forEach(function(socket) {
+ socket.destroy();
+ });
+};
+
+SocketListSend.prototype.update = function() {
+ if (this.slave.connected === false) return;
+
+ this.slave.send({
+ cmd: 'NODE_SOCKET_FETCH',
+ key: this.key
+ });
+};
+
+// This object keep track of the socket there are received
+function SocketListReceive(slave, key) {
+ var self = this;
+
+ this.key = key;
+ this.list = [];
+ this.slave = slave;
+
+ slave.on('internalMessage', function(msg) {
+ if (msg.cmd !== 'NODE_SOCKET_FETCH' || msg.key !== self.key) return;
+
+ if (self.list.length === 0) {
+ self.flush();
+ return;
+ }
+
+ self.on('itemRemoved', function removeMe() {
+ if (self.list.length !== 0) return;
+ self.removeListener('itemRemoved', removeMe);
+ self.flush();
+ });
+ });
+}
+util.inherits(SocketListReceive, EventEmitter);
+
+SocketListReceive.prototype.flush = function() {
+ this.list = [];
+
+ if (this.slave.connected) {
+ this.slave.send({
+ cmd: 'NODE_SOCKET_CLOSED',
+ key: this.key
+ });
+ }
+};
+
+SocketListReceive.prototype.add = function(socket) {
+ var self = this;
+ this.list.push(socket);
+
+ socket.on('close', function() {
+ self.list.splice(self.list.indexOf(socket), 1);
+ self.emit('itemRemoved');
+ });
+};
+
+function getSocketList(type, slave, key) {
+ var sockets = slave._channel.sockets[type];
+ var socketList = sockets[key];
+ if (!socketList) {
+ var Construct = type === 'send' ? SocketListSend : SocketListReceive;
+ socketList = sockets[key] = new Construct(slave, key);
+ }
+ return socketList;
+}
+
+function handleMessage(target, message, handle) {
+ //Filter out internal messages
+ //if cmd property begin with "_NODE"
+ if (message !== null &&
+ typeof message === 'object' &&
+ typeof message.cmd === 'string' &&
+ message.cmd.indexOf('NODE_') === 0) {
+ target.emit('internalMessage', message, handle);
+ }
+ //Non-internal message
+ else {
+ target.emit('message', message, handle);
+ }
+}
+
function setupChannel(target, channel) {
target._channel = channel;
var jsonBuffer = '';
channel.buffering = false;
channel.onread = function(pool, offset, length, recvHandle) {
- // Update simultaneous accepts on Windows
- net._setSimultaneousAccepts(recvHandle);
-
if (pool) {
jsonBuffer += pool.toString('ascii', offset, offset + length);
var json = jsonBuffer.slice(start, i);
var message = JSON.parse(json);
- //Filter out internal messages
- //if cmd property begin with "_NODE"
- if (message !== null &&
- typeof message === 'object' &&
- typeof message.cmd === 'string' &&
- message.cmd.indexOf('NODE_') === 0) {
- target.emit('internalMessage', message, recvHandle);
- }
- //Non-internal message
- else {
- target.emit('message', message, recvHandle);
- }
+ handleMessage(target, message, recvHandle);
start = i + 1;
}
}
};
- target.send = function(message, sendHandle) {
+ // object where socket lists will live
+ channel.sockets = { got: {}, send: {} };
+
+ // handlers will go through this
+ target.on('internalMessage', function(message, handle) {
+ if (message.cmd !== 'NODE_HANDLE') return;
+
+ var obj = handleConversion[message.type];
+
+ // Update simultaneous accepts on Windows
+ if (obj.simultaneousAccepts) {
+ net._setSimultaneousAccepts(handle);
+ }
+
+ // Convert handle object
+ obj.got.call(this, message, handle, function(handle) {
+ handleMessage(target, message.msg, handle);
+ });
+ });
+
+ target.send = function(message, handle) {
if (typeof message === 'undefined') {
throw new TypeError('message cannot be undefined');
}
return false;
}
- var string = JSON.stringify(message) + '\n';
+ // package messages with a handle object
+ if (handle) {
+ // this message will be handled by an internalMessage event handler
+ message = {
+ cmd: 'NODE_HANDLE',
+ type: 'net.',
+ msg: message
+ };
+
+ switch (handle.constructor.name) {
+ case 'Socket':
+ message.type += 'Socket'; break;
+ case 'Server':
+ message.type += 'Server'; break;
+ case 'Pipe':
+ case 'TCP':
+ message.type += 'Native'; break;
+ }
- // Update simultaneous accepts on Windows
- net._setSimultaneousAccepts(sendHandle);
+ var obj = handleConversion[message.type];
+
+ // convert TCP object to native handle object
+ handle = handleConversion[message.type].send.apply(target, arguments);
- var writeReq = channel.writeUtf8String(string, sendHandle);
+ // Update simultaneous accepts on Windows
+ if (obj.simultaneousAccepts) {
+ net._setSimultaneousAccepts(handle);
+ }
+ }
+
+ var string = JSON.stringify(message) + '\n';
+ var writeReq = channel.writeUtf8String(string, handle);
+
+ // Close the Socket handle after sending it
+ if (message && message.type === 'net.Socket') {
+ handle.close();
+ }
if (!writeReq) {
var er = errnoException(errno, 'write', 'cannot write to IPC channel.');
timers.unenroll(this);
if (this.server) {
- this.server.connections--;
+ this.server._connections--;
this.server._emitCloseIfDrained();
}
}
}
- this.connections = 0;
+ this._connections = 0;
+
+ // when server is using slaves .connections is not reliable
+ // so null will be return if thats the case
+ Object.defineProperty(this, 'connections', {
+ get: function() {
+ if (self._usingSlaves) {
+ return null;
+ }
+ return self._connections;
+ },
+ set: function(val) {
+ return (self._connections = val);
+ },
+ configurable: true, enumerable: true
+ });
+
this.allowHalfOpen = options.allowHalfOpen || false;
this._handle = null;
return;
}
+ // generate connection key, this should be unique to the connection
+ this._connectionKey = addressType + ':' + address + ':' + port;
+
process.nextTick(function() {
self.emit('listening');
});
return;
}
- if (self.maxConnections && self.connections >= self.maxConnections) {
+ if (self.maxConnections && self._connections >= self.maxConnections) {
clientHandle.close();
return;
}
socket.resume();
- self.connections++;
+ self._connections++;
socket.server = self;
DTRACE_NET_SERVER_CONNECTION(socket);
this._handle = null;
this._emitCloseIfDrained();
+ // fetch new socket lists
+ if (this._usingSlaves) {
+ this._slaves.forEach(function(socketList) {
+ if (socketList.list.length === 0) return;
+ socketList.update();
+ });
+ }
+
return this;
};
Server.prototype._emitCloseIfDrained = function() {
var self = this;
- if (self._handle || self.connections) return;
+ if (self._handle || self._connections) return;
process.nextTick(function() {
self.emit('close');
throw new Error('This API is no longer supported. See child_process.fork');
};
+// when sending a socket using fork IPC this function is executed
+Server.prototype._setupSlave = function(socketList) {
+ if (!this._usingSlaves) {
+ this._usingSlaves = true;
+ this._slaves = [];
+ }
+ this._slaves.push(socketList);
+};
+
// TODO: isIP should be moved to the DNS code. Putting it here now because
// this is what the legacy system did.
--- /dev/null
+// Copyright Joyent, Inc. and other Node contributors.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files (the
+// "Software"), to deal in the Software without restriction, including
+// without limitation the rights to use, copy, modify, merge, publish,
+// distribute, sublicense, and/or sell copies of the Software, and to permit
+// persons to whom the Software is furnished to do so, subject to the
+// following conditions:
+//
+// The above copyright notice and this permission notice shall be included
+// in all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
+// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
+// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
+// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
+// USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+var assert = require('assert');
+var common = require('../common');
+var fork = require('child_process').fork;
+var net = require('net');
+
+// progress tracker
+function ProgressTracker(missing, callback) {
+ this.missing = missing;
+ this.callback = callback;
+}
+ProgressTracker.prototype.done = function() {
+ this.missing -= 1;
+ this.check();
+};
+ProgressTracker.prototype.check = function() {
+ if (this.missing === 0) this.callback();
+};
+
+if (process.argv[2] === 'child') {
+
+ var serverScope;
+
+ process.on('message', function onServer(msg, server) {
+ if (msg.what !== 'server') return;
+ process.removeListener('message', onServer);
+
+ serverScope = server;
+
+ server.on('connection', function(socket) {
+ console.log('CHILD: got connection');
+ process.send({what: 'connection'});
+ socket.destroy();
+ });
+
+ // start making connection from parent
+ console.log('CHILD: server listening');
+ process.send({what: 'listening'});
+ });
+
+ process.on('message', function onClose(msg) {
+ if (msg.what !== 'close') return;
+ process.removeListener('message', onClose);
+
+ serverScope.on('close', function() {
+ process.send({what: 'close'});
+ });
+ serverScope.close();
+ });
+
+ process.on('message', function onSocket(msg, socket) {
+ if (msg.what !== 'socket') return;
+ process.removeListener('message', onSocket);
+ socket.end('echo');
+ console.log('CHILD: got socket');
+ });
+
+ process.send({what: 'ready'});
+} else {
+
+ var child = fork(process.argv[1], ['child']);
+
+ child.on('exit', function() {
+ console.log('CHILD: died');
+ });
+
+ // send net.Server to child and test by connecting
+ var testServer = function(callback) {
+
+ // destroy server execute callback when done
+ var progress = new ProgressTracker(2, function() {
+ server.on('close', function() {
+ console.log('PARENT: server closed');
+ child.send({what: 'close'});
+ });
+ server.close();
+ });
+
+ // we expect 10 connections and close events
+ var connections = new ProgressTracker(10, progress.done.bind(progress));
+ var closed = new ProgressTracker(10, progress.done.bind(progress));
+
+ // create server and send it to child
+ var server = net.createServer();
+ server.on('connection', function(socket) {
+ console.log('PARENT: got connection');
+ socket.destroy();
+ connections.done();
+ });
+ server.on('listening', function() {
+ console.log('PARENT: server listening');
+ child.send({what: 'server'}, server);
+ });
+ server.listen(common.PORT);
+
+ // handle client messages
+ var messageHandlers = function(msg) {
+
+ if (msg.what === 'listening') {
+ // make connections
+ var socket;
+ for (var i = 0; i < 10; i++) {
+ socket = net.connect(common.PORT, function() {
+ console.log('CLIENT: connected');
+ });
+ socket.on('close', function() {
+ closed.done();
+ console.log('CLIENT: closed');
+ });
+ }
+
+ } else if (msg.what === 'connection') {
+ // child got connection
+ connections.done();
+ } else if (msg.what === 'close') {
+ child.removeListener('message', messageHandlers);
+ callback();
+ }
+ };
+
+ child.on('message', messageHandlers);
+ };
+
+ // send net.Socket to child
+ var testSocket = function(callback) {
+
+ // create a new server and connect to it,
+ // but the socket will be handled by the child
+ var server = net.createServer();
+ server.on('connection', function(socket) {
+ socket.on('close', function() {
+ console.log('CLIENT: socket closed');
+ });
+ child.send({what: 'socket'}, socket);
+ });
+ server.on('close', function() {
+ console.log('PARENT: server closed');
+ callback();
+ });
+ server.listen(common.PORT, function() {
+ var connect = net.connect(common.PORT);
+ var store = '';
+ connect.on('data', function(chunk) {
+ store += chunk;
+ console.log('CLIENT: got data');
+ });
+ connect.on('close', function() {
+ console.log('CLIENT: closed');
+ assert.equal(store, 'echo');
+ server.close();
+ });
+ });
+ };
+
+ // create server and send it to child
+ var serverSucess = false;
+ var socketSucess = false;
+ child.on('message', function onReady(msg) {
+ if (msg.what !== 'ready') return;
+ child.removeListener('message', onReady);
+
+ testServer(function() {
+ serverSucess = true;
+
+ testSocket(function() {
+ socketSucess = true;
+ child.kill();
+ });
+ });
+
+ });
+
+ process.on('exit', function() {
+ assert.ok(serverSucess);
+ assert.ok(socketSucess);
+ });
+
+}
--- /dev/null
+// Copyright Joyent, Inc. and other Node contributors.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files (the
+// "Software"), to deal in the Software without restriction, including
+// without limitation the rights to use, copy, modify, merge, publish,
+// distribute, sublicense, and/or sell copies of the Software, and to permit
+// persons to whom the Software is furnished to do so, subject to the
+// following conditions:
+//
+// The above copyright notice and this permission notice shall be included
+// in all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
+// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
+// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
+// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
+// USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+var assert = require('assert');
+var common = require('../common');
+var fork = require('child_process').fork;
+var net = require('net');
+
+if (process.argv[2] === 'child') {
+
+ var endMe = null;
+
+ process.on('message', function(m, socket) {
+ if (!socket) return;
+
+ // will call .end('end') or .write('write');
+ socket[m](m);
+
+ // store the unfinished socket
+ if (m === 'write') {
+ endMe = socket;
+ }
+ });
+
+ process.on('message', function(m) {
+ if (m !== 'close') return;
+ endMe.end('end');
+ endMe = null;
+ });
+
+ process.on('disconnect', function() {
+ endMe.end('end');
+ endMe = null;
+ });
+
+} else {
+
+ var child1 = fork(process.argv[1], ['child']);
+ var child2 = fork(process.argv[1], ['child']);
+ var child3 = fork(process.argv[1], ['child']);
+
+ var server = net.createServer();
+
+ var connected = 0;
+ server.on('connection', function(socket) {
+ switch (connected) {
+ case 0:
+ child1.send('end', socket); break;
+ case 1:
+ child1.send('write', socket); break;
+ case 2:
+ child2.send('end', socket); break;
+ case 3:
+ child2.send('write', socket); break;
+ case 4:
+ child3.send('end', socket); break;
+ case 5:
+ child3.send('write', socket); break;
+ }
+ connected += 1;
+
+ if (connected === 6) {
+ closeServer();
+ }
+ });
+
+ var disconnected = 0;
+ server.on('listening', function() {
+
+ var j = 6, client;
+ while (j--) {
+ client = net.connect(common.PORT, '127.0.0.1');
+ client.on('close', function() {
+ disconnected += 1;
+ });
+ }
+ });
+
+ var closeEmitted = false;
+ server.on('close', function() {
+ closeEmitted = true;
+
+ child1.kill();
+ child2.kill();
+ child3.kill();
+ });
+
+ server.listen(common.PORT, '127.0.0.1');
+
+ var timeElasped = 0;
+ var closeServer = function() {
+ var startTime = Date.now();
+ server.on('close', function() {
+ timeElasped = Date.now() - startTime;
+ });
+
+ server.close();
+
+ setTimeout(function() {
+ child1.send('close');
+ child2.send('close');
+ child3.disconnect();
+ }, 200);
+ };
+
+ process.on('exit', function() {
+ assert.equal(disconnected, 6);
+ assert.equal(connected, 6);
+ assert.ok(closeEmitted);
+ assert.ok(timeElasped >= 190 && timeElasped <= 1000,
+ 'timeElasped was not between 190 and 1000 ms');
+ });
+}