// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.
+var EventEmitter = require('events').EventEmitter;
var assert = require('assert');
+var dgram = require('dgram');
var fork = require('child_process').fork;
var net = require('net');
-var EventEmitter = require('events').EventEmitter;
var util = require('util');
-function isObject(o) {
- return (typeof o === 'object' && o !== null);
-}
+var cluster = new EventEmitter;
+module.exports = cluster;
+cluster.Worker = Worker;
+cluster.isWorker = ('NODE_UNIQUE_ID' in process.env);
+cluster.isMaster = (cluster.isWorker === false);
-var debug;
-if (process.env.NODE_DEBUG && /cluster/.test(process.env.NODE_DEBUG)) {
- debug = function(x) {
- var prefix = process.pid + ',' +
- (process.env.NODE_UNIQUE_ID ? 'Worker' : 'Master');
- console.error(prefix, x);
- };
-} else {
- debug = function() { };
-}
-// cluster object:
-function Cluster() {
+function Worker() {
+ if (!(this instanceof Worker)) return new Worker;
EventEmitter.call(this);
+ this.suicide = undefined;
+ this.state = 'none';
+ this.id = 0;
}
+util.inherits(Worker, EventEmitter);
-util.inherits(Cluster, EventEmitter);
-
-var cluster = module.exports = new Cluster();
-
-// Used in the master:
-var masterStarted = false;
-var ids = 0;
-var serverHandlers = {};
-
-// Used in the worker:
-var serverListeners = {};
-var queryIds = 0;
-var queryCallbacks = {};
-
-// Define isWorker and isMaster
-cluster.isWorker = 'NODE_UNIQUE_ID' in process.env;
-cluster.isMaster = ! cluster.isWorker;
-
-// The worker object is only used in a worker
-cluster.worker = cluster.isWorker ? {} : null;
-// The workers array is only used in the master
-cluster.workers = cluster.isMaster ? {} : null;
-
-// Settings object
-var settings = cluster.settings = {};
-
-// Simple function to call a function on each worker
-function eachWorker(cb) {
- // Go through all workers
- for (var id in cluster.workers) {
- if (cluster.workers.hasOwnProperty(id)) {
- cb(cluster.workers[id]);
- }
- }
-}
-
-// Extremely simple progress tracker
-function ProgressTracker(missing, callback) {
- this.missing = missing;
- this.callback = callback;
-}
-ProgressTracker.prototype.done = function() {
- this.missing -= 1;
- this.check();
+Worker.prototype.kill = function() {
+ this.destroy.apply(this, arguments);
};
-ProgressTracker.prototype.check = function() {
- if (this.missing === 0) this.callback();
-};
-
-cluster.setupMaster = function(options) {
- // This can only be called from the master.
- assert(cluster.isMaster);
-
- // Don't allow this function to run more than once
- if (masterStarted) return;
- masterStarted = true;
-
- // Get filename and arguments
- options = options || {};
-
- // By default, V8 writes the profile data of all processes to a single
- // v8.log.
- //
- // Running that log file through a tick processor produces bogus numbers
- // because many events won't match up with the recorded memory mappings
- // and you end up with graphs where 80+% of ticks is unaccounted for.
- //
- // Fixing the tick processor to deal with multi-process output is not very
- // useful because the processes may be running wildly disparate workloads.
- //
- // That's why we fix up the command line arguments to include
- // a "--logfile=v8-%p.log" argument (where %p is expanded to the PID)
- // unless it already contains a --logfile argument.
- var execArgv = options.execArgv || process.execArgv;
- if (execArgv.some(function(s) { return /^--prof/.test(s); }) &&
- !execArgv.some(function(s) { return /^--logfile=/.test(s); }))
- {
- execArgv = execArgv.slice();
- execArgv.push('--logfile=v8-%p.log');
- }
-
- // Set settings object
- settings = cluster.settings = {
- exec: options.exec || process.argv[1],
- execArgv: execArgv,
- args: options.args || process.argv.slice(2),
- silent: options.silent || false
- };
- // emit setup event
- cluster.emit('setup');
+Worker.prototype.send = function() {
+ this.process.send.apply(this.process, arguments);
};
-// Check if a message is internal only
-var INTERNAL_PREFIX = 'NODE_CLUSTER_';
-function isInternalMessage(message) {
- return isObject(message) &&
- typeof message.cmd === 'string' &&
- message.cmd.length > INTERNAL_PREFIX.length &&
- message.cmd.slice(0, INTERNAL_PREFIX.length) === INTERNAL_PREFIX;
-}
+// Master/worker specific methods are defined in the *Init() functions.
-// Modify message object to be internal
-function internalMessage(inMessage) {
- var outMessage = util._extend({}, inMessage);
- // Add internal prefix to cmd
- outMessage.cmd = INTERNAL_PREFIX + (outMessage.cmd || '');
+if (cluster.isMaster)
+ masterInit();
+else
+ workerInit();
- return outMessage;
-}
-
-// Handle callback messages
-function handleResponse(outMessage, outHandle, inMessage, inHandle, worker) {
-
- // The message there will be sent
- var message = internalMessage(outMessage);
- // callback id - will be undefined if not set
- message._queryEcho = inMessage._requestEcho;
+function masterInit() {
+ cluster.workers = {};
- // Call callback if a query echo is received
- if (inMessage._queryEcho) {
- queryCallbacks[inMessage._queryEcho](inMessage.content, inHandle);
- delete queryCallbacks[inMessage._queryEcho];
- }
+ var intercom = new EventEmitter;
+ var settings = {
+ args: process.argv.slice(2),
+ exec: process.argv[1],
+ execArgv: process.execArgv,
+ silent: false
+ };
+ cluster.settings = settings;
+
+ // Indexed by address:port:etc key. Its entries are dicts with handle and
+ // workers keys. That second one is a list of workers that hold a reference
+ // to the handle. When a worker dies, we scan the dicts and close the handle
+ // when its reference count drops to zero. Yes, that means we're doing an
+ // O(n*m) scan but n and m are small and worker deaths are rare events anyway.
+ var handles = {};
+
+ var initialized = false;
+ cluster.setupMaster = function(options) {
+ if (initialized === true) return;
+ initialized = true;
+ settings = util._extend(settings, options || {});
+ // Tell V8 to write profile data for each process to a separate file.
+ // Without --logfile=v8-%p.log, everything ends up in a single, unusable
+ // file. (Unusable because what V8 logs are memory addresses and each
+ // process has its own memory mappings.)
+ if (settings.execArgv.some(function(s) { return /^--prof/.test(s); }) &&
+ !settings.execArgv.some(function(s) { return /^--logfile=/.test(s); }))
+ {
+ settings.execArgv = settings.execArgv.concat(['--logfile=v8-%p.log']);
+ }
+ cluster.settings = settings;
+ cluster.emit('setup');
+ };
- // Send if outWrap contains something useful
- if (!(outMessage === undefined && message._queryEcho === undefined)) {
- sendInternalMessage(worker, message, outHandle);
- }
-}
+ var ids = 0;
+ cluster.fork = function(env) {
+ cluster.setupMaster();
+ var worker = new Worker;
+ worker.id = ++ids;
+ var workerEnv = util._extend({}, process.env);
+ workerEnv = util._extend(workerEnv, env);
+ workerEnv.NODE_UNIQUE_ID = '' + worker.id;
+ worker.process = fork(settings.exec, settings.args, {
+ env: workerEnv,
+ silent: settings.silent,
+ execArgv: settings.execArgv
+ });
+ worker.process.once('exit', function(exitCode, signalCode) {
+ worker.suicide = !!worker.suicide;
+ worker.state = 'dead';
+ worker.emit('exit', exitCode, signalCode);
+ cluster.emit('exit', worker, exitCode, signalCode);
+ delete cluster.workers[worker.id];
+ });
+ worker.process.once('disconnect', function() {
+ worker.suicide = !!worker.suicide;
+ worker.state = 'disconnected';
+ worker.emit('disconnect');
+ cluster.emit('disconnect', worker);
+ delete cluster.workers[worker.id];
+ });
+ worker.process.on('error', worker.emit.bind(worker, 'error'));
+ worker.process.on('message', worker.emit.bind(worker, 'message'));
+ worker.process.on('internalMessage', internal(worker, onmessage));
+ process.nextTick(function() {
+ cluster.emit('fork', worker);
+ });
+ cluster.workers[worker.id] = worker;
+ return worker;
+ };
-// Handle messages from both master and workers
-var messageHandler = {};
-function handleMessage(worker, inMessage, inHandle) {
+ cluster.disconnect = function(cb) {
+ for (var key in cluster.workers) {
+ var worker = cluster.workers[key];
+ worker.disconnect();
+ }
+ if (cb) intercom.once('disconnect', cb);
+ };
- // Remove internal prefix
- var message = util._extend({}, inMessage);
- message.cmd = inMessage.cmd.substr(INTERNAL_PREFIX.length);
+ cluster.on('disconnect', function(worker) {
+ delete cluster.workers[worker.id];
+ // O(n*m) scan but for small values of n and m.
+ for (var key in handles) {
+ var e = handles[key];
+ var i = e.workers.indexOf(worker);
+ if (i === -1) continue;
+ e.workers.splice(i, 1);
+ if (e.workers.length !== 0) continue;
+ e.handle.close();
+ delete handles[key];
+ }
+ if (Object.keys(handles).length === 0) {
+ intercom.emit('disconnect');
+ }
+ });
- var respondUsed = false;
- function respond(outMessage, outHandler) {
- respondUsed = true;
- handleResponse(outMessage, outHandler, inMessage, inHandle, worker);
- }
+ Worker.prototype.disconnect = function() {
+ this.suicide = true;
+ send(this, { act: 'disconnect' });
+ };
- // Run handler if it exists
- if (messageHandler[message.cmd]) {
- messageHandler[message.cmd](message, worker, respond);
- }
+ Worker.prototype.destroy = function(signo) {
+ signo = signo || 'SIGTERM';
+ var proc = this.process;
+ if (proc.connected) {
+ proc.once('disconnect', proc.kill.bind(proc, signo));
+ proc.disconnect();
+ return;
+ }
+ proc.kill(signo);
+ };
- // Send respond if it hasn't been called yet
- if (respondUsed === false) {
- respond();
+ function onmessage(message, handle) {
+ var worker = this;
+ if (message.act === 'online')
+ online(worker);
+ else if (message.act === 'queryServer')
+ queryServer(worker, message);
+ else if (message.act === 'listening')
+ listening(worker, message);
+ else if (message.act === 'suicide')
+ worker.suicide = true;
}
-}
-
-// Messages to the master will be handled using these methods
-if (cluster.isMaster) {
- // Handle online messages from workers
- messageHandler.online = function(message, worker) {
+ function online(worker) {
worker.state = 'online';
- debug('Worker ' + worker.process.pid + ' online');
worker.emit('online');
cluster.emit('online', worker);
- };
-
- // Handle queryServer messages from workers
- messageHandler.queryServer = function(message, worker, send) {
+ }
- // This sequence of information is unique to the connection
- // but not to the worker
+ function queryServer(worker, message) {
var args = [message.address,
message.port,
message.addressType,
message.fd];
var key = args.join(':');
- var handler;
-
- if (serverHandlers.hasOwnProperty(key)) {
- handler = serverHandlers[key];
- } else if (message.addressType === 'udp4' ||
- message.addressType === 'udp6') {
- var dgram = require('dgram');
- handler = dgram._createSocketHandle.apply(net, args);
- serverHandlers[key] = handler;
- } else {
- handler = net._createServerHandle.apply(net, args);
- serverHandlers[key] = handler;
+ var e = handles[key];
+ if (typeof e === 'undefined') {
+ e = { workers: [] };
+ if (message.addressType === 'udp4' || message.addressType === 'udp6')
+ e.handle = dgram._createSocketHandle.apply(null, args);
+ else
+ e.handle = net._createServerHandle.apply(null, args);
+ handles[key] = e;
}
+ e.workers.push(worker);
+ send(worker, { ack: message.seq }, e.handle);
+ }
- // echo callback with the fd handler associated with it
- send({}, handler);
- };
-
- // Handle listening messages from workers
- messageHandler.listening = function(message, worker) {
-
- worker.state = 'listening';
-
- // Emit listening, now that we know the worker is listening
- worker.emit('listening', {
- address: message.address,
- port: message.port,
+ function listening(worker, message) {
+ var info = {
addressType: message.addressType,
- fd: message.fd
- });
- cluster.emit('listening', worker, {
address: message.address,
port: message.port,
- addressType: message.addressType,
fd: message.fd
- });
- };
-
- // Handle suicide messages from workers
- messageHandler.suicide = function(message, worker) {
- worker.suicide = true;
- };
-}
-
-
-// Messages to a worker will be handled using these methods
-else if (cluster.isWorker) {
-
- // Handle worker.disconnect from master
- messageHandler.disconnect = function(message, worker) {
- worker.disconnect();
- };
-}
-
-function toDecInt(value) {
- value = parseInt(value, 10);
- return isNaN(value) ? null : value;
-}
-
-// Create a worker object, that works both for master and worker
-function Worker(customEnv) {
- if (!(this instanceof Worker)) return new Worker();
- EventEmitter.call(this);
-
- var self = this;
- var env = process.env;
-
- // Assign a unique id, default null
- this.id = cluster.isMaster ? ++ids : toDecInt(env.NODE_UNIQUE_ID);
-
- // XXX: Legacy. Remove in 0.9
- this.workerID = this.uniqueID = this.id;
-
- // Assign state
- this.state = 'none';
-
- // Create or get process
- if (cluster.isMaster) {
-
- // Create env object
- // first: copy and add id property
- var envCopy = util._extend({}, env);
- envCopy['NODE_UNIQUE_ID'] = this.id;
- // second: extend envCopy with the env argument
- if (isObject(customEnv)) {
- envCopy = util._extend(envCopy, customEnv);
- }
-
- // fork worker
- this.process = fork(settings.exec, settings.args, {
- 'env': envCopy,
- 'silent': settings.silent,
- 'execArgv': settings.execArgv
- });
- } else {
- this.process = process;
- }
-
- if (cluster.isMaster) {
- // Save worker in the cluster.workers array
- cluster.workers[this.id] = this;
-
- // Emit a fork event, on next tick
- // There is no worker.fork event since this has no real purpose
- process.nextTick(function() {
- cluster.emit('fork', self);
- });
- }
-
- // handle internalMessage, exit and disconnect event
- this.process.on('internalMessage', handleMessage.bind(null, this));
- this.process.once('exit', function(exitCode, signalCode) {
- prepareExit(self, 'dead');
- self.emit('exit', exitCode, signalCode);
- cluster.emit('exit', self, exitCode, signalCode);
- });
- this.process.once('disconnect', function() {
- prepareExit(self, 'disconnected');
- self.emit('disconnect');
- cluster.emit('disconnect', self);
- });
-
- // relay message and error
- this.process.on('message', this.emit.bind(this, 'message'));
- this.process.on('error', this.emit.bind(this, 'error'));
-
-}
-util.inherits(Worker, EventEmitter);
-cluster.Worker = Worker;
-
-function prepareExit(worker, state) {
-
- // set state to disconnect
- worker.state = state;
-
- // Make suicide a boolean
- worker.suicide = !!worker.suicide;
-
- // Remove from workers in the master
- if (cluster.isMaster) {
- delete cluster.workers[worker.id];
- }
-}
-
-// Send internal message
-function sendInternalMessage(worker, message/*, handler, callback*/) {
-
- // Exist callback
- var callback = arguments[arguments.length - 1];
- if (typeof callback !== 'function') {
- callback = undefined;
- }
-
- // exist handler
- var handler = arguments[2] !== callback ? arguments[2] : undefined;
-
- if (!isInternalMessage(message)) {
- message = internalMessage(message);
+ };
+ worker.state = 'listening';
+ worker.emit('listening', info);
+ cluster.emit('listening', worker, info);
}
- // Store callback for later
- if (callback) {
- message._requestEcho = worker.id + ':' + (++queryIds);
- queryCallbacks[message._requestEcho] = callback;
+ function send(worker, message, handle, cb) {
+ sendHelper(worker.process, message, handle, cb);
}
-
-
- worker.send(message, handler);
}
-// Send message to worker or master
-Worker.prototype.send = function() {
- // You could also just use process.send in a worker
- this.process.send.apply(this.process, arguments);
-};
-
-// Kill the worker without restarting
-Worker.prototype.kill = Worker.prototype.destroy = function(signal) {
- if (!signal)
- signal = 'SIGTERM';
-
- var self = this;
-
- this.suicide = true;
-
- if (cluster.isMaster) {
- // Disconnect IPC channel
- // this way the worker won't need to propagate suicide state to master
- if (self.process.connected) {
- self.process.once('disconnect', function() {
- self.process.kill(signal);
- });
- self.process.disconnect();
- } else {
- self.process.kill(signal);
- }
+function workerInit() {
+ var handles = [];
- } else {
- // Channel is open
- if (this.process.connected) {
-
- // Inform master to suicide and then kill
- sendInternalMessage(this, {cmd: 'suicide'}, function() {
- process.exit(0);
- });
-
- // When channel is closed, terminate the process
- this.process.once('disconnect', function() {
- process.exit(0);
- });
- } else {
- process.exit(0);
+ // Called from src/node.js
+ cluster._setupWorker = function() {
+ var worker = new Worker;
+ cluster.worker = worker;
+ worker.id = +process.env.NODE_UNIQUE_ID | 0;
+ worker.state = 'online';
+ worker.process = process;
+ process.once('disconnect', process.exit.bind(null, 0));
+ process.on('internalMessage', internal(worker, onmessage));
+ send({ act: 'online' });
+ function onmessage(message, handle) {
+ if (message.act === 'disconnect') worker.disconnect();
}
- }
-};
-
-// The .disconnect function will close all servers
-// and then disconnect the IPC channel.
-if (cluster.isMaster) {
- // Used in master
- Worker.prototype.disconnect = function() {
- this.suicide = true;
-
- sendInternalMessage(this, {cmd: 'disconnect'});
};
-} else {
- // Used in workers
- Worker.prototype.disconnect = function() {
- var self = this;
-
- this.suicide = true;
-
- // keep track of open servers
- var servers = Object.keys(serverListeners).length;
- var progress = new ProgressTracker(servers, function() {
- // There are no more servers open so we will close the IPC channel.
- // Closing the IPC channel will emit a disconnect event
- // in both master and worker on the process object.
- // This event will be handled by prepareExit.
- self.process.disconnect();
+ // obj is a net#Server or a dgram#Socket object.
+ cluster._getServer = function(obj, address, port, addressType, fd, cb) {
+ var message = {
+ addressType: addressType,
+ address: address,
+ port: port,
+ act: 'queryServer',
+ fd: fd
+ };
+ send(message, function(_, handle) {
+ // Monkey-patch the close() method so we can keep track of when it's
+ // closed. Avoids resource leaks when the handle is short-lived.
+ var close = handle.close;
+ handle.close = function() {
+ var index = handles.indexOf(handle);
+ if (index !== -1) handles.splice(index, 1);
+ return close.apply(this, arguments);
+ };
+ handles.push(handle);
+ cb(handle);
});
-
- // depending on where this function was called from (master or worker)
- // The suicide state has already been set,
- // but it doesn't really matter if we set it again.
- sendInternalMessage(this, {cmd: 'suicide'}, function() {
- // in case there are no servers
- progress.check();
-
- // closing all servers gracefully
- var server;
- for (var key in serverListeners) {
- server = serverListeners[key];
-
- // in case the server is closed we won't close it again
- if (server._handle === null) {
- progress.done();
- continue;
- }
-
- server.on('close', progress.done.bind(progress));
- server.close();
- }
+ obj.once('listening', function() {
+ cluster.worker.state = 'listening';
+ message.act = 'listening';
+ message.port = obj.address().port || port;
+ send(message);
});
-
};
-}
-
-// Fork a new worker
-cluster.fork = function(env) {
- // This can only be called from the master.
- assert(cluster.isMaster);
-
- // Make sure that the master has been initialized
- cluster.setupMaster();
- return (new cluster.Worker(env));
-};
-
-// execute .disconnect on all workers and close handlers when done
-cluster.disconnect = function(callback) {
- // This can only be called from the master.
- assert(cluster.isMaster);
-
- // Close all TCP handlers when all workers are disconnected
- var workers = Object.keys(cluster.workers).length;
- var progress = new ProgressTracker(workers, function() {
- for (var key in serverHandlers) {
- serverHandlers[key].close();
- delete serverHandlers[key];
- }
+ Worker.prototype.disconnect = function() {
+ for (var handle; handle = handles.shift(); handle.close());
+ process.disconnect();
+ };
- // call callback when done
- if (callback) callback();
- });
+ Worker.prototype.destroy = function() {
+ if (!process.connected) process.exit(0);
+ var exit = process.exit.bind(null, 0);
+ send({ act: 'suicide' }, exit);
+ process.once('disconnect', exit);
+ process.disconnect();
+ };
- // begin disconnecting all workers
- eachWorker(function(worker) {
- worker.once('disconnect', progress.done.bind(progress));
- worker.disconnect();
- });
+ function send(message, cb) {
+ sendHelper(process, message, null, cb);
+ }
+}
- // in case there weren't any workers
- progress.check();
-};
-// Internal function. Called from src/node.js when worker process starts.
-cluster._setupWorker = function() {
+var seq = 0;
+var callbacks = {};
+function sendHelper(proc, message, handle, cb) {
+ // Mark message as internal. See INTERNAL_PREFIX in lib/child_process.js
+ message = util._extend({ cmd: 'NODE_CLUSTER' }, message);
+ if (cb) callbacks[seq] = cb;
+ message.seq = seq;
+ seq += 1;
+ proc.send(message, handle);
+}
- // Get worker class
- var worker = cluster.worker = new Worker();
- // we will terminate the worker
- // when the worker is disconnected from the parent accidentally
- process.once('disconnect', function() {
- if (worker.suicide !== true) {
- process.exit(0);
+// Returns an internalMessage listener that hands off normal messages
+// to the callback but intercepts and redirects ACK messages.
+function internal(worker, cb) {
+ return function(message, handle) {
+ if (message.cmd !== 'NODE_CLUSTER') return;
+ var fn = cb;
+ if (typeof message.ack !== 'undefined') {
+ fn = callbacks[message.ack];
+ delete callbacks[message.ack];
}
- });
-
- // Tell master that the worker is online
- worker.state = 'online';
- sendInternalMessage(worker, { cmd: 'online' });
-};
-
-// Internal function. Called by net.js and dgram.js when attempting to bind a
-// TCP server or UDP socket.
-cluster._getServer = function(tcpSelf, address, port, addressType, fd, cb) {
- // This can only be called from a worker.
- assert(cluster.isWorker);
-
- // Store tcp instance for later use
- var key = [address, port, addressType, fd].join(':');
- serverListeners[key] = tcpSelf;
-
- // Send a listening message to the master
- tcpSelf.once('listening', function() {
- cluster.worker.state = 'listening';
- sendInternalMessage(cluster.worker, {
- cmd: 'listening',
- address: address,
- port: tcpSelf.address().port || port,
- addressType: addressType,
- fd: fd
- });
- });
-
- // Request the fd handler from the master process
- var message = {
- cmd: 'queryServer',
- address: address,
- port: port,
- addressType: addressType,
- fd: fd
+ fn.apply(worker, arguments);
};
-
- // The callback will be stored until the master has responded
- sendInternalMessage(cluster.worker, message, function(msg, handle) {
- cb(handle);
- });
-
-};
+}