3 const EventEmitter = require('events');
4 const assert = require('assert');
5 const dgram = require('dgram');
6 const fork = require('child_process').fork;
7 const net = require('net');
8 const util = require('util');
12 const uv = process.binding('uv');
// The module's export IS an EventEmitter: cluster.on('fork'|'online'|...).
14 const cluster = new EventEmitter();
15 module.exports = cluster;
16 cluster.Worker = Worker;
// Role detection: the master stamps NODE_UNIQUE_ID into each child's env
// (see createWorkerProcess); its presence marks this process as a worker.
17 cluster.isWorker = ('NODE_UNIQUE_ID' in process.env);
18 cluster.isMaster = (cluster.isWorker === false);
// Worker — common representation of one cluster worker, used on both the
// master side (wrapping a forked child) and the worker side (wrapping the
// current process). Re-emits the underlying process's 'error' and
// 'message' events on itself.
// NOTE(review): several body lines (options default, closing braces) are
// elided from this view of the file.
21 function Worker(options) {
// Guard against being called without `new`.
22 if (!(this instanceof Worker))
23 return new Worker(options);
25 EventEmitter.call(this);
// Fallback for a missing/non-object options argument (branch body elided).
27 if (options === null || typeof options !== 'object')
// suicide stays undefined until a deliberate disconnect/destroy sets it;
// the exit/disconnect handlers later coerce it with `!!`.
30 this.suicide = undefined;
31 this.state = options.state || 'none';
// `| 0` coerces the id to a 32-bit integer (0 when absent).
32 this.id = options.id | 0;
34 if (options.process) {
35 this.process = options.process;
// Forward the process's events through this Worker instance.
36 this.process.on('error', this.emit.bind(this, 'error'));
37 this.process.on('message', this.emit.bind(this, 'message'));
// Worker extends EventEmitter (legacy pre-class inheritance pattern).
40 util.inherits(Worker, EventEmitter);
// kill([signal]) — alias for destroy(); forwards all arguments so the
// master and worker variants of destroy() both work unchanged.
42 Worker.prototype.kill = function() {
43 this.destroy.apply(this, arguments);
// send(message[, handle][, cb]) — thin passthrough to the underlying
// process's IPC send(); argument list is forwarded verbatim.
46 Worker.prototype.send = function() {
47 this.process.send.apply(this.process, arguments);
// True once the process has exited: either exitCode or signalCode is set.
// `!= null` intentionally matches both null and undefined.
50 Worker.prototype.isDead = function isDead() {
51 return this.process.exitCode != null || this.process.signalCode != null;
// True while the IPC channel to/from the process is still open.
54 Worker.prototype.isConnected = function isConnected() {
55 return this.process.connected;
58 // Master/worker specific methods are defined in the *Init() functions.
// SharedHandle — SCHED_NONE strategy: one listen handle is created in the
// master and shared with every worker; the OS decides which worker
// accepts each connection. Always used for UDP (see queryServer).
// NOTE(review): field initialization and both branch bodies are partially
// elided in this view.
60 function SharedHandle(key, address, port, addressType, backlog, fd, flags) {
66 // FIXME(bnoordhuis) Polymorphic return type for lack of a better solution.
// UDP gets a dgram handle, everything else a TCP/pipe server handle.
68 if (addressType === 'udp4' || addressType === 'udp6')
69 rval = dgram._createSocketHandle(address, port, addressType, fd, flags);
71 rval = net._createServerHandle(address, port, addressType, fd);
// A numeric result is a libuv errno, not a handle (the "polymorphic
// return type" above); presumably stored as this.errno — elided here.
73 if (typeof rval === 'number')
// Register a worker as a user of this shared handle and immediately reply
// with the handle (or the creation errno) via the send callback.
79 SharedHandle.prototype.add = function(worker, send) {
// A worker must not be added twice for the same handle.
80 assert(this.workers.indexOf(worker) === -1);
81 this.workers.push(worker);
82 send(this.errno, null, this.handle);
// Unregister a worker. Returns false while other workers still share the
// handle; presumably closes the handle and returns true once the last
// worker is gone (tail of the function is elided in this view).
85 SharedHandle.prototype.remove = function(worker) {
86 var index = this.workers.indexOf(worker);
87 if (index === -1) return false; // The worker wasn't sharing this handle.
88 this.workers.splice(index, 1);
89 if (this.workers.length !== 0) return false;
96 // Start a round-robin server. Master accepts connections and distributes
97 // them over the workers.
// NOTE(review): field initialization (all/free/handles queues) and the
// listen() branch guards are elided in this view.
98 function RoundRobinHandle(key, address, port, addressType, backlog, fd) {
// assert.fail as the connection listener: no connection should ever reach
// net's own handler, because onconnection is rerouted to distribute below.
104 this.server = net.createServer(assert.fail);
// One of three listen modes: inherited fd, host:port, or UNIX socket path.
107 this.server.listen({ fd: fd });
109 this.server.listen(port, address);
111 this.server.listen(address); // UNIX socket path.
114 this.server.once('listening', function() {
// Steal the raw libuv handle from the net.Server so the master itself
// accepts connections and hands them off to workers.
115 self.handle = self.server._handle;
116 self.handle.onconnection = self.distribute.bind(self);
117 self.server._handle = null;
// Register a worker with this round-robin server and reply (via send)
// once the server is actually listening. The elided done() wrapper
// presumably encloses lines 128-136 — TODO confirm against upstream.
122 RoundRobinHandle.prototype.add = function(worker, send) {
// Each worker may be added at most once.
123 assert(worker.id in this.all === false);
124 this.all[worker.id] = worker;
// TCP handles expose getsockname; report the bound address to the worker.
128 if (self.handle.getsockname) {
130 self.handle.getsockname(out);
131 // TODO(bnoordhuis) Check err.
132 send(null, { sockname: out }, null);
134 send(null, null, null); // UNIX socket.
136 self.handoff(worker); // In case there are connections pending.
// this.server is nulled after binding completes; null means done already.
139 if (this.server === null) return done();
140 // Still busy binding.
141 this.server.once('listening', done);
142 this.server.once('error', function(err) {
143 // Hack: translate 'EADDRINUSE' error string back to numeric error code.
144 // It works but ideally we'd have some backchannel between the net and
145 // cluster modules for stuff like this.
146 var errno = uv['UV_' + err.errno];
// Unregister a worker. Returns false while other workers remain; when the
// last worker leaves, pending connection handles are closed (server
// teardown continues past this view's last line).
151 RoundRobinHandle.prototype.remove = function(worker) {
152 if (worker.id in this.all === false) return false;
153 delete this.all[worker.id];
// Also drop the worker from the ready queue if it was waiting there.
154 var index = this.free.indexOf(worker);
155 if (index !== -1) this.free.splice(index, 1);
156 if (Object.getOwnPropertyNames(this.all).length !== 0) return false;
// Close every queued, undelivered connection handle (empty loop body).
157 for (var handle; handle = this.handles.shift(); handle.close());
// onconnection callback (bound in the constructor): queue the incoming
// connection handle and, if a worker is free, hand it off right away.
// Otherwise the handle waits in this.handles until handoff() drains it.
163 RoundRobinHandle.prototype.distribute = function(err, handle) {
164 this.handles.push(handle);
165 var worker = this.free.shift();
166 if (worker) this.handoff(worker);
// Try to deliver one queued connection to `worker`. ACK-based: the worker
// replies whether it accepted; on refusal the connection is re-queued for
// another worker (reply-handling branch partially elided in this view).
169 RoundRobinHandle.prototype.handoff = function(worker) {
170 if (worker.id in this.all === false) {
171 return; // Worker is closing (or has closed) the server.
173 var handle = this.handles.shift();
174 if (handle === undefined) {
175 this.free.push(worker); // Add to ready queue again.
178 var message = { act: 'newconn', key: this.key };
180 sendHelper(worker.process, message, handle, function(reply) {
// Presumably guarded by `if (reply.accepted)` — elided; on refusal:
184 self.distribute(0, handle); // Worker is shutting down. Send to another.
// Loop: immediately try to hand the worker the next queued connection.
185 self.handoff(worker);
// Install the role-specific half of the API (masterInit()/workerInit()
// calls are elided from this view).
190 if (cluster.isMaster)
// masterInit — defines all master-side state and methods. Everything down
// to sendHelper's callers closes over the locals declared here.
195 function masterInit() {
196 cluster.workers = {};
// Private event bus; fires 'disconnect' once every worker has gone away.
198 var intercom = new EventEmitter();
199 cluster.settings = {};
201 // XXX(bnoordhuis) Fold cluster.schedulingPolicy into cluster.settings?
// Map NODE_CLUSTER_SCHED_POLICY env var to a policy constant (the
// 'rr'/'none' mapping literal is elided in this view).
202 var schedulingPolicy = {
205 }[process.env.NODE_CLUSTER_SCHED_POLICY];
207 if (schedulingPolicy === undefined) {
208 // FIXME Round-robin doesn't perform well on Windows right now due to the
209 // way IOCP is wired up. Bert is going to fix that, eventually.
210 schedulingPolicy = (process.platform === 'win32') ? SCHED_NONE : SCHED_RR;
213 cluster.schedulingPolicy = schedulingPolicy;
214 cluster.SCHED_NONE = SCHED_NONE; // Leave it to the operating system.
215 cluster.SCHED_RR = SCHED_RR; // Master distributes connections.
217 // Keyed on address:port:etc. When a worker dies, we walk over the handles
218 // and remove() the worker from each one. remove() may do a linear scan
219 // itself so we might end up with an O(n*m) operation. Ergo, FIXME.
// Guard so settings are frozen after the first fork; later calls only
// re-emit 'setup' without changing the effective configuration.
222 var initialized = false;
// setupMaster([options]) — merge defaults <- cluster.settings <- options
// into the settings used by subsequent fork() calls.
223 cluster.setupMaster = function(options) {
// Defaults: re-run the current script with the current args/execArgv.
225 args: process.argv.slice(2),
226 exec: process.argv[1],
227 execArgv: process.execArgv,
// Later _extend calls win: explicit options override stored settings.
230 settings = util._extend(settings, cluster.settings);
231 settings = util._extend(settings, options || {});
232 // Tell V8 to write profile data for each process to a separate file.
233 // Without --logfile=v8-%p.log, everything ends up in a single, unusable
234 // file. (Unusable because what V8 logs are memory addresses and each
235 // process has its own memory mappings.)
236 if (settings.execArgv.some(function(s) { return /^--prof/.test(s); }) &&
237 !settings.execArgv.some(function(s) { return /^--logfile=/.test(s); }))
239 settings.execArgv = settings.execArgv.concat(['--logfile=v8-%p.log']);
241 cluster.settings = settings;
// Already initialized: just notify listeners asynchronously and bail.
242 if (initialized === true)
243 return process.nextTick(setupSettingsNT, settings);
245 schedulingPolicy = cluster.schedulingPolicy; // Freeze policy.
246 assert(schedulingPolicy === SCHED_NONE || schedulingPolicy === SCHED_RR,
247 'Bad cluster.schedulingPolicy: ' + schedulingPolicy);
// Detect whether the master itself was started under the debugger.
249 var hasDebugArg = process.execArgv.some(function(argv) {
250 return /^(--debug|--debug-brk)(=\d+)?$/.test(argv);
253 process.nextTick(setupSettingsNT, settings);
255 // Send debug signal only if not started in debug mode, this helps a lot
256 // on windows, because RegisterDebugHandler is not called when node starts
257 // with --debug.* arg.
// Relayed by child_process when the master receives SIGUSR1: enable the
// debugger in every live worker too.
261 process.on('internalMessage', function(message) {
262 if (message.cmd !== 'NODE_DEBUG_ENABLED') return;
// NOTE(review): `key` has no var/let here — relies on an elided
// declaration or is an implicit global; confirm against upstream.
264 for (key in cluster.workers) {
265 var worker = cluster.workers[key];
266 if (worker.state === 'online' || worker.state === 'listening') {
267 process._debugProcess(worker.process.pid);
// Not up yet: arm the debugger as soon as the worker reports online.
269 worker.once('online', function() {
270 process._debugProcess(this.process.pid);
// nextTick trampoline: emit 'setup' asynchronously with the final settings.
277 function setupSettingsNT(settings) {
278 cluster.emit('setup', settings);
// Each debug-enabled worker gets its own port: master's port + offset.
281 var debugPortOffset = 1;
// Fork one worker process: inherit the master's env, tag it with the
// unique worker id, and rewrite any --debug/--debug-port flags so workers
// don't collide on the master's debug port.
283 function createWorkerProcess(id, env) {
284 var workerEnv = util._extend({}, process.env);
285 var execArgv = cluster.settings.execArgv.slice();
// Caller-supplied env entries override the inherited ones.
287 workerEnv = util._extend(workerEnv, env);
// This env var is what makes the child see cluster.isWorker === true.
288 workerEnv.NODE_UNIQUE_ID = '' + id;
290 for (var i = 0; i < execArgv.length; i++) {
291 var match = execArgv[i].match(/^(--debug|--debug-(brk|port))(=\d+)?$/);
// Presumably guarded by `if (match)` and debugPortOffset++ — elided.
294 let debugPort = process.debugPort + debugPortOffset;
296 execArgv[i] = match[1] + '=' + debugPort;
300 return fork(cluster.settings.exec, cluster.settings.args, {
302 silent: cluster.settings.silent,
304 gid: cluster.settings.gid,
305 uid: cluster.settings.uid
// fork([env]) — spawn one worker, wire up its lifecycle events, and
// return the Worker object. (Worker id allocation line is elided here.)
311 cluster.fork = function(env) {
// Idempotent: applies defaults if setupMaster was never called explicitly.
312 cluster.setupMaster();
314 const workerProcess = createWorkerProcess(id, env);
315 const worker = new Worker({
317 process: workerProcess
// Bubble worker messages up as cluster-level 'message' events.
320 worker.on('message', this.emit.bind(this, 'message'));
// Drop the worker from the registry; when it was the last one, assert no
// listen handles leaked and signal cluster.disconnect() waiters.
322 function removeWorker(worker) {
325 delete cluster.workers[worker.id];
327 if (Object.keys(cluster.workers).length === 0) {
328 assert(Object.keys(handles).length === 0, 'Resource leak detected.');
329 intercom.emit('disconnect');
// Detach this worker from every listen handle; delete handles that had
// no remaining users (handle.remove returned true).
333 function removeHandlesForWorker(worker) {
336 for (var key in handles) {
337 var handle = handles[key];
338 if (handle.remove(worker)) delete handles[key];
342 worker.process.once('exit', function(exitCode, signalCode) {
344 * Remove the worker from the workers list only
345 * if it has disconnected, otherwise we might
346 * still want to access it.
348 if (!worker.isConnected()) removeWorker(worker);
// Coerce undefined -> false before exposing suicide to listeners.
350 worker.suicide = !!worker.suicide;
351 worker.state = 'dead';
352 worker.emit('exit', exitCode, signalCode);
353 cluster.emit('exit', worker, exitCode, signalCode);
356 worker.process.once('disconnect', function() {
358 * Now is a good time to remove the handles
359 * associated with this worker because it is
360 * not connected to the master anymore.
362 removeHandlesForWorker(worker);
365 * Remove the worker from the workers list only
366 * if its process has exited. Otherwise, we might
367 * still want to access it.
369 if (worker.isDead()) removeWorker(worker);
371 worker.suicide = !!worker.suicide;
372 worker.state = 'disconnected';
373 worker.emit('disconnect');
374 cluster.emit('disconnect', worker);
// Route the worker's cluster-protocol messages into onmessage below.
377 worker.process.on('internalMessage', internal(worker, onmessage));
378 process.nextTick(emitForkNT, worker);
379 cluster.workers[worker.id] = worker;
// nextTick trampoline so 'fork' fires after fork() has returned.
383 function emitForkNT(worker) {
384 cluster.emit('fork', worker);
// disconnect([cb]) — gracefully disconnect every connected worker; cb (if
// given) runs once all workers are gone (intercom 'disconnect').
387 cluster.disconnect = function(cb) {
388 var workers = Object.keys(cluster.workers);
// No workers: complete asynchronously anyway for consistent semantics.
389 if (workers.length === 0) {
390 process.nextTick(intercom.emit.bind(intercom, 'disconnect'));
// NOTE(review): for-in over the keys *array* yields indices; an elided
// line (orig. 393, `key = workers[key];`) presumably converts the index
// back to a worker id — confirm against upstream before editing.
392 for (var key in workers) {
394 if (cluster.workers[key].isConnected())
395 cluster.workers[key].disconnect();
398 if (cb) intercom.once('disconnect', cb);
// Master-side disconnect: ask the worker (via the cluster protocol) to
// close its servers and disconnect its IPC channel.
401 Worker.prototype.disconnect = function() {
403 send(this, { act: 'disconnect' });
// Master-side destroy: kill the worker with `signo` (default SIGTERM).
// If still connected, wait for 'disconnect' before signalling so the
// graceful teardown can finish first (else-branch elided in this view).
406 Worker.prototype.destroy = function(signo) {
407 signo = signo || 'SIGTERM';
408 var proc = this.process;
409 if (this.isConnected()) {
410 this.once('disconnect', proc.kill.bind(proc, signo));
// Master-side dispatcher for worker -> master protocol messages, keyed on
// message.act. `worker` is presumably bound by an elided line (orig. 418)
// from `this` — internal() invokes the callback with the worker as `this`.
417 function onmessage(message, handle) {
419 if (message.act === 'online')
421 else if (message.act === 'queryServer')
422 queryServer(worker, message);
423 else if (message.act === 'listening')
424 listening(worker, message);
// Worker announced a deliberate shutdown (disconnect/destroy).
425 else if (message.act === 'suicide')
426 worker.suicide = true;
427 else if (message.act === 'close')
428 close(worker, message);
// Worker reported that it finished booting: update state and notify both
// the Worker instance and the cluster module listeners.
431 function online(worker) {
432 worker.state = 'online';
433 worker.emit('online');
434 cluster.emit('online', worker);
// Worker asked for a listen handle. Handles are deduplicated by an
// address:port:... key; the first request creates the handle, later
// requests join it. Reply is sent through handle.add()'s callback.
437 function queryServer(worker, message) {
// Key components (remaining array elements elided in this view).
438 var args = [message.address,
443 var key = args.join(':');
444 var handle = handles[key];
445 if (handle === undefined) {
446 var constructor = RoundRobinHandle;
447 // UDP is exempt from round-robin connection balancing for what should
448 // be obvious reasons: it's connectionless. There is nothing to send to
449 // the workers except raw datagrams and that's pointless.
450 if (schedulingPolicy !== SCHED_RR ||
451 message.addressType === 'udp4' ||
452 message.addressType === 'udp6') {
453 constructor = SharedHandle;
// Constructor argument list continues on elided lines.
455 handles[key] = handle = new constructor(key,
// First writer wins: keep custom server data from the first worker.
463 if (!handle.data) handle.data = message.data;
465 // Set custom server data
466 handle.add(worker, function(errno, reply, handle) {
467 reply = util._extend({
471 data: handles[key].data
473 if (errno) delete handles[key]; // Gives other workers a chance to retry.
474 send(worker, reply, handle);
// Worker reported that its server is listening: build the address info
// object (assembly partially elided) and emit 'listening' on both levels.
478 function listening(worker, message) {
480 addressType: message.addressType,
481 address: message.address,
485 worker.state = 'listening';
486 worker.emit('listening', info);
487 cluster.emit('listening', worker, info);
490 // Server in worker is closing, remove from list.
// Mirrors removeHandlesForWorker: drop the handle entirely once its last
// user leaves (handle.remove returns true).
491 function close(worker, message) {
492 var key = message.key;
493 var handle = handles[key];
494 if (handle.remove(worker)) delete handles[key];
// Master-side send: route a protocol message to one worker's process.
// (Callback argument presumably forwarded on an elided line — confirm.)
497 function send(worker, message, handle, cb) {
498 sendHelper(worker.process, message, handle, cb);
// workerInit — defines all worker-side state and methods (handles/indexes
// maps are declared on elided lines).
503 function workerInit() {
507 // Called from src/node.js
// Bootstrap: build the Worker object for *this* process, register the
// protocol listener, and tell the master we're online.
508 cluster._setupWorker = function() {
509 var worker = new Worker({
// `+` then `| 0`: string env var -> 32-bit integer id.
510 id: +process.env.NODE_UNIQUE_ID | 0,
514 cluster.worker = worker;
515 process.once('disconnect', function() {
516 worker.emit('disconnect');
517 if (!worker.suicide) {
518 // Unexpected disconnect, master exited, or some such nastiness, so
519 // worker exits immediately.
523 process.on('internalMessage', internal(worker, onmessage));
524 send({ act: 'online' });
// Worker-side dispatcher for master -> worker protocol messages.
525 function onmessage(message, handle) {
526 if (message.act === 'newconn')
527 onconnection(message, handle);
528 else if (message.act === 'disconnect')
533 // obj is a net#Server or a dgram#Socket object.
// Called by lib/net.js / lib/dgram.js instead of binding locally: ask the
// master for a (shared or round-robin) listen handle, then invoke cb.
534 cluster._getServer = function(obj, options, cb) {
// Same address:port:... keying scheme as the master's handles map.
535 const key = [ options.address,
538 options.fd ].join(':');
// Track how many times this worker requested this key (details elided).
539 if (indexes[key] === undefined)
544 const message = util._extend({
550 // Set custom data on handle (i.e. tls tickets key)
551 if (obj._getServerData) message.data = obj._getServerData();
// 'queryServer' round-trip: master replies with errno/sockname/handle.
552 send(message, function(reply, handle) {
553 if (obj._setServerData) obj._setServerData(reply.data);
// handle present => SCHED_NONE shared socket; absent => round-robin.
556 shared(reply, handle, cb); // Shared listen socket.
558 rr(reply, cb); // Round-robin.
// Report the actual bound port back to the master once listening.
560 obj.once('listening', function() {
561 cluster.worker.state = 'listening';
562 const address = obj.address();
563 message.act = 'listening';
564 message.port = address && address.port || options.port;
569 // Shared listen socket.
// Register the master-provided handle locally and hand it to the caller.
570 function shared(message, handle, cb) {
571 var key = message.key;
572 // Monkey-patch the close() method so we can keep track of when it's
573 // closed. Avoids resource leaks when the handle is short-lived.
574 var close = handle.close;
575 handle.close = function() {
// Tell the master this worker no longer uses the shared handle.
576 send({ act: 'close', key: key });
578 return close.apply(this, arguments);
580 assert(handles[key] === undefined);
581 handles[key] = handle;
582 cb(message.errno, handle);
585 // Round-robin. Master distributes handles across workers.
// Build a faux listen handle so net.Server believes it is listening while
// the master does the actual accepting. (Parts of the faux-handle object
// literal are elided in this view.)
586 function rr(message, cb) {
// Bind failed in the master: propagate the errno, no handle.
588 return cb(message.errno, null);
590 var key = message.key;
// Faux listen(): nothing to do — the master already listens.
591 function listen(backlog) {
592 // TODO(bnoordhuis) Send a message to the master that tells it to
593 // update the backlog size. The actual backlog should probably be
594 // the largest requested size by any worker.
// Faux close(): notify the master exactly once (key nulled on elided
// lines after sending).
599 // lib/net.js treats server._handle.close() as effectively synchronous.
600 // That means there is a time window between the call to close() and
601 // the ack by the master process in which we can still receive handles.
602 // onconnection() below handles that by sending those handles back to
604 if (key === undefined) return;
605 send({ act: 'close', key: key });
// Faux getsockname(): copy the address the master reported at bind time.
610 function getsockname(out) {
611 if (key) util._extend(out, message.sockname);
615 // XXX(bnoordhuis) Probably no point in implementing ref() and unref()
616 // because the control channel is going to keep the worker alive anyway.
623 // Faux handle. Mimics a TCPWrap with just enough fidelity to get away
624 // with it. Fools net.Server into thinking that it's backed by a real
632 if (message.sockname) {
633 handle.getsockname = getsockname; // TCP handles only.
635 assert(handles[key] === undefined);
636 handles[key] = handle;
640 // Round-robin connection.
// A 'newconn' handoff from the master: accept it if our faux server for
// that key still exists, and ACK either way so the master can re-route
// refused connections (see RoundRobinHandle.handoff).
641 function onconnection(message, handle) {
642 var key = message.key;
643 var server = handles[key];
644 var accepted = server !== undefined;
645 send({ ack: message.seq, accepted: accepted });
646 if (accepted) server.onconnection(0, handle);
// Worker-side disconnect: close every locally-registered listen handle,
// then disconnect the IPC channel once the last close completes.
649 Worker.prototype.disconnect = function() {
651 var waitingHandles = 0;
// Countdown callback: invoked per closed handle (decrement elided).
653 function checkRemainingHandles() {
655 if (waitingHandles === 0) {
656 process.disconnect();
660 for (var key in handles) {
661 var handle = handles[key];
// handle.owner is presumably the net.Server/dgram.Socket wrapping the
// raw handle — TODO confirm; close it gracefully with the countdown cb.
664 handle.owner.close(checkRemainingHandles);
// Nothing was open: disconnect immediately.
667 if (waitingHandles === 0) {
668 process.disconnect();
// Worker-side destroy: announce suicide to the master, then exit(0) as
// soon as either the send completes or the channel disconnects —
// whichever happens first.
673 Worker.prototype.destroy = function() {
675 if (!this.isConnected()) process.exit(0);
676 var exit = process.exit.bind(null, 0);
677 send({ act: 'suicide' }, exit);
678 process.once('disconnect', exit);
679 process.disconnect();
// Worker-side send: protocol message to the master over our own channel;
// no handle is ever sent upstream.
682 function send(message, cb) {
683 sendHelper(process, message, null, cb);
// Shared by master and worker: tag the message as a cluster-internal one
// and register cb under a sequence number so internal() can match the ACK.
// (seq assignment/increment lines are elided in this view.)
690 function sendHelper(proc, message, handle, cb) {
691 // Mark message as internal. See INTERNAL_PREFIX in lib/child_process.js
692 message = util._extend({ cmd: 'NODE_CLUSTER' }, message);
693 if (cb) callbacks[seq] = cb;
696 proc.send(message, handle);
700 // Returns an internalMessage listener that hands off normal messages
701 // to the callback but intercepts and redirects ACK messages.
702 function internal(worker, cb) {
703 return function(message, handle) {
// Ignore anything that isn't part of the cluster protocol.
704 if (message.cmd !== 'NODE_CLUSTER') return;
// ACK: look up and consume the callback registered by sendHelper.
// (`fn` initialization to cb is presumably on an elided line.)
706 if (message.ack !== undefined) {
707 fn = callbacks[message.ack];
708 delete callbacks[message.ack];
// Invoked with the worker as `this` (see master-side onmessage).
710 fn.apply(worker, arguments);