new cluster api
authorRyan Dahl <ry@tinyclouds.org>
Fri, 4 Nov 2011 22:11:19 +0000 (15:11 -0700)
committerRyan Dahl <ry@tinyclouds.org>
Fri, 4 Nov 2011 22:12:11 +0000 (15:12 -0700)
doc/api/cluster.markdown
lib/cluster.js
lib/net.js
src/node.js

index efcb02f..7df84b1 100644 (file)
@@ -9,10 +9,17 @@ which share server ports.
 
     var cluster = require('cluster');
     var http = require('http');
+    var numCPUs = require('os').cpus().length;
 
     if (cluster.isMaster) {
-      // Start the master process, fork workers.
-      cluster.startMaster({ workers: 2 });
+      // Fork workers.
+      for (var i = 0; i < numCPUs; i++) {
+        cluster.fork();
+      }
+
+      cluster.on('death', function(worker) {
+        console.log('worker ' + worker.pid + ' died');
+      });
     } else {
       // Worker processes have a http server.
       http.Server(function(req, res) {
@@ -27,37 +34,38 @@ Running node will now share port 8000 between the workers:
     Worker 2438 online
     Worker 2437 online
 
-### exports.startMaster([options])
+### cluster.fork()
 
-  Spawns the initial worker processes, one per CPU by default.
+Spawn a new worker process. This can only be called from the master process.
 
-  The following options are supported:
+### cluster.isMaster
+### cluster.isWorker
 
-  - `workerFilename`: script to execute in the worker process, defaults to
-    `process.argv[1]`
-  - `args`: worker program arguments, defaulting to `process.argv.slice(2)`
-  - `workers`: the number of workers, defaulting to `os.cpus().length`
+Boolean flags to determine if the current process is a master or a worker
+process in a cluster. A process `isMaster` if `process.env.NODE_WORKER_ID`
+is undefined.
 
-### exports.spawnWorker([options])
+### cluster.eachWorker(cb)
 
-   Spawn a new worker process. This is called within `cluster.startMaster()`,
-   however it is useful to implement worker resuscitation as described below
-   in the "Common patterns" section.
+Synchronously iterates over all of the workers.
 
-   The `options` available are identical to `cluster.startMaster()`.
+    cluster.eachWorker(function(worker) {
+      console.log("worker pid=" + worker.pid);
+    });
 
-## Common patterns
+### cluster.workerCount()
 
-## Worker resuscitation
+Returns the number of workers.
 
-The following is an example of how you may implement worker resuscitation,
-spawning a new worker process when another exits.
+### Event: 'death'
 
-    if (cluster.isMaster) {
-      cluster.startMaster();
-      process.on('SIGCHLD', function(){
-        console.log('worker killed');
-        cluster.spawnWorker();
-      });
-    }
+When any of the workers die the cluster module will emit the 'death' event.
+This can be used to restart the worker by calling `fork()` again.
 
+    cluster.on('death', function(worker) {
+      console.log('worker ' + worker.pid + ' died. restart...');
+      cluster.fork();
+    });
+  
+Different techniques can be used to restart the worker depending on the
+application.
index 5cb1c2b..9e8a394 100644 (file)
@@ -22,7 +22,9 @@
 var assert = require('assert');
 var fork = require('child_process').fork;
 var net = require('net');
-var amMaster; // Used for asserts
+var EventEmitter = require('events').EventEmitter;
+
+var cluster = module.exports = new EventEmitter();
 
 
 var debug;
@@ -38,17 +40,20 @@ if (process.env.NODE_DEBUG && /cluster/.test(process.env.NODE_DEBUG)) {
 
 
 // Used in the master:
+var masterStarted = false;
 var ids = 0;
 var workers = [];
 var servers = {};
+var workerFilename;
+var workerArgs;
 
 // Used in the worker:
 var workerId = 0;
 var queryIds = 0;
 var queryCallbacks = {};
 
-exports.isWorker = 'NODE_WORKER_ID' in process.env;
-exports.isMaster = ! exports.isWorker;
+cluster.isWorker = 'NODE_WORKER_ID' in process.env;
+cluster.isMaster = ! cluster.isWorker;
 
 // Call this from the master process. It will start child workers.
 //
@@ -62,38 +67,23 @@ exports.isMaster = ! exports.isWorker;
 //
 // options.workers
 // The number of workers to start. Defaults to os.cpus().length.
-exports.startMaster = function(options) {
-  amMaster = true;
-
-  if (!options) {
-    options = {};
-  }
-
-  if (!options.workerFilename) {
-    options.workerFilename = process.argv[1];
-  }
-
-  if (!options.args) {
-    options.args = process.argv.slice(2);
-  }
+function startMaster() {
+  // This can only be called from the master.
+  assert(cluster.isMaster);
 
-  if (!options.workers) {
-    options.workers = require('os').cpus().length;
-  }
+  if (masterStarted) return;
+  masterStarted = true;
 
-  for (var i = 0; i < options.workers; i++) {
-    forkWorker(options.workerFilename, options.args);
-  }
+  workerFilename = process.argv[1];
+  workerArgs = process.argv.slice(2);
 
   process.on('uncaughtException', function(e) {
     // Quickly try to kill all the workers.
     // TODO: be session leader - will cause auto SIGHUP to the children.
-    for (var id in workers) {
-      if (workers[id]) {
-        debug("kill worker " + id);
-        workers[id].kill();
-      }
-    }
+    cluster.eachWorker(function(worker) {
+      debug("kill worker " + worker.pid);
+      worker.kill();
+    })
 
     console.error("Exception in cluster master process: " +
         e.message + '\n' + e.stack);
@@ -104,7 +94,8 @@ exports.startMaster = function(options) {
 
 
 function handleWorkerMessage(worker, message) {
-  assert.ok(amMaster);
+  // This can only be called from the master.
+  assert(cluster.isMaster);
 
   debug("recv " + JSON.stringify(message));
 
@@ -137,7 +128,34 @@ function handleWorkerMessage(worker, message) {
 }
 
 
-function forkWorker(workerFilename, args) {
+cluster.eachWorker = function(cb) {
+  // This can only be called from the master.
+  assert(cluster.isMaster);
+
+  for (var id in workers) {
+    if (workers[id]) {
+      cb(workers[id]);
+    }
+  }
+};
+
+
+cluster.workerCount = function() {
+  var c = 0;
+  cluster.eachWorker(function() {
+    c++;
+  });
+  return c;
+};
+
+
+cluster.fork = function() {
+  // This can only be called from the master.
+  assert(cluster.isMaster);
+
+  // Lazily start the master process stuff.
+  startMaster();
+
   var id = ++ids;
   var envCopy = {};
 
@@ -147,9 +165,7 @@ function forkWorker(workerFilename, args) {
 
   envCopy['NODE_WORKER_ID'] = id;
 
-  var worker = fork(workerFilename, args, {
-    env: envCopy
-  });
+  var worker = fork(workerFilename, workerArgs, { env: envCopy });
 
   worker.on('message', function(message) {
     handleWorkerMessage(worker, message);
@@ -158,15 +174,16 @@ function forkWorker(workerFilename, args) {
   worker.on('exit', function() {
     debug('worker id=' + id + ' died');
     delete workers[id];
+    cluster.emit('death', worker);
   });
 
   return worker;
 }
 
 
-exports.startWorker = function() {
-  assert.ok(!amMaster);
-  amMaster = false;
+// Internal function. Called from src/node.js when worker process starts.
+cluster._startWorker = function() {
+  assert(cluster.isWorker);
   workerId = parseInt(process.env.NODE_WORKER_ID);
 
   queryMaster({ cmd: 'online' });
@@ -186,7 +203,7 @@ exports.startWorker = function() {
 
 
 function queryMaster(msg, cb) {
-  assert.ok(!amMaster);
+  assert(cluster.isWorker);
 
   debug('send ' + JSON.stringify(msg));
 
@@ -194,7 +211,7 @@ function queryMaster(msg, cb) {
   msg._queryId = (++queryIds);
   msg._workerId = workerId;
 
-  // Store callback for later. Callback called in startWorker.
+  // Store callback for later. Callback called in _startWorker.
   if (cb) {
     queryCallbacks[msg._queryId] = cb;
   }
@@ -204,8 +221,10 @@ function queryMaster(msg, cb) {
 }
 
 
-exports.getServer = function(address, port, addressType, cb) {
-  assert.ok(!amMaster);
+// Internal function. Called by lib/net.js when attempting to bind a
+// server.
+cluster._getServer = function(address, port, addressType, cb) {
+  assert(cluster.isWorker);
 
   queryMaster({
     cmd: "queryServer",
index 92437f4..c1d09ee 100644 (file)
@@ -714,7 +714,7 @@ Server.prototype._listen2 = function(address, port, addressType) {
 
 function listen(self, address, port, addressType) {
   if (process.env.NODE_WORKER_ID) {
-    require('cluster').getServer(address, port, addressType, function(handle) {
+    require('cluster')._getServer(address, port, addressType, function(handle) {
       self._handle = handle;
       self._listen2(address, port, addressType);
     });
index 2e2fc1b..a6788b3 100644 (file)
@@ -88,7 +88,7 @@
       // channel.
       if (process.env.NODE_WORKER_ID) {
         var cluster = NativeModule.require('cluster');
-        cluster.startWorker();
+        cluster._startWorker();
       }
 
       var Module = NativeModule.require('module');