dns: add missing exports.BADNAME
[platform/upstream/nodejs.git] / lib / cluster.js
1 'use strict';
2
3 const EventEmitter = require('events');
4 const assert = require('assert');
5 const dgram = require('dgram');
6 const fork = require('child_process').fork;
7 const net = require('net');
8 const util = require('util');
9 const SCHED_NONE = 1;
10 const SCHED_RR = 2;
11
12 const uv = process.binding('uv');
13
14 const cluster = new EventEmitter();
15 module.exports = cluster;
16 cluster.Worker = Worker;
17 cluster.isWorker = ('NODE_UNIQUE_ID' in process.env);
18 cluster.isMaster = (cluster.isWorker === false);
19
20
21 function Worker(options) {
22   if (!(this instanceof Worker))
23     return new Worker(options);
24
25   EventEmitter.call(this);
26
27   if (options === null || typeof options !== 'object')
28     options = {};
29
30   this.suicide = undefined;
31   this.state = options.state || 'none';
32   this.id = options.id | 0;
33
34   if (options.process) {
35     this.process = options.process;
36     this.process.on('error', this.emit.bind(this, 'error'));
37     this.process.on('message', this.emit.bind(this, 'message'));
38   }
39 }
40 util.inherits(Worker, EventEmitter);
41
42 Worker.prototype.kill = function() {
43   this.destroy.apply(this, arguments);
44 };
45
46 Worker.prototype.send = function() {
47   this.process.send.apply(this.process, arguments);
48 };
49
50 Worker.prototype.isDead = function isDead() {
51   return this.process.exitCode != null || this.process.signalCode != null;
52 };
53
54 Worker.prototype.isConnected = function isConnected() {
55   return this.process.connected;
56 };
57
58 // Master/worker specific methods are defined in the *Init() functions.
59
60 function SharedHandle(key, address, port, addressType, backlog, fd, flags) {
61   this.key = key;
62   this.workers = [];
63   this.handle = null;
64   this.errno = 0;
65
66   // FIXME(bnoordhuis) Polymorphic return type for lack of a better solution.
67   var rval;
68   if (addressType === 'udp4' || addressType === 'udp6')
69     rval = dgram._createSocketHandle(address, port, addressType, fd, flags);
70   else
71     rval = net._createServerHandle(address, port, addressType, fd);
72
73   if (typeof rval === 'number')
74     this.errno = rval;
75   else
76     this.handle = rval;
77 }
78
79 SharedHandle.prototype.add = function(worker, send) {
80   assert(this.workers.indexOf(worker) === -1);
81   this.workers.push(worker);
82   send(this.errno, null, this.handle);
83 };
84
85 SharedHandle.prototype.remove = function(worker) {
86   var index = this.workers.indexOf(worker);
87   if (index === -1) return false; // The worker wasn't sharing this handle.
88   this.workers.splice(index, 1);
89   if (this.workers.length !== 0) return false;
90   this.handle.close();
91   this.handle = null;
92   return true;
93 };
94
95
96 // Start a round-robin server. Master accepts connections and distributes
97 // them over the workers.
98 function RoundRobinHandle(key, address, port, addressType, backlog, fd) {
99   this.key = key;
100   this.all = {};
101   this.free = [];
102   this.handles = [];
103   this.handle = null;
104   this.server = net.createServer(assert.fail);
105
106   if (fd >= 0)
107     this.server.listen({ fd: fd });
108   else if (port >= 0)
109     this.server.listen(port, address);
110   else
111     this.server.listen(address);  // UNIX socket path.
112
113   var self = this;
114   this.server.once('listening', function() {
115     self.handle = self.server._handle;
116     self.handle.onconnection = self.distribute.bind(self);
117     self.server._handle = null;
118     self.server = null;
119   });
120 }
121
122 RoundRobinHandle.prototype.add = function(worker, send) {
123   assert(worker.id in this.all === false);
124   this.all[worker.id] = worker;
125
126   var self = this;
127   function done() {
128     if (self.handle.getsockname) {
129       var out = {};
130       self.handle.getsockname(out);
131       // TODO(bnoordhuis) Check err.
132       send(null, { sockname: out }, null);
133     } else {
134       send(null, null, null);  // UNIX socket.
135     }
136     self.handoff(worker);  // In case there are connections pending.
137   }
138
139   if (this.server === null) return done();
140   // Still busy binding.
141   this.server.once('listening', done);
142   this.server.once('error', function(err) {
143     // Hack: translate 'EADDRINUSE' error string back to numeric error code.
144     // It works but ideally we'd have some backchannel between the net and
145     // cluster modules for stuff like this.
146     var errno = uv['UV_' + err.errno];
147     send(errno, null);
148   });
149 };
150
151 RoundRobinHandle.prototype.remove = function(worker) {
152   if (worker.id in this.all === false) return false;
153   delete this.all[worker.id];
154   var index = this.free.indexOf(worker);
155   if (index !== -1) this.free.splice(index, 1);
156   if (Object.getOwnPropertyNames(this.all).length !== 0) return false;
157   for (var handle; handle = this.handles.shift(); handle.close());
158   this.handle.close();
159   this.handle = null;
160   return true;
161 };
162
163 RoundRobinHandle.prototype.distribute = function(err, handle) {
164   this.handles.push(handle);
165   var worker = this.free.shift();
166   if (worker) this.handoff(worker);
167 };
168
169 RoundRobinHandle.prototype.handoff = function(worker) {
170   if (worker.id in this.all === false) {
171     return;  // Worker is closing (or has closed) the server.
172   }
173   var handle = this.handles.shift();
174   if (handle === undefined) {
175     this.free.push(worker);  // Add to ready queue again.
176     return;
177   }
178   var message = { act: 'newconn', key: this.key };
179   var self = this;
180   sendHelper(worker.process, message, handle, function(reply) {
181     if (reply.accepted)
182       handle.close();
183     else
184       self.distribute(0, handle);  // Worker is shutting down. Send to another.
185     self.handoff(worker);
186   });
187 };
188
189
190 if (cluster.isMaster)
191   masterInit();
192 else
193   workerInit();
194
195 function masterInit() {
196   cluster.workers = {};
197
198   var intercom = new EventEmitter();
199   cluster.settings = {};
200
201   // XXX(bnoordhuis) Fold cluster.schedulingPolicy into cluster.settings?
202   var schedulingPolicy = {
203     'none': SCHED_NONE,
204     'rr': SCHED_RR
205   }[process.env.NODE_CLUSTER_SCHED_POLICY];
206
207   if (schedulingPolicy === undefined) {
208     // FIXME Round-robin doesn't perform well on Windows right now due to the
209     // way IOCP is wired up. Bert is going to fix that, eventually.
210     schedulingPolicy = (process.platform === 'win32') ? SCHED_NONE : SCHED_RR;
211   }
212
213   cluster.schedulingPolicy = schedulingPolicy;
214   cluster.SCHED_NONE = SCHED_NONE;  // Leave it to the operating system.
215   cluster.SCHED_RR = SCHED_RR;      // Master distributes connections.
216
217   // Keyed on address:port:etc. When a worker dies, we walk over the handles
218   // and remove() the worker from each one. remove() may do a linear scan
219   // itself so we might end up with an O(n*m) operation. Ergo, FIXME.
220   var handles = {};
221
222   var initialized = false;
223   cluster.setupMaster = function(options) {
224     var settings = {
225       args: process.argv.slice(2),
226       exec: process.argv[1],
227       execArgv: process.execArgv,
228       silent: false
229     };
230     settings = util._extend(settings, cluster.settings);
231     settings = util._extend(settings, options || {});
232     // Tell V8 to write profile data for each process to a separate file.
233     // Without --logfile=v8-%p.log, everything ends up in a single, unusable
234     // file. (Unusable because what V8 logs are memory addresses and each
235     // process has its own memory mappings.)
236     if (settings.execArgv.some(function(s) { return /^--prof/.test(s); }) &&
237         !settings.execArgv.some(function(s) { return /^--logfile=/.test(s); }))
238     {
239       settings.execArgv = settings.execArgv.concat(['--logfile=v8-%p.log']);
240     }
241     cluster.settings = settings;
242     if (initialized === true)
243       return process.nextTick(setupSettingsNT, settings);
244     initialized = true;
245     schedulingPolicy = cluster.schedulingPolicy;  // Freeze policy.
246     assert(schedulingPolicy === SCHED_NONE || schedulingPolicy === SCHED_RR,
247            'Bad cluster.schedulingPolicy: ' + schedulingPolicy);
248
249     var hasDebugArg = process.execArgv.some(function(argv) {
250       return /^(--debug|--debug-brk)(=\d+)?$/.test(argv);
251     });
252
253     process.nextTick(setupSettingsNT, settings);
254
255     // Send debug signal only if not started in debug mode, this helps a lot
256     // on windows, because RegisterDebugHandler is not called when node starts
257     // with --debug.* arg.
258     if (hasDebugArg)
259       return;
260
261     process.on('internalMessage', function(message) {
262       if (message.cmd !== 'NODE_DEBUG_ENABLED') return;
263       var key;
264       for (key in cluster.workers) {
265         var worker = cluster.workers[key];
266         if (worker.state === 'online' || worker.state === 'listening') {
267           process._debugProcess(worker.process.pid);
268         } else {
269           worker.once('online', function() {
270             process._debugProcess(this.process.pid);
271           });
272         }
273       }
274     });
275   };
276
277   function setupSettingsNT(settings) {
278     cluster.emit('setup', settings);
279   }
280
281   var debugPortOffset = 1;
282
283   function createWorkerProcess(id, env) {
284     var workerEnv = util._extend({}, process.env);
285     var execArgv = cluster.settings.execArgv.slice();
286
287     workerEnv = util._extend(workerEnv, env);
288     workerEnv.NODE_UNIQUE_ID = '' + id;
289
290     for (var i = 0; i < execArgv.length; i++) {
291       var match = execArgv[i].match(/^(--debug|--debug-(brk|port))(=\d+)?$/);
292
293       if (match) {
294         let debugPort = process.debugPort + debugPortOffset;
295         ++debugPortOffset;
296         execArgv[i] = match[1] + '=' + debugPort;
297       }
298     }
299
300     return fork(cluster.settings.exec, cluster.settings.args, {
301       env: workerEnv,
302       silent: cluster.settings.silent,
303       execArgv: execArgv,
304       gid: cluster.settings.gid,
305       uid: cluster.settings.uid
306     });
307   }
308
309   var ids = 0;
310
311   cluster.fork = function(env) {
312     cluster.setupMaster();
313     const id = ++ids;
314     const workerProcess = createWorkerProcess(id, env);
315     const worker = new Worker({
316       id: id,
317       process: workerProcess
318     });
319
320     worker.on('message', this.emit.bind(this, 'message'));
321
322     function removeWorker(worker) {
323       assert(worker);
324
325       delete cluster.workers[worker.id];
326
327       if (Object.keys(cluster.workers).length === 0) {
328         assert(Object.keys(handles).length === 0, 'Resource leak detected.');
329         intercom.emit('disconnect');
330       }
331     }
332
333     function removeHandlesForWorker(worker) {
334       assert(worker);
335
336       for (var key in handles) {
337         var handle = handles[key];
338         if (handle.remove(worker)) delete handles[key];
339       }
340     }
341
342     worker.process.once('exit', function(exitCode, signalCode) {
343       /*
344        * Remove the worker from the workers list only
345        * if it has disconnected, otherwise we might
346        * still want to access it.
347        */
348       if (!worker.isConnected()) removeWorker(worker);
349
350       worker.suicide = !!worker.suicide;
351       worker.state = 'dead';
352       worker.emit('exit', exitCode, signalCode);
353       cluster.emit('exit', worker, exitCode, signalCode);
354     });
355
356     worker.process.once('disconnect', function() {
357       /*
358        * Now is a good time to remove the handles
359        * associated with this worker because it is
360        * not connected to the master anymore.
361        */
362       removeHandlesForWorker(worker);
363
364       /*
365        * Remove the worker from the workers list only
366        * if its process has exited. Otherwise, we might
367        * still want to access it.
368        */
369       if (worker.isDead()) removeWorker(worker);
370
371       worker.suicide = !!worker.suicide;
372       worker.state = 'disconnected';
373       worker.emit('disconnect');
374       cluster.emit('disconnect', worker);
375     });
376
377     worker.process.on('internalMessage', internal(worker, onmessage));
378     process.nextTick(emitForkNT, worker);
379     cluster.workers[worker.id] = worker;
380     return worker;
381   };
382
383   function emitForkNT(worker) {
384     cluster.emit('fork', worker);
385   }
386
387   cluster.disconnect = function(cb) {
388     var workers = Object.keys(cluster.workers);
389     if (workers.length === 0) {
390       process.nextTick(intercom.emit.bind(intercom, 'disconnect'));
391     } else {
392       for (var key in workers) {
393         key = workers[key];
394         if (cluster.workers[key].isConnected())
395           cluster.workers[key].disconnect();
396       }
397     }
398     if (cb) intercom.once('disconnect', cb);
399   };
400
401   Worker.prototype.disconnect = function() {
402     this.suicide = true;
403     send(this, { act: 'disconnect' });
404   };
405
406   Worker.prototype.destroy = function(signo) {
407     signo = signo || 'SIGTERM';
408     var proc = this.process;
409     if (this.isConnected()) {
410       this.once('disconnect', proc.kill.bind(proc, signo));
411       this.disconnect();
412       return;
413     }
414     proc.kill(signo);
415   };
416
417   function onmessage(message, handle) {
418     var worker = this;
419     if (message.act === 'online')
420       online(worker);
421     else if (message.act === 'queryServer')
422       queryServer(worker, message);
423     else if (message.act === 'listening')
424       listening(worker, message);
425     else if (message.act === 'suicide')
426       worker.suicide = true;
427     else if (message.act === 'close')
428       close(worker, message);
429   }
430
431   function online(worker) {
432     worker.state = 'online';
433     worker.emit('online');
434     cluster.emit('online', worker);
435   }
436
437   function queryServer(worker, message) {
438     var args = [message.address,
439                 message.port,
440                 message.addressType,
441                 message.fd,
442                 message.index];
443     var key = args.join(':');
444     var handle = handles[key];
445     if (handle === undefined) {
446       var constructor = RoundRobinHandle;
447       // UDP is exempt from round-robin connection balancing for what should
448       // be obvious reasons: it's connectionless. There is nothing to send to
449       // the workers except raw datagrams and that's pointless.
450       if (schedulingPolicy !== SCHED_RR ||
451           message.addressType === 'udp4' ||
452           message.addressType === 'udp6') {
453         constructor = SharedHandle;
454       }
455       handles[key] = handle = new constructor(key,
456                                               message.address,
457                                               message.port,
458                                               message.addressType,
459                                               message.backlog,
460                                               message.fd,
461                                               message.flags);
462     }
463     if (!handle.data) handle.data = message.data;
464
465     // Set custom server data
466     handle.add(worker, function(errno, reply, handle) {
467       reply = util._extend({
468         errno: errno,
469         key: key,
470         ack: message.seq,
471         data: handles[key].data
472       }, reply);
473       if (errno) delete handles[key];  // Gives other workers a chance to retry.
474       send(worker, reply, handle);
475     });
476   }
477
478   function listening(worker, message) {
479     var info = {
480       addressType: message.addressType,
481       address: message.address,
482       port: message.port,
483       fd: message.fd
484     };
485     worker.state = 'listening';
486     worker.emit('listening', info);
487     cluster.emit('listening', worker, info);
488   }
489
490   // Server in worker is closing, remove from list.
491   function close(worker, message) {
492     var key = message.key;
493     var handle = handles[key];
494     if (handle.remove(worker)) delete handles[key];
495   }
496
497   function send(worker, message, handle, cb) {
498     sendHelper(worker.process, message, handle, cb);
499   }
500 }
501
502
503 function workerInit() {
504   var handles = {};
505   var indexes = {};
506
507   // Called from src/node.js
508   cluster._setupWorker = function() {
509     var worker = new Worker({
510       id: +process.env.NODE_UNIQUE_ID | 0,
511       process: process,
512       state: 'online'
513     });
514     cluster.worker = worker;
515     process.once('disconnect', function() {
516       worker.emit('disconnect');
517       if (!worker.suicide) {
518         // Unexpected disconnect, master exited, or some such nastiness, so
519         // worker exits immediately.
520         process.exit(0);
521       }
522     });
523     process.on('internalMessage', internal(worker, onmessage));
524     send({ act: 'online' });
525     function onmessage(message, handle) {
526       if (message.act === 'newconn')
527         onconnection(message, handle);
528       else if (message.act === 'disconnect')
529         worker.disconnect();
530     }
531   };
532
533   // obj is a net#Server or a dgram#Socket object.
534   cluster._getServer = function(obj, options, cb) {
535     const key = [ options.address,
536                 options.port,
537                 options.addressType,
538                 options.fd ].join(':');
539     if (indexes[key] === undefined)
540       indexes[key] = 0;
541     else
542       indexes[key]++;
543
544     const message = util._extend({
545       act: 'queryServer',
546       index: indexes[key],
547       data: null
548     }, options);
549
550     // Set custom data on handle (i.e. tls tickets key)
551     if (obj._getServerData) message.data = obj._getServerData();
552     send(message, function(reply, handle) {
553       if (obj._setServerData) obj._setServerData(reply.data);
554
555       if (handle)
556         shared(reply, handle, cb);  // Shared listen socket.
557       else
558         rr(reply, cb);              // Round-robin.
559     });
560     obj.once('listening', function() {
561       cluster.worker.state = 'listening';
562       const address = obj.address();
563       message.act = 'listening';
564       message.port = address && address.port || options.port;
565       send(message);
566     });
567   };
568
569   // Shared listen socket.
570   function shared(message, handle, cb) {
571     var key = message.key;
572     // Monkey-patch the close() method so we can keep track of when it's
573     // closed. Avoids resource leaks when the handle is short-lived.
574     var close = handle.close;
575     handle.close = function() {
576       send({ act: 'close', key: key });
577       delete handles[key];
578       return close.apply(this, arguments);
579     };
580     assert(handles[key] === undefined);
581     handles[key] = handle;
582     cb(message.errno, handle);
583   }
584
585   // Round-robin. Master distributes handles across workers.
586   function rr(message, cb) {
587     if (message.errno)
588       return cb(message.errno, null);
589
590     var key = message.key;
591     function listen(backlog) {
592       // TODO(bnoordhuis) Send a message to the master that tells it to
593       // update the backlog size. The actual backlog should probably be
594       // the largest requested size by any worker.
595       return 0;
596     }
597
598     function close() {
599       // lib/net.js treats server._handle.close() as effectively synchronous.
600       // That means there is a time window between the call to close() and
601       // the ack by the master process in which we can still receive handles.
602       // onconnection() below handles that by sending those handles back to
603       // the master.
604       if (key === undefined) return;
605       send({ act: 'close', key: key });
606       delete handles[key];
607       key = undefined;
608     }
609
610     function getsockname(out) {
611       if (key) util._extend(out, message.sockname);
612       return 0;
613     }
614
615     // XXX(bnoordhuis) Probably no point in implementing ref() and unref()
616     // because the control channel is going to keep the worker alive anyway.
617     function ref() {
618     }
619
620     function unref() {
621     }
622
623     // Faux handle. Mimics a TCPWrap with just enough fidelity to get away
624     // with it. Fools net.Server into thinking that it's backed by a real
625     // handle.
626     var handle = {
627       close: close,
628       listen: listen,
629       ref: ref,
630       unref: unref,
631     };
632     if (message.sockname) {
633       handle.getsockname = getsockname;  // TCP handles only.
634     }
635     assert(handles[key] === undefined);
636     handles[key] = handle;
637     cb(0, handle);
638   }
639
640   // Round-robin connection.
641   function onconnection(message, handle) {
642     var key = message.key;
643     var server = handles[key];
644     var accepted = server !== undefined;
645     send({ ack: message.seq, accepted: accepted });
646     if (accepted) server.onconnection(0, handle);
647   }
648
649   Worker.prototype.disconnect = function() {
650     this.suicide = true;
651     var waitingHandles = 0;
652
653     function checkRemainingHandles() {
654       waitingHandles--;
655       if (waitingHandles === 0) {
656         process.disconnect();
657       }
658     }
659
660     for (var key in handles) {
661       var handle = handles[key];
662       delete handles[key];
663       waitingHandles++;
664       handle.owner.close(checkRemainingHandles);
665     }
666
667     if (waitingHandles === 0) {
668       process.disconnect();
669     }
670
671   };
672
673   Worker.prototype.destroy = function() {
674     this.suicide = true;
675     if (!this.isConnected()) process.exit(0);
676     var exit = process.exit.bind(null, 0);
677     send({ act: 'suicide' }, exit);
678     process.once('disconnect', exit);
679     process.disconnect();
680   };
681
682   function send(message, cb) {
683     sendHelper(process, message, null, cb);
684   }
685 }
686
687
688 var seq = 0;
689 var callbacks = {};
690 function sendHelper(proc, message, handle, cb) {
691   // Mark message as internal. See INTERNAL_PREFIX in lib/child_process.js
692   message = util._extend({ cmd: 'NODE_CLUSTER' }, message);
693   if (cb) callbacks[seq] = cb;
694   message.seq = seq;
695   seq += 1;
696   proc.send(message, handle);
697 }
698
699
700 // Returns an internalMessage listener that hands off normal messages
701 // to the callback but intercepts and redirects ACK messages.
702 function internal(worker, cb) {
703   return function(message, handle) {
704     if (message.cmd !== 'NODE_CLUSTER') return;
705     var fn = cb;
706     if (message.ack !== undefined) {
707       fn = callbacks[message.ack];
708       delete callbacks[message.ack];
709     }
710     fn.apply(worker, arguments);
711   };
712 }