doc: improvements to console.markdown copy
[platform/upstream/nodejs.git] / lib / cluster.js
1 'use strict';
2
3 const EventEmitter = require('events');
4 const assert = require('assert');
5 const dgram = require('dgram');
6 const fork = require('child_process').fork;
7 const net = require('net');
8 const util = require('util');
9 const SCHED_NONE = 1;
10 const SCHED_RR = 2;
11
12 const uv = process.binding('uv');
13
14 const cluster = new EventEmitter();
15 module.exports = cluster;
16 cluster.Worker = Worker;
17 cluster.isWorker = ('NODE_UNIQUE_ID' in process.env);
18 cluster.isMaster = (cluster.isWorker === false);
19
20
21 function Worker(options) {
22   if (!(this instanceof Worker))
23     return new Worker(options);
24
25   EventEmitter.call(this);
26
27   if (options === null || typeof options !== 'object')
28     options = {};
29
30   this.suicide = undefined;
31   this.state = options.state || 'none';
32   this.id = options.id | 0;
33
34   if (options.process) {
35     this.process = options.process;
36     this.process.on('error', this.emit.bind(this, 'error'));
37     this.process.on('message', this.emit.bind(this, 'message'));
38   }
39 }
40 util.inherits(Worker, EventEmitter);
41
42 Worker.prototype.kill = function() {
43   this.destroy.apply(this, arguments);
44 };
45
46 Worker.prototype.send = function() {
47   this.process.send.apply(this.process, arguments);
48 };
49
50 Worker.prototype.isDead = function isDead() {
51   return this.process.exitCode != null || this.process.signalCode != null;
52 };
53
54 Worker.prototype.isConnected = function isConnected() {
55   return this.process.connected;
56 };
57
58 // Master/worker specific methods are defined in the *Init() functions.
59
60 function SharedHandle(key, address, port, addressType, backlog, fd, flags) {
61   this.key = key;
62   this.workers = [];
63   this.handle = null;
64   this.errno = 0;
65
66   // FIXME(bnoordhuis) Polymorphic return type for lack of a better solution.
67   var rval;
68   if (addressType === 'udp4' || addressType === 'udp6')
69     rval = dgram._createSocketHandle(address, port, addressType, fd, flags);
70   else
71     rval = net._createServerHandle(address, port, addressType, fd);
72
73   if (typeof rval === 'number')
74     this.errno = rval;
75   else
76     this.handle = rval;
77 }
78
79 SharedHandle.prototype.add = function(worker, send) {
80   assert(this.workers.indexOf(worker) === -1);
81   this.workers.push(worker);
82   send(this.errno, null, this.handle);
83 };
84
85 SharedHandle.prototype.remove = function(worker) {
86   var index = this.workers.indexOf(worker);
87   if (index === -1) return false; // The worker wasn't sharing this handle.
88   this.workers.splice(index, 1);
89   if (this.workers.length !== 0) return false;
90   this.handle.close();
91   this.handle = null;
92   return true;
93 };
94
95
96 // Start a round-robin server. Master accepts connections and distributes
97 // them over the workers.
98 function RoundRobinHandle(key, address, port, addressType, backlog, fd) {
99   this.key = key;
100   this.all = {};
101   this.free = [];
102   this.handles = [];
103   this.handle = null;
104   this.server = net.createServer(assert.fail);
105
106   if (fd >= 0)
107     this.server.listen({ fd: fd });
108   else if (port >= 0)
109     this.server.listen(port, address);
110   else
111     this.server.listen(address);  // UNIX socket path.
112
113   var self = this;
114   this.server.once('listening', function() {
115     self.handle = self.server._handle;
116     self.handle.onconnection = self.distribute.bind(self);
117     self.server._handle = null;
118     self.server = null;
119   });
120 }
121
122 RoundRobinHandle.prototype.add = function(worker, send) {
123   assert(worker.id in this.all === false);
124   this.all[worker.id] = worker;
125
126   var self = this;
127   function done() {
128     if (self.handle.getsockname) {
129       var out = {};
130       self.handle.getsockname(out);
131       // TODO(bnoordhuis) Check err.
132       send(null, { sockname: out }, null);
133     } else {
134       send(null, null, null);  // UNIX socket.
135     }
136     self.handoff(worker);  // In case there are connections pending.
137   }
138
139   if (this.server === null) return done();
140   // Still busy binding.
141   this.server.once('listening', done);
142   this.server.once('error', function(err) {
143     // Hack: translate 'EADDRINUSE' error string back to numeric error code.
144     // It works but ideally we'd have some backchannel between the net and
145     // cluster modules for stuff like this.
146     var errno = uv['UV_' + err.errno];
147     send(errno, null);
148   });
149 };
150
151 RoundRobinHandle.prototype.remove = function(worker) {
152   if (worker.id in this.all === false) return false;
153   delete this.all[worker.id];
154   var index = this.free.indexOf(worker);
155   if (index !== -1) this.free.splice(index, 1);
156   if (Object.getOwnPropertyNames(this.all).length !== 0) return false;
157   for (var handle; handle = this.handles.shift(); handle.close());
158   this.handle.close();
159   this.handle = null;
160   return true;
161 };
162
163 RoundRobinHandle.prototype.distribute = function(err, handle) {
164   this.handles.push(handle);
165   var worker = this.free.shift();
166   if (worker) this.handoff(worker);
167 };
168
169 RoundRobinHandle.prototype.handoff = function(worker) {
170   if (worker.id in this.all === false) {
171     return;  // Worker is closing (or has closed) the server.
172   }
173   var handle = this.handles.shift();
174   if (handle === undefined) {
175     this.free.push(worker);  // Add to ready queue again.
176     return;
177   }
178   var message = { act: 'newconn', key: this.key };
179   var self = this;
180   sendHelper(worker.process, message, handle, function(reply) {
181     if (reply.accepted)
182       handle.close();
183     else
184       self.distribute(0, handle);  // Worker is shutting down. Send to another.
185     self.handoff(worker);
186   });
187 };
188
189
190 if (cluster.isMaster)
191   masterInit();
192 else
193   workerInit();
194
195 function masterInit() {
196   cluster.workers = {};
197
198   var intercom = new EventEmitter();
199   cluster.settings = {};
200
201   // XXX(bnoordhuis) Fold cluster.schedulingPolicy into cluster.settings?
202   var schedulingPolicy = {
203     'none': SCHED_NONE,
204     'rr': SCHED_RR
205   }[process.env.NODE_CLUSTER_SCHED_POLICY];
206
207   if (schedulingPolicy === undefined) {
208     // FIXME Round-robin doesn't perform well on Windows right now due to the
209     // way IOCP is wired up. Bert is going to fix that, eventually.
210     schedulingPolicy = (process.platform === 'win32') ? SCHED_NONE : SCHED_RR;
211   }
212
213   cluster.schedulingPolicy = schedulingPolicy;
214   cluster.SCHED_NONE = SCHED_NONE;  // Leave it to the operating system.
215   cluster.SCHED_RR = SCHED_RR;      // Master distributes connections.
216
217   // Keyed on address:port:etc. When a worker dies, we walk over the handles
218   // and remove() the worker from each one. remove() may do a linear scan
219   // itself so we might end up with an O(n*m) operation. Ergo, FIXME.
220   const handles = require('internal/cluster').handles;
221
222   var initialized = false;
223   cluster.setupMaster = function(options) {
224     var settings = {
225       args: process.argv.slice(2),
226       exec: process.argv[1],
227       execArgv: process.execArgv,
228       silent: false
229     };
230     settings = util._extend(settings, cluster.settings);
231     settings = util._extend(settings, options || {});
232     // Tell V8 to write profile data for each process to a separate file.
233     // Without --logfile=v8-%p.log, everything ends up in a single, unusable
234     // file. (Unusable because what V8 logs are memory addresses and each
235     // process has its own memory mappings.)
236     if (settings.execArgv.some(function(s) { return /^--prof/.test(s); }) &&
237         !settings.execArgv.some(function(s) { return /^--logfile=/.test(s); }))
238     {
239       settings.execArgv = settings.execArgv.concat(['--logfile=v8-%p.log']);
240     }
241     cluster.settings = settings;
242     if (initialized === true)
243       return process.nextTick(setupSettingsNT, settings);
244     initialized = true;
245     schedulingPolicy = cluster.schedulingPolicy;  // Freeze policy.
246     assert(schedulingPolicy === SCHED_NONE || schedulingPolicy === SCHED_RR,
247            'Bad cluster.schedulingPolicy: ' + schedulingPolicy);
248
249     var hasDebugArg = process.execArgv.some(function(argv) {
250       return /^(--debug|--debug-brk)(=\d+)?$/.test(argv);
251     });
252
253     process.nextTick(setupSettingsNT, settings);
254
255     // Send debug signal only if not started in debug mode, this helps a lot
256     // on windows, because RegisterDebugHandler is not called when node starts
257     // with --debug.* arg.
258     if (hasDebugArg)
259       return;
260
261     process.on('internalMessage', function(message) {
262       if (message.cmd !== 'NODE_DEBUG_ENABLED') return;
263       var key;
264       for (key in cluster.workers) {
265         var worker = cluster.workers[key];
266         if (worker.state === 'online' || worker.state === 'listening') {
267           process._debugProcess(worker.process.pid);
268         } else {
269           worker.once('online', function() {
270             process._debugProcess(this.process.pid);
271           });
272         }
273       }
274     });
275   };
276
277   function setupSettingsNT(settings) {
278     cluster.emit('setup', settings);
279   }
280
281   var debugPortOffset = 1;
282
283   function createWorkerProcess(id, env) {
284     var workerEnv = util._extend({}, process.env);
285     var execArgv = cluster.settings.execArgv.slice();
286
287     workerEnv = util._extend(workerEnv, env);
288     workerEnv.NODE_UNIQUE_ID = '' + id;
289
290     for (var i = 0; i < execArgv.length; i++) {
291       var match = execArgv[i].match(/^(--debug|--debug-(brk|port))(=\d+)?$/);
292
293       if (match) {
294         const debugPort = process.debugPort + debugPortOffset;
295         ++debugPortOffset;
296         execArgv[i] = match[1] + '=' + debugPort;
297       }
298     }
299
300     return fork(cluster.settings.exec, cluster.settings.args, {
301       env: workerEnv,
302       silent: cluster.settings.silent,
303       execArgv: execArgv,
304       gid: cluster.settings.gid,
305       uid: cluster.settings.uid
306     });
307   }
308
309   var ids = 0;
310
311   function removeWorker(worker) {
312     assert(worker);
313
314     delete cluster.workers[worker.id];
315
316     if (Object.keys(cluster.workers).length === 0) {
317       assert(Object.keys(handles).length === 0, 'Resource leak detected.');
318       intercom.emit('disconnect');
319     }
320   }
321
322   function removeHandlesForWorker(worker) {
323     assert(worker);
324
325     for (var key in handles) {
326       var handle = handles[key];
327       if (handle.remove(worker)) delete handles[key];
328     }
329   }
330
331   cluster.fork = function(env) {
332     cluster.setupMaster();
333     const id = ++ids;
334     const workerProcess = createWorkerProcess(id, env);
335     const worker = new Worker({
336       id: id,
337       process: workerProcess
338     });
339
340     worker.on('message', this.emit.bind(this, 'message'));
341
342     worker.process.once('exit', function(exitCode, signalCode) {
343       /*
344        * Remove the worker from the workers list only
345        * if it has disconnected, otherwise we might
346        * still want to access it.
347        */
348       if (!worker.isConnected()) {
349         removeHandlesForWorker(worker);
350         removeWorker(worker);
351       }
352
353       worker.suicide = !!worker.suicide;
354       worker.state = 'dead';
355       worker.emit('exit', exitCode, signalCode);
356       cluster.emit('exit', worker, exitCode, signalCode);
357     });
358
359     worker.process.once('disconnect', function() {
360       /*
361        * Now is a good time to remove the handles
362        * associated with this worker because it is
363        * not connected to the master anymore.
364        */
365       removeHandlesForWorker(worker);
366
367       /*
368        * Remove the worker from the workers list only
369        * if its process has exited. Otherwise, we might
370        * still want to access it.
371        */
372       if (worker.isDead()) removeWorker(worker);
373
374       worker.suicide = !!worker.suicide;
375       worker.state = 'disconnected';
376       worker.emit('disconnect');
377       cluster.emit('disconnect', worker);
378     });
379
380     worker.process.on('internalMessage', internal(worker, onmessage));
381     process.nextTick(emitForkNT, worker);
382     cluster.workers[worker.id] = worker;
383     return worker;
384   };
385
386   function emitForkNT(worker) {
387     cluster.emit('fork', worker);
388   }
389
390   cluster.disconnect = function(cb) {
391     var workers = Object.keys(cluster.workers);
392     if (workers.length === 0) {
393       process.nextTick(intercom.emit.bind(intercom, 'disconnect'));
394     } else {
395       for (var key in workers) {
396         key = workers[key];
397         if (cluster.workers[key].isConnected())
398           cluster.workers[key].disconnect();
399       }
400     }
401     if (cb) intercom.once('disconnect', cb);
402   };
403
404   Worker.prototype.disconnect = function() {
405     this.suicide = true;
406     send(this, { act: 'disconnect' });
407     removeHandlesForWorker(this);
408     removeWorker(this);
409   };
410
411   Worker.prototype.destroy = function(signo) {
412     signo = signo || 'SIGTERM';
413     var proc = this.process;
414     if (this.isConnected()) {
415       this.once('disconnect', proc.kill.bind(proc, signo));
416       this.disconnect();
417       return;
418     }
419     proc.kill(signo);
420   };
421
422   function onmessage(message, handle) {
423     var worker = this;
424     if (message.act === 'online')
425       online(worker);
426     else if (message.act === 'queryServer')
427       queryServer(worker, message);
428     else if (message.act === 'listening')
429       listening(worker, message);
430     else if (message.act === 'suicide')
431       worker.suicide = true;
432     else if (message.act === 'close')
433       close(worker, message);
434   }
435
436   function online(worker) {
437     worker.state = 'online';
438     worker.emit('online');
439     cluster.emit('online', worker);
440   }
441
442   function queryServer(worker, message) {
443     var args = [message.address,
444                 message.port,
445                 message.addressType,
446                 message.fd,
447                 message.index];
448     var key = args.join(':');
449     var handle = handles[key];
450     if (handle === undefined) {
451       var constructor = RoundRobinHandle;
452       // UDP is exempt from round-robin connection balancing for what should
453       // be obvious reasons: it's connectionless. There is nothing to send to
454       // the workers except raw datagrams and that's pointless.
455       if (schedulingPolicy !== SCHED_RR ||
456           message.addressType === 'udp4' ||
457           message.addressType === 'udp6') {
458         constructor = SharedHandle;
459       }
460       handles[key] = handle = new constructor(key,
461                                               message.address,
462                                               message.port,
463                                               message.addressType,
464                                               message.backlog,
465                                               message.fd,
466                                               message.flags);
467     }
468     if (!handle.data) handle.data = message.data;
469
470     // Set custom server data
471     handle.add(worker, function(errno, reply, handle) {
472       reply = util._extend({
473         errno: errno,
474         key: key,
475         ack: message.seq,
476         data: handles[key].data
477       }, reply);
478       if (errno) delete handles[key];  // Gives other workers a chance to retry.
479       send(worker, reply, handle);
480     });
481   }
482
483   function listening(worker, message) {
484     var info = {
485       addressType: message.addressType,
486       address: message.address,
487       port: message.port,
488       fd: message.fd
489     };
490     worker.state = 'listening';
491     worker.emit('listening', info);
492     cluster.emit('listening', worker, info);
493   }
494
495   // Server in worker is closing, remove from list.  The handle may have been
496   // removed by a prior call to removeHandlesForWorker() so guard against that.
497   function close(worker, message) {
498     var key = message.key;
499     var handle = handles[key];
500     if (handle && handle.remove(worker)) delete handles[key];
501   }
502
503   function send(worker, message, handle, cb) {
504     sendHelper(worker.process, message, handle, cb);
505   }
506 }
507
508
509 function workerInit() {
510   var handles = {};
511   var indexes = {};
512
513   // Called from src/node.js
514   cluster._setupWorker = function() {
515     var worker = new Worker({
516       id: +process.env.NODE_UNIQUE_ID | 0,
517       process: process,
518       state: 'online'
519     });
520     cluster.worker = worker;
521     process.once('disconnect', function() {
522       worker.emit('disconnect');
523       if (!worker.suicide) {
524         // Unexpected disconnect, master exited, or some such nastiness, so
525         // worker exits immediately.
526         process.exit(0);
527       }
528     });
529     process.on('internalMessage', internal(worker, onmessage));
530     send({ act: 'online' });
531     function onmessage(message, handle) {
532       if (message.act === 'newconn')
533         onconnection(message, handle);
534       else if (message.act === 'disconnect')
535         worker.disconnect();
536     }
537   };
538
539   // obj is a net#Server or a dgram#Socket object.
540   cluster._getServer = function(obj, options, cb) {
541     const key = [ options.address,
542                 options.port,
543                 options.addressType,
544                 options.fd ].join(':');
545     if (indexes[key] === undefined)
546       indexes[key] = 0;
547     else
548       indexes[key]++;
549
550     const message = util._extend({
551       act: 'queryServer',
552       index: indexes[key],
553       data: null
554     }, options);
555
556     // Set custom data on handle (i.e. tls tickets key)
557     if (obj._getServerData) message.data = obj._getServerData();
558     send(message, function(reply, handle) {
559       if (obj._setServerData) obj._setServerData(reply.data);
560
561       if (handle)
562         shared(reply, handle, cb);  // Shared listen socket.
563       else
564         rr(reply, cb);              // Round-robin.
565     });
566     obj.once('listening', function() {
567       cluster.worker.state = 'listening';
568       const address = obj.address();
569       message.act = 'listening';
570       message.port = address && address.port || options.port;
571       send(message);
572     });
573   };
574
575   // Shared listen socket.
576   function shared(message, handle, cb) {
577     var key = message.key;
578     // Monkey-patch the close() method so we can keep track of when it's
579     // closed. Avoids resource leaks when the handle is short-lived.
580     var close = handle.close;
581     handle.close = function() {
582       send({ act: 'close', key: key });
583       delete handles[key];
584       return close.apply(this, arguments);
585     };
586     assert(handles[key] === undefined);
587     handles[key] = handle;
588     cb(message.errno, handle);
589   }
590
591   // Round-robin. Master distributes handles across workers.
592   function rr(message, cb) {
593     if (message.errno)
594       return cb(message.errno, null);
595
596     var key = message.key;
597     function listen(backlog) {
598       // TODO(bnoordhuis) Send a message to the master that tells it to
599       // update the backlog size. The actual backlog should probably be
600       // the largest requested size by any worker.
601       return 0;
602     }
603
604     function close() {
605       // lib/net.js treats server._handle.close() as effectively synchronous.
606       // That means there is a time window between the call to close() and
607       // the ack by the master process in which we can still receive handles.
608       // onconnection() below handles that by sending those handles back to
609       // the master.
610       if (key === undefined) return;
611       send({ act: 'close', key: key });
612       delete handles[key];
613       key = undefined;
614     }
615
616     function getsockname(out) {
617       if (key) util._extend(out, message.sockname);
618       return 0;
619     }
620
621     // XXX(bnoordhuis) Probably no point in implementing ref() and unref()
622     // because the control channel is going to keep the worker alive anyway.
623     function ref() {
624     }
625
626     function unref() {
627     }
628
629     // Faux handle. Mimics a TCPWrap with just enough fidelity to get away
630     // with it. Fools net.Server into thinking that it's backed by a real
631     // handle.
632     var handle = {
633       close: close,
634       listen: listen,
635       ref: ref,
636       unref: unref,
637     };
638     if (message.sockname) {
639       handle.getsockname = getsockname;  // TCP handles only.
640     }
641     assert(handles[key] === undefined);
642     handles[key] = handle;
643     cb(0, handle);
644   }
645
646   // Round-robin connection.
647   function onconnection(message, handle) {
648     var key = message.key;
649     var server = handles[key];
650     var accepted = server !== undefined;
651     send({ ack: message.seq, accepted: accepted });
652     if (accepted) server.onconnection(0, handle);
653   }
654
655   Worker.prototype.disconnect = function() {
656     this.suicide = true;
657     let waitingCount = 1;
658
659     function checkWaitingCount() {
660       waitingCount--;
661       if (waitingCount === 0) {
662         send({ act: 'suicide' });
663         process.disconnect();
664       }
665     }
666
667     for (const key in handles) {
668       const handle = handles[key];
669       delete handles[key];
670       waitingCount++;
671       handle.owner.close(checkWaitingCount);
672     }
673
674     checkWaitingCount();
675   };
676
677   Worker.prototype.destroy = function() {
678     this.suicide = true;
679     if (!this.isConnected()) process.exit(0);
680     var exit = process.exit.bind(null, 0);
681     send({ act: 'suicide' }, exit);
682     process.once('disconnect', exit);
683     process.disconnect();
684   };
685
686   function send(message, cb) {
687     sendHelper(process, message, null, cb);
688   }
689 }
690
691
692 var seq = 0;
693 var callbacks = {};
694 function sendHelper(proc, message, handle, cb) {
695   // Mark message as internal. See INTERNAL_PREFIX in lib/child_process.js
696   message = util._extend({ cmd: 'NODE_CLUSTER' }, message);
697   if (cb) callbacks[seq] = cb;
698   message.seq = seq;
699   seq += 1;
700   proc.send(message, handle);
701 }
702
703
704 // Returns an internalMessage listener that hands off normal messages
705 // to the callback but intercepts and redirects ACK messages.
706 function internal(worker, cb) {
707   return function(message, handle) {
708     if (message.cmd !== 'NODE_CLUSTER') return;
709     var fn = cb;
710     if (message.ack !== undefined) {
711       fn = callbacks[message.ack];
712       delete callbacks[message.ack];
713     }
714     fn.apply(worker, arguments);
715   };
716 }