See `kill(2)`
-### child.send(message, [sendHandle], [options])
+### child.send(message, [sendHandle])
* `message` {Object}
* `sendHandle` {Handle object}
-* `options` {Object}
When using `child_process.fork()` you can write to the child using
`child.send(message, [sendHandle])` and messages are received by
socket object to another process. The child will receive the object as its
second argument to the `message` event.
-The `options` object may have the following properties:
-
- * `track` - Notify master process when `sendHandle` will be closed in child
- process. (`false` by default)
-
**send server object**
Here is an example of sending a server:
The number of concurrent connections on the server.
-This becomes `null` when sending a socket to a child with
-`child_process.fork()`. To poll forks and get current number of active
-connections use asynchronous `server.getConnections` instead.
+This becomes `null` when sending a socket to a child with `child_process.fork()`.
`net.Server` is an [EventEmitter][] with the following events:
-### server.getConnections(callback)
-
-Asynchronously get the number of concurrent connections on the server. Works
-when sockets were sent to forks.
-
-Callback should take two arguments `err` and `count`.
-
### Event: 'listening'
Emitted when the server has been bound after calling `server.listen`.
},
'net.Socket': {
- send: function(message, socket, options) {
- // if the socket was created by net.Server
+ send: function(message, socket) {
+ // pause socket so no data is lost, will be resumed later
+
+ // if the socket wsa created by net.Server
if (socket.server) {
// the slave should keep track of the socket
message.key = socket.server._connectionKey;
var firstTime = !this._channel.sockets.send[message.key];
+
+ // add socket to connections list
var socketList = getSocketList('send', this, message.key);
+ socketList.add(socket);
- if (options && options.track) {
- // Keep track of socket's status
- message.id = socketList.add(socket);
- } else {
- // the server should no longer expose a .connection property
- // and when asked to close it should query the socket status from
- // the slaves
- if (firstTime) socket.server._setupSlave(socketList);
-
- // Act like socket is detached
- socket.server._connections--;
+ // the server should no longer expose a .connection property
+ // and when asked to close it should query the socket status from slaves
+ if (firstTime) {
+ socket.server._setupSlave(socketList);
}
}
// remove handle from socket object, it will be closed when the socket
- // will be sent
+ // has been send
var handle = socket._handle;
handle.onread = function() {};
socket._handle = null;
return handle;
},
- postSend: function(handle) {
- // Close the Socket handle after sending it
- handle.close();
- },
-
got: function(message, handle, emit) {
var socket = new net.Socket({handle: handle});
socket.readable = socket.writable = true;
// add socket to connections list
var socketList = getSocketList('got', this, message.key);
- socketList.add({
- id: message.id,
- socket: socket
- });
+ socketList.add(socket);
}
emit(socket);
var self = this;
this.key = key;
- this.slave = slave;
-
- // These two arrays are used to store the list of sockets and the freelist of
- // indexes in this list. After insertion, item will have persistent index
- // until it'll be removed. This way we can use this index as an identifier for
- // sockets.
this.list = [];
- this.freelist = [];
+ this.slave = slave;
slave.once('disconnect', function() {
self.flush();
this.slave.on('internalMessage', function(msg) {
if (msg.cmd !== 'NODE_SOCKET_CLOSED' || msg.key !== self.key) return;
- self.remove(msg.id);
+ self.flush();
});
}
util.inherits(SocketListSend, EventEmitter);
SocketListSend.prototype.add = function(socket) {
- var index;
-
- // Pick one of free indexes, or insert in the end of the list
- if (this.freelist.length > 0) {
- index = this.freelist.pop();
- this.list[index] = socket;
- } else {
- index = this.list.push(socket) - 1;
- }
-
- return index;
-};
-
-SocketListSend.prototype.remove = function(index) {
- var socket = this.list[index];
- if (!socket) return;
-
- // Create a hole in the list and move index to the freelist
- this.list[index] = null;
- this.freelist.push(index);
-
- socket.destroy();
+ this.list.push(socket);
};
SocketListSend.prototype.flush = function() {
var list = this.list;
this.list = [];
- this.freelist = [];
list.forEach(function(socket) {
- if (socket) socket.destroy();
+ socket.destroy();
});
};
-SocketListSend.prototype._request = function request(msg, cmd, callback) {
- var self = this;
-
- if (!this.slave.connected) return onslaveclose();
- this.slave.send(msg);
-
- function onclose() {
- self.slave.removeListener('internalMessage', onreply);
- callback(new Error('Slave closed before reply'));
- };
-
- function onreply(msg) {
- if (msg.cmd !== cmd || msg.key !== self.key) return;
- self.slave.removeListener('disconnect', onclose);
- self.slave.removeListener('internalMessage', onreply);
-
- callback(null, msg);
- };
-
- this.slave.once('disconnect', onclose);
- this.slave.on('internalMessage', onreply);
-};
-
-SocketListSend.prototype.close = function close(callback) {
- this._request({
- cmd: 'NODE_SOCKET_NOTIFY_CLOSE',
- key: this.key
- }, 'NODE_SOCKET_ALL_CLOSED', callback);
-};
+SocketListSend.prototype.update = function() {
+ if (this.slave.connected === false) return;
-SocketListSend.prototype.getConnections = function getConnections(callback) {
- this._request({
- cmd: 'NODE_SOCKET_GET_COUNT',
+ this.slave.send({
+ cmd: 'NODE_SOCKET_FETCH',
key: this.key
- }, 'NODE_SOCKET_COUNT', function(err, msg) {
- if (err) return callback(err);
- callback(null, msg.count);
});
};
var self = this;
- this.connections = 0;
this.key = key;
+ this.list = [];
this.slave = slave;
- function onempty() {
- if (!self.slave.connected) return;
-
- self.slave.send({
- cmd: 'NODE_SOCKET_ALL_CLOSED',
- key: self.key
- });
- }
+ slave.on('internalMessage', function(msg) {
+ if (msg.cmd !== 'NODE_SOCKET_FETCH' || msg.key !== self.key) return;
- this.slave.on('internalMessage', function(msg) {
- if (msg.key !== self.key) return;
-
- if (msg.cmd === 'NODE_SOCKET_NOTIFY_CLOSE') {
- // Already empty
- if (self.connections === 0) return onempty();
-
- // Wait for sockets to get closed
- self.once('empty', onempty);
- } else if (msg.cmd === 'NODE_SOCKET_GET_COUNT') {
- if (!self.slave.connected) return;
- self.slave.send({
- cmd: 'NODE_SOCKET_COUNT',
- key: self.key,
- count: self.connections
- });
+ if (self.list.length === 0) {
+ self.flush();
+ return;
}
+
+ self.on('itemRemoved', function removeMe() {
+ if (self.list.length !== 0) return;
+ self.removeListener('itemRemoved', removeMe);
+ self.flush();
+ });
});
}
util.inherits(SocketListReceive, EventEmitter);
-SocketListReceive.prototype.add = function(obj) {
- var self = this;
-
- this.connections++;
+SocketListReceive.prototype.flush = function() {
+ this.list = [];
- // Notify previous owner of socket about its state change
- obj.socket.once('close', function() {
- self.connections--;
+ if (this.slave.connected) {
+ this.slave.send({
+ cmd: 'NODE_SOCKET_CLOSED',
+ key: this.key
+ });
+ }
+};
- if (obj.id !== undefined && self.slave.connected) {
- // Master wants to keep eye on socket status
- self.slave.send({
- cmd: 'NODE_SOCKET_CLOSED',
- key: self.key,
- id: obj.id
- });
- }
+SocketListReceive.prototype.add = function(socket) {
+ var self = this;
+ this.list.push(socket);
- if (self.connections === 0) self.emit('empty');
+ socket.on('close', function() {
+ self.list.splice(self.list.indexOf(socket), 1);
+ self.emit('itemRemoved');
});
};
var string = JSON.stringify(message) + '\n';
var writeReq = channel.writeUtf8String(string, handle);
+ // Close the Socket handle after sending it
+ if (message && message.type === 'net.Socket') {
+ handle.close();
+ }
+
if (!writeReq) {
var er = errnoException(errno, 'write', 'cannot write to IPC channel.');
this.emit('error', er);
}
- if (obj && obj.postSend) {
- writeReq.oncomplete = obj.postSend.bind(null, handle);
- } else {
- writeReq.oncomplete = nop;
- }
+ writeReq.oncomplete = nop;
/* If the master is > 2 read() calls behind, please stop sending. */
return channel.writeQueueSize < (65536 * 2);
this._closesNeeded = 1;
this._closesGot = 0;
- this.connected = false;
this.signalCode = null;
this.exitCode = null;
this._connections = 0;
+ // when server is using slaves .connections is not reliable
+ // so null will be return if thats the case
Object.defineProperty(this, 'connections', {
get: function() {
if (self._usingSlaves) {
});
this._handle = null;
- this._usingSlaves = false;
- this._slaves = [];
this.allowHalfOpen = options.allowHalfOpen || false;
}
}
-Server.prototype.getConnections = function(cb) {
- if (!this._usingSlaves) return cb(null, this.connections);
-
- // Poll slaves
- var left = this._slaves.length,
- total = this._connections;
-
- function oncount(err, count) {
- if (err) {
- left = -1;
- return cb(err);
- }
-
- total += count;
- if (--left === 0) return cb(null, total);
- }
-
- this._slaves.forEach(function(slave) {
- slave.getConnections(oncount);
- });
-};
-
-
Server.prototype.close = function(cb) {
- function onSlaveClose() {
- if (--left !== 0) return;
-
- self._connections = 0;
- self._emitCloseIfDrained();
- }
-
if (!this._handle) {
// Throw error. Follows net_legacy behaviour.
throw new Error('Not running');
}
this._handle.close();
this._handle = null;
+ this._emitCloseIfDrained();
+ // fetch new socket lists
if (this._usingSlaves) {
- var self = this,
- left = this._slaves.length;
-
- // Increment connections to be sure that, even if all sockets will be closed
- // during polling of slaves, `close` event will be emitted only once.
- this._connections++;
-
- // Poll slaves
- this._slaves.forEach(function(slave) {
- slave.close(onSlaveClose);
+ this._slaves.forEach(function(socketList) {
+ if (socketList.list.length === 0) return;
+ socketList.update();
});
- } else {
- this._emitCloseIfDrained();
}
return this;
return this.listen({ fd: fd });
}, 'listenFD is deprecated. Use listen({fd: <number>}).');
+// when sending a socket using fork IPC this function is executed
Server.prototype._setupSlave = function(socketList) {
- this._usingSlaves = true;
+ if (!this._usingSlaves) {
+ this._usingSlaves = true;
+ this._slaves = [];
+ }
this._slaves.push(socketList);
};
+++ /dev/null
-// Copyright Joyent, Inc. and other Node contributors.
-//
-// Permission is hereby granted, free of charge, to any person obtaining a
-// copy of this software and associated documentation files (the
-// "Software"), to deal in the Software without restriction, including
-// without limitation the rights to use, copy, modify, merge, publish,
-// distribute, sublicense, and/or sell copies of the Software, and to permit
-// persons to whom the Software is furnished to do so, subject to the
-// following conditions:
-//
-// The above copyright notice and this permission notice shall be included
-// in all copies or substantial portions of the Software.
-//
-// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
-// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
-// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
-// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
-// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
-// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
-// USE OR OTHER DEALINGS IN THE SOFTWARE.
-
-var assert = require('assert');
-var common = require('../common');
-var fork = require('child_process').fork;
-var net = require('net');
-var count = 12;
-
-if (process.argv[2] === 'child') {
- var sockets = [];
- var id = process.argv[3];
-
- process.on('message', function(m, socket) {
- if (socket) {
- sockets.push(socket);
- }
-
- if (m.cmd === 'close') {
- sockets[m.id].once('close', function() {
- process.send({ id: m.id, status: 'closed' });
- });
- sockets[m.id].destroy();
- }
- });
-} else {
- var child = fork(process.argv[1], ['child']);
-
- var server = net.createServer();
- var sockets = [];
- var sent = 0;
-
- server.on('connection', function(socket) {
- child.send({ cmd: 'new' }, socket, { track: false });
- sockets.push(socket);
-
- if (sockets.length === count) {
- closeSockets();
- server.close();
- }
- });
-
- var disconnected = 0;
- server.on('listening', function() {
-
- var j = count, client;
- while (j--) {
- client = net.connect(common.PORT, '127.0.0.1');
- client.on('close', function() {
- console.error('[m] CLIENT: close event');
- disconnected += 1;
- });
- // XXX This resume() should be unnecessary.
- // a stream high water mark should be enough to keep
- // consuming the input.
- client.resume();
- }
- });
-
- function closeSockets(i) {
- if (!i) i = 0;
- if (i === count) return;
-
- sent++;
- child.send({ id: i, cmd: 'close' });
- child.once('message', function(m) {
- assert(m.status === 'closed');
- server.getConnections(function(err, num) {
- closeSockets(i + 1);
- });
- });
- };
-
- var closeEmitted = false;
- server.on('close', function() {
- console.error('[m] server close');
- closeEmitted = true;
-
- child.kill();
- });
-
- server.listen(common.PORT, '127.0.0.1');
-
- process.on('exit', function() {
- assert.equal(sent, count);
- assert.equal(disconnected, count);
- assert.ok(closeEmitted);
- });
-}
var count = 12;
if (process.argv[2] === 'child') {
+
var needEnd = [];
- var id = process.argv[3];
process.on('message', function(m, socket) {
if (!socket) return;
- console.error('[%d] got socket', id, m);
+ console.error('got socket', m);
// will call .end('end') or .write('write');
socket[m](m);
socket.resume();
socket.on('data', function() {
- console.error('[%d] socket.data', id, m);
+ console.error('%d socket.data', process.pid, m);
});
socket.on('end', function() {
- console.error('[%d] socket.end', id, m);
+ console.error('%d socket.end', process.pid, m);
});
// store the unfinished socket
}
socket.on('close', function() {
- console.error('[%d] socket.close', id, m);
+ console.error('%d socket.close', process.pid, m);
});
socket.on('finish', function() {
- console.error('[%d] socket finished', id, m);
+ console.error('%d socket finished', process.pid, m);
});
});
process.on('message', function(m) {
if (m !== 'close') return;
- console.error('[%d] got close message', id);
+ console.error('got close message');
needEnd.forEach(function(endMe, i) {
- console.error('[%d] ending %d', id, i);
+ console.error('%d ending %d', process.pid, i);
endMe.end('end');
});
});
process.on('disconnect', function() {
- console.error('[%d] process disconnect, ending', id);
+ console.error('%d process disconnect, ending', process.pid);
needEnd.forEach(function(endMe, i) {
- console.error('[%d] ending %d', id, i);
+ console.error('%d ending %d', process.pid, i);
endMe.end('end');
});
+ endMe = null;
});
} else {
- var child1 = fork(process.argv[1], ['child', '1']);
- var child2 = fork(process.argv[1], ['child', '2']);
- var child3 = fork(process.argv[1], ['child', '3']);
+ var child1 = fork(process.argv[1], ['child']);
+ var child2 = fork(process.argv[1], ['child']);
+ var child3 = fork(process.argv[1], ['child']);
var server = net.createServer();
- var connected = 0,
- closed = 0;
+ var connected = 0;
server.on('connection', function(socket) {
switch (connected % 6) {
case 0:
- child1.send('end', socket, { track: false }); break;
+ child1.send('end', socket); break;
case 1:
- child1.send('write', socket, { track: true }); break;
+ child1.send('write', socket); break;
case 2:
- child2.send('end', socket, { track: true }); break;
+ child2.send('end', socket); break;
case 3:
- child2.send('write', socket, { track: false }); break;
+ child2.send('write', socket); break;
case 4:
- child3.send('end', socket, { track: false }); break;
+ child3.send('end', socket); break;
case 5:
- child3.send('write', socket, { track: false }); break;
+ child3.send('write', socket); break;
}
connected += 1;
- socket.once('close', function() {
- console.log('[m] socket closed, total %d', ++closed);
- });
-
if (connected === count) {
closeServer();
}
while (j--) {
client = net.connect(common.PORT, '127.0.0.1');
client.on('close', function() {
- console.error('[m] CLIENT: close event');
+ console.error('CLIENT: close event in master');
disconnected += 1;
});
// XXX This resume() should be unnecessary.
var closeEmitted = false;
server.on('close', function() {
- console.error('[m] server close');
+ console.error('server close');
closeEmitted = true;
child1.kill();
var timeElasped = 0;
var closeServer = function() {
- console.error('[m] closeServer');
+ console.error('closeServer');
var startTime = Date.now();
server.on('close', function() {
- console.error('[m] emit(close)');
+ console.error('emit(close)');
timeElasped = Date.now() - startTime;
});
- console.error('[m] calling server.close');
+ console.error('calling server.close');
server.close();
setTimeout(function() {
- assert(!closeEmitted);
- console.error('[m] sending close to children');
+ console.error('sending close to children');
child1.send('close');
child2.send('close');
child3.disconnect();
+++ /dev/null
-// Copyright Joyent, Inc. and other Node contributors.
-//
-// Permission is hereby granted, free of charge, to any person obtaining a
-// copy of this software and associated documentation files (the
-// "Software"), to deal in the Software without restriction, including
-// without limitation the rights to use, copy, modify, merge, publish,
-// distribute, sublicense, and/or sell copies of the Software, and to permit
-// persons to whom the Software is furnished to do so, subject to the
-// following conditions:
-//
-// The above copyright notice and this permission notice shall be included
-// in all copies or substantial portions of the Software.
-//
-// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
-// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
-// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
-// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
-// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
-// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
-// USE OR OTHER DEALINGS IN THE SOFTWARE.
-
-var assert = require('assert');
-var common = require('../common');
-var fork = require('child_process').fork;
-var net = require('net');
-var count = 12;
-
-if (process.argv[2] === 'child') {
- var sockets = [];
- var id = process.argv[3];
-
- process.on('message', function(m, socket) {
- if (socket) {
- sockets.push(socket);
- }
-
- if (m.cmd === 'close') {
- sockets[m.id].once('close', function() {
- process.send({ id: m.id, status: 'closed' });
- });
- sockets[m.id].destroy();
- }
- });
-} else {
- var child = fork(process.argv[1], ['child']);
-
- var server = net.createServer();
- var sockets = [];
- var closed = 0;
-
- server.on('connection', function(socket) {
- child.send({ cmd: 'new' }, socket, { track: true });
- sockets.push(socket);
-
- socket.once('close', function() {
- console.error('[m] socket closed');
- closed++;
- assert.equal(closed + server.connections, count);
- if (server.connections === 0) server.close();
- });
-
- if (sockets.length === count) {
- closeSockets();
- }
- });
-
- var disconnected = 0;
- server.on('listening', function() {
-
- var j = count, client;
- while (j--) {
- client = net.connect(common.PORT, '127.0.0.1');
- client.on('close', function() {
- console.error('[m] CLIENT: close event');
- disconnected += 1;
- });
- // XXX This resume() should be unnecessary.
- // a stream high water mark should be enough to keep
- // consuming the input.
- client.resume();
- }
- });
-
- function closeSockets(i) {
- if (!i) i = 0;
- if (i === count) return;
-
- child.send({ id: i, cmd: 'close' });
- child.once('message', function(m) {
- assert(m.status === 'closed');
- closeSockets(i + 1);
- });
- };
-
- var closeEmitted = false;
- server.on('close', function() {
- console.error('[m] server close');
- closeEmitted = true;
-
- child.kill();
- });
-
- server.listen(common.PORT, '127.0.0.1');
-
- process.on('exit', function() {
- assert.equal(disconnected, count);
- assert.equal(closed, count);
- assert.ok(closeEmitted);
- });
-}