cluster improvements: Worker class and isolate internal messages
authorAndreas Madsen <amwebdk@gmail.com>
Tue, 20 Dec 2011 09:42:48 +0000 (10:42 +0100)
committerRyan Dahl <ry@tinyclouds.org>
Thu, 5 Jan 2012 02:30:19 +0000 (18:30 -0800)
Fixes #2388

doc/api/child_processes.markdown
doc/api/cluster.markdown
lib/child_process.js
lib/cluster.js
lib/net.js
src/node.js
test/simple/test-child-process-internal.js [new file with mode: 0644]
test/simple/test-cluster-basic.js [new file with mode: 0644]
test/simple/test-cluster-fork-env.js
test/simple/test-cluster-kill-workers.js
test/simple/test-cluster-message.js [new file with mode: 0644]

index 8a1f171..e34a4e9 100644 (file)
@@ -229,6 +229,12 @@ And then the child script, `'sub.js'` might look like this:
 In the child the `process` object will have a `send()` method, and `process`
 will emit objects each time it receives a message on its channel.
 
+There is a special case when seinding a `{cmd: 'NODE_foo'}` message. All messages
+containging a `NODE_` prefix in its `cmd` property will not be emitted in
+the `message` event, since this are internal messages used by node core.
+Messages contain the prefix are emitted in the `internalMessage` event, you
+should by all means avoid using this feature, it may change without warranty.
+
 By default the spawned Node process will have the stdout, stderr associated
 with the parent's. To change this behavior set the `silent` property in the
 `options` object to `true`.
index b63ad26..5f77f54 100644 (file)
@@ -21,8 +21,9 @@ all share server ports.
         console.log('worker ' + worker.pid + ' died');
       });
     } else {
-      // Worker processes have a http server.
-      http.Server(function(req, res) {
+      // Workers can share any TCP connection
+      // In this case its a HTTP server
+      http.createServer(function(req, res) {
         res.writeHead(200);
         res.end("hello world\n");
       }).listen(8000);
@@ -34,66 +35,221 @@ Running node will now share port 8000 between the workers:
     Worker 2438 online
     Worker 2437 online
 
-The difference between `cluster.fork()` and `child_process.fork()` is simply
-that cluster allows TCP servers to be shared between workers. `cluster.fork`
-is implemented on top of `child_process.fork`. The message passing API that
-is available with `child_process.fork` is available with `cluster` as well.
-As an example, here is a cluster which keeps count of the number of requests
-in the master process via message passing:
+
+### cluster.isMaster
+
+This boolean flag is true if the process is a master. This is determined
+by the `process.env.NODE_UNIQUE_ID`. If `process.env.NODE_UNIQUE_ID` is
+undefined `isMaster` is `true`.
+
+### cluster.isWorker
+
+This boolean flag is true if the process is a worker forked from a master.
+If the `process.env.NODE_UNIQUE_ID` is set to a value different efined
+`isWorker` is `true`.
+
+### Event: 'fork'
+
+When a new worker is forked the cluster module will emit a 'fork' event.
+This can be used to log worker activity, and create you own timeout.
+
+    var timeouts = [];
+    var errorMsg = function () {
+      console.error("Something must be wrong with the connection ...");
+    });
+
+    cluster.on('fork', function (worker) {
+      timeouts[worker.uniqueID] = setTimeout(errorMsg, 2000);
+    });
+    cluster.on('listening', function (worker) {
+      clearTimeout(timeouts[worker.uniqueID]);
+    });
+    cluster.on('death', function (worker) {
+      clearTimeout(timeouts[worker.uniqueID]);
+      errorMsg();
+    });
+
+### Event: 'online'
+
+After forking a new worker, the worker should respond with a online message.
+When the master receives a online message it will emit such event.
+The difference between 'fork' and 'online' is that fork is emitted when the
+master tries to fork a worker, and 'online' is emitted when the worker is being
+executed.
+
+    cluster.on('online', function (worker) {
+      console.log("Yay, the worker responded after it was forked");
+    });
+
+### Event: 'listening'
+
+When calling `listen()` from a worker, a 'listening' event is automatically assigned
+to the server instance. When the server is listening a message is send to the master
+where the 'listening' event is emitted.
+
+    cluster.on('listening', function (worker) {
+      console.log("We are now connected");
+    });
+
+### Event: 'death'
+
+When any of the workers die the cluster module will emit the 'death' event.
+This can be used to restart the worker by calling `fork()` again.
+
+    cluster.on('death', function(worker) {
+      console.log('worker ' + worker.pid + ' died. restart...');
+      cluster.fork();
+    });
+
+### cluster.fork([env])
+
+Spawn a new worker process. This can only be called from the master process.
+The function takes an optional `env` object. The properties in this object
+will be added to the process environment in the worker.
+
+### cluster.workers
+
+In the cluster all living worker objects are stored in this object by there
+`uniqueID` as the key. This makes it easy to loop thouge all liveing workers.
+
+    // Go througe all workers
+    function eachWorker(callback) {
+      for (var uniqueID in cluster.workers) {
+        callback(cluster.workers[uniqueID]);
+      }
+    }
+    eachWorker(function (worker) {
+      worker.send('big announcement to all workers');
+    });
+
+Should you wich to reference a worker over a communication channel this unsing
+there `uniqueID` this is also the easies way to find the worker.
+
+    socket.on('data', function (uniqueID) {
+      var worker = cluster.workers[uniqueID];
+    });
+
+## Worker
+
+This object contains all public information and method about a worker.
+In the master it can be obtainedusing `cluster.workers`. In a worker
+it can be obtained ained using `cluster.worker`.
+
+### Worker.uniqueID
+
+Each new worker is given its own unique id, this id i stored in the `uniqueID`.
+
+### Worker.process
+
+All workers are created using `child_process.fork()`, the returned object from this
+function is stored in process.
+
+### Worker.send(message, [sendHandle])
+
+This function is equal to the send methods provided by `child_process.fork()`.
+In the master you should use this function to send a message to a specific worker.
+However in a worker you can also use `process.send(message)`, since this is the same
+function.
+
+This example will echo back all messages from the master:
+
+    if (cluster.isMaster) {
+      var worker = cluster.fork();
+      worker.send('hi there');
+
+    } else if (cluster.isWorker) {
+      process.on('message', function (msg) {
+        process.send(msg);
+      });
+    }
+
+### Worker.destroy()
+
+This function will kill the worker, and inform the master to not spawn a new worker.
+To know the difference between suicide and accidently death a suicide boolean is set to true.
+
+    cluster.on('death', function (worker) {
+      if (worker.suicide === true) {
+        console.log('Oh, it was just suicide' – no need to worry').
+      }
+    });
+
+    // destroy worker
+    worker.destroy();
+
+### Worker.suicide
+
+This property is a boolean. It is set when a worker dies, until then it is `undefined`.
+It is true if the worker was killed using the `.destroy()` method, and false otherwise.
+
+### Event: message
+
+This event is the same as the one provided by `child_process.fork()`.
+In the master you should use this event, however in a worker you can also use
+`process.on('message')`
+
+As an example, here is a cluster that keeps count of the number of requests
+in the master process using the message system:
 
     var cluster = require('cluster');
     var http = require('http');
-    var numReqs = 0;
 
     if (cluster.isMaster) {
-      // Fork workers.
-      for (var i = 0; i < 2; i++) {
-        var worker = cluster.fork();
-
-        worker.on('message', function(msg) {
-          if (msg.cmd && msg.cmd == 'notifyRequest') {
-            numReqs++;
-          }
-        });
-      }
 
+      // Keep track of http requests
+      var numReqs = 0;
       setInterval(function() {
         console.log("numReqs =", numReqs);
       }, 1000);
+
+      // Count requestes
+      var messageHandler = function (msg) {
+        if (msg.cmd && msg.cmd == 'notifyRequest') {
+          numReqs += 1;
+        }
+      };
+
+      // Start workers and listen for messages containing notifyRequest
+      cluster.autoFork();
+      Object.keys(cluster.workers).forEach(function (uniqueID) {
+        cluster.workers[uniqueID].on('message', messageHandler);
+      });
+
     } else {
+
       // Worker processes have a http server.
       http.Server(function(req, res) {
         res.writeHead(200);
         res.end("hello world\n");
-        // Send message to master process
+
+        // notify master about the request
         process.send({ cmd: 'notifyRequest' });
       }).listen(8000);
     }
 
+### Event: online
 
+Same as the `cluster.on('online')` event, but emits only when the state change
+on the specified worker.
 
-### cluster.fork([env])
+    cluster.fork().on('online', function (worker) {
+      // Worker is online
+    };
 
-Spawn a new worker process. This can only be called from the master process.
-The function takes an optional `env` object. The propertyies in this object
-will be added to the process environment in the worker.
+### Event: listening
 
-### cluster.isMaster
-### cluster.isWorker
+Same as the `cluster.on('listening')` event, but emits only when the state change
+on the specified worker.
 
-Boolean flags to determine if the current process is a master or a worker
-process in a cluster. A process `isMaster` if `process.env.NODE_WORKER_ID`
-is undefined.
+    cluster.fork().on('listening', function (worker) {
+      // Worker is listening
+    };
 
-### Event: 'death'
+### Event: death
 
-When any of the workers die the cluster module will emit the 'death' event.
-This can be used to restart the worker by calling `fork()` again.
-
-    cluster.on('death', function(worker) {
-      console.log('worker ' + worker.pid + ' died. restart...');
-      cluster.fork();
-    });
+Same as the `cluster.on('death')` event, but emits only when the state change
+on the specified worker.
 
-Different techniques can be used to restart the worker depending on the
-application.
+    cluster.fork().on('death', function (worker) {
+      // Worker has died
+    };
index b3e6383..2e39166 100644 (file)
@@ -95,11 +95,25 @@ function setupChannel(target, channel) {
       jsonBuffer += pool.toString('ascii', offset, offset + length);
 
       var i, start = 0;
+
+      //Linebreak is used as a message end sign
       while ((i = jsonBuffer.indexOf('\n', start)) >= 0) {
         var json = jsonBuffer.slice(start, i);
         var message = JSON.parse(json);
 
-        target.emit('message', message, recvHandle);
+        //Filter out internal messages
+        //if cmd property begin with "_NODE"
+        if (message !== null &&
+            typeof message === 'object' &&
+            typeof message.cmd === 'string' &&
+            message.cmd.indexOf('NODE_') === 0) {
+          target.emit('inernalMessage', message, recvHandle);
+        }
+        //Non-internal message
+        else {
+          target.emit('message', message, recvHandle);
+        }
+
         start = i + 1;
       }
       jsonBuffer = jsonBuffer.slice(start);
index 5892481..90da833 100644 (file)
@@ -23,22 +23,24 @@ var assert = require('assert');
 var fork = require('child_process').fork;
 var net = require('net');
 var EventEmitter = require('events').EventEmitter;
+var util = require('util');
 
 function isObject(o) {
   return (typeof o === 'object' && o !== null);
 }
 
 function extendObject(origin, add) {
+  // Don't do anything if add isn't an object
+  if (!add) return origin;
+
   var keys = Object.keys(add),
       i = keys.length;
-  while(i--) {
+  while (i--) {
     origin[keys[i]] = add[keys[i]];
   }
   return origin;
 }
 
-var cluster = module.exports = new EventEmitter();
-
 var debug;
 if (process.env.NODE_DEBUG && /cluster/.test(process.env.NODE_DEBUG)) {
   debug = function(x) {
@@ -50,23 +52,42 @@ if (process.env.NODE_DEBUG && /cluster/.test(process.env.NODE_DEBUG)) {
   debug = function() { };
 }
 
+// cluster object:
+function cluster() {}
+util.inherits(cluster, EventEmitter);
+var cluster = module.exports = new cluster();
 
 // Used in the master:
 var masterStarted = false;
 var ids = 0;
-var workers = [];
-var servers = {};
+var serverHandlers = {};
 var workerFilename;
 var workerArgs;
 
 // Used in the worker:
-var workerId = 0;
+var serverLisenters = {};
 var queryIds = 0;
 var queryCallbacks = {};
 
-cluster.isWorker = 'NODE_WORKER_ID' in process.env;
+// Define isWorker and isMaster
+cluster.isWorker = 'NODE_UNIQUE_ID' in process.env;
 cluster.isMaster = ! cluster.isWorker;
 
+// The worker object is only used in a worker
+cluster.worker = cluster.isWorker ? {} : null;
+// The workers array is oly used in the naster
+cluster.workers = cluster.isMaster ? {} : null;
+
+// Simple function there call a function on each worker
+function eachWorker(cb) {
+  // Go througe all workers
+  for (var id in cluster.workers) {
+    if (cluster.workers.hasOwnProperty(id)) {
+      cb(cluster.workers[id]);
+    }
+  }
+}
+
 // Call this from the master process. It will start child workers.
 //
 // options.workerFilename
@@ -90,155 +111,375 @@ function startMaster() {
   workerArgs = process.argv.slice(2);
 
   process.on('uncaughtException', function(e) {
-    // Quickly try to kill all the workers.
-    // TODO: be session leader - will cause auto SIGHUP to the children.
-    eachWorker(function(worker) {
-      debug('kill worker ' + worker.pid);
-      worker.kill();
-    });
-
     console.error('Exception in cluster master process: ' +
         e.message + '\n' + e.stack);
+
+    quickDestroyCluster();
     process.exit(1);
   });
 }
 
+// Check if a message is internal only
+var INTERNAL_PREFIX = 'NODE_CLUTER_';
+function isInternalMessage(message) {
+  return (isObject(message) &&
+          typeof message.cmd === 'string' &&
+          message.cmd.indexOf(INTERNAL_PREFIX) === 0);
+}
 
-function handleWorkerMessage(worker, message) {
-  // This can only be called from the master.
-  assert(cluster.isMaster);
+// Modyfi message object to be internal
+function internalMessage(inMessage) {
+  var outMessage = extendObject({}, inMessage);
+
+  // Add internal prefix to cmd
+  outMessage.cmd = INTERNAL_PREFIX + (outMessage.cmd || '');
 
-  debug('recv ' + JSON.stringify(message));
-
-  switch (message.cmd) {
-    case 'online':
-      debug('Worker ' + worker.pid + ' online');
-      worker.online = true;
-      break;
-
-    case 'queryServer':
-      var key = message.address + ':' +
-                message.port + ':' +
-                message.addressType;
-      var response = { _queryId: message._queryId };
-
-      if (!(key in servers)) {
-        // Create a new server.
-        debug('create new server ' + key);
-        servers[key] = net._createServerHandle(message.address,
-                                               message.port,
-                                               message.addressType);
-      }
-      worker.send(response, servers[key]);
-      break;
-
-    default:
-      // Ignore.
-      break;
+  return outMessage;
+}
+
+// Handle callback messges
+function handleResponse(outMessage, outHandle, inMessage, inHandle, worker) {
+
+  // The message there will be send
+  var message = internalMessage(outMessage);
+
+  // callback id - will be undefined if not set
+  message._queryEcho = inMessage._requestEcho;
+
+  // Call callback if a query echo is received
+  if (inMessage.hasOwnProperty('_queryEcho')) {
+    queryCallbacks[inMessage._queryEcho](inMessage.content, inHandle);
+    delete queryCallbacks[inMessage._queryEcho];
+  }
+
+  // Send if outWrap do contain something useful
+  if (!(outMessage === undefined && message._queryEcho === undefined)) {
+    sendInternalMessage(worker, message, outHandle);
   }
 }
 
+// Handle messages from both master and workers
+var messageHandingObject = {};
+function handleMessage(inMessage, inHandle, worker) {
 
-function eachWorker(cb) {
-  // This can only be called from the master.
-  assert(cluster.isMaster);
+  //Remove internal prefix
+  var message = extendObject({}, inMessage);
+  message.cmd = inMessage.cmd.substr(INTERNAL_PREFIX.length);
 
-  for (var id in workers) {
-    if (workers[id]) {
-      cb(workers[id]);
-    }
+  var respondUsed = false;
+  var respond = function(outMessage, outHandler) {
+    respondUsed = true;
+    handleResponse(outMessage, outHandler, inMessage, inHandle, worker);
+  };
+
+  // Run handler if it exist
+  if (messageHandingObject[message.cmd]) {
+    messageHandingObject[message.cmd](message, worker, respond);
+  }
+
+  // Send respond if it wasn't done
+  if (respondUsed === false) {
+    respond();
   }
 }
 
+// Messages to the master will be handled using this methods
+if (cluster.isMaster) {
 
-cluster.fork = function(env) {
-  // This can only be called from the master.
-  assert(cluster.isMaster);
+  // Handle online messages from workers
+  messageHandingObject.online = function(message, worker) {
+    worker.state = 'online';
+    debug('Worker ' + worker.process.pid + ' online');
+    worker.emit('online', worker);
+    cluster.emit('online', worker);
+  };
 
-  // Lazily start the master process stuff.
-  startMaster();
+  // Handle queryServer messages form workers
+  messageHandingObject.queryServer = function(message, worker, send) {
+
+    // This sequence of infomation is unique to the connection but not
+    // to the worker
+    var args = [message.address, message.port, message.addressType];
+    var key = args.join(':');
+    var handler;
+
+    if (serverHandlers.hasOwnProperty(key)) {
+      handler = serverHandlers[key];
+    } else {
+      handler = serverHandlers[key] = net._createServerHandle.apply(net, args);
+    }
+
+    // echo callback with the fd handler associated with it
+    send({}, handler);
+  };
+
+  // Handle listening messages from workers
+  messageHandingObject.listening = function(message, worker) {
+
+    worker.state = 'listening';
+
+    // Emit listining, now that we know the worker is listning
+    worker.emit('listening', worker, {
+      address: message.address,
+      port: message.port,
+      addressType: message.addressType
+    });
+    cluster.emit('listening', worker, {
+      address: message.address,
+      port: message.port,
+      addressType: message.addressType
+    });
+  };
+
+  // Handle suicide messages from workers
+  messageHandingObject.suicide = function(message, worker) {
+    worker.suicide = true;
+  };
+
+}
+
+// Messages to a worker will be handled using this methods
+else if (cluster.isWorker) {
+
+  // TODO: the disconnect step will use this
+}
+
+function toDecInt(value) {
+  value = parseInt(value, 10);
+  return isNaN(value) ? null : value;
+}
+
+// Create a worker object, there works both for master and worker
+function Worker(customEnv) {
+  if (!(this instanceof Worker)) return new Worker();
+
+  var self = this;
+  var env = process.env;
+
+  // Assign uniqueID, default null
+  this.uniqueID = cluster.isMaster ? ++ids : toDecInt(env.NODE_UNIQUE_ID);
+
+  // Assign state
+  this.state = 'none';
 
-  var id = ++ids;
+  // Create or get process
+  if (cluster.isMaster) {
 
-  //Create env object
-  var envCopy = extendObject({}, process.env);
-  envCopy['NODE_WORKER_ID'] = id;
-  if (isObject(env)) {
-    envCopy = extendObject(envCopy, env);
+    // Create env object
+    // first: copy and add uniqueID
+    var envCopy = extendObject({}, env);
+    envCopy['NODE_UNIQUE_ID'] = this.uniqueID;
+    // second: extend envCopy with the env argument
+    if (isObject(customEnv)) {
+      envCopy = extendObject(envCopy, customEnv);
+    }
+
+    // fork worker
+    this.process = fork(workerFilename, workerArgs, {
+      'env': envCopy
+    });
+
+  } else {
+    this.process = process;
   }
 
-  //fork worker
-  var worker = fork(workerFilename, workerArgs, {
-    'env': envCopy
-  });
+  if (cluster.isMaster) {
+    // Save worker in the cluster.workers array
+    cluster.workers[this.uniqueID] = this;
 
-  workers[id] = worker;
+    // Emit a fork event, on next tick
+    // There is no worker.fork event since this has no real purpose
+    process.nextTick(function() {
+      cluster.emit('fork', self);
+    });
+  }
 
-  worker.on('message', function(message) {
-    handleWorkerMessage(worker, message);
-  });
+  // Internal message: handle message
+  this.process.on('inernalMessage', function(message, handle) {
+    debug('recived: ', message);
 
-  worker.on('exit', function() {
-    debug('worker id=' + id + ' died');
-    delete workers[id];
-    cluster.emit('death', worker);
+    // relay to handleMessage
+    handleMessage(message, handle, self);
+    return;
   });
 
-  return worker;
-};
+  // Non-internal message: relay to Worker object
+  this.process.on('message', function(message, handle) {
+    self.emit('message', message, handle);
+  });
 
+  // Handle exit
+  self.process.on('exit', function() {
+    debug('worker id=' + self.uniqueID + ' died');
 
-// Internal function. Called from src/node.js when worker process starts.
-cluster._startWorker = function() {
-  assert(cluster.isWorker);
-  workerId = parseInt(process.env.NODE_WORKER_ID, 10);
-
-  queryMaster({ cmd: 'online' });
-
-  // Make callbacks from queryMaster()
-  process.on('message', function(msg, handle) {
-    debug('recv ' + JSON.stringify(msg));
-    if (msg._queryId && msg._queryId in queryCallbacks) {
-      var cb = queryCallbacks[msg._queryId];
-      if (typeof cb == 'function') {
-        cb(msg, handle);
-      }
-      delete queryCallbacks[msg._queryId];
-    }
+    // Prepare worker to die and emit events
+    prepareDeath(self, 'dead', 'death');
   });
+
+}
+util.inherits(Worker, EventEmitter);
+cluster.Worker = Worker;
+
+function prepareDeath(worker, state, eventName) {
+
+  // set state to disconnect
+  worker.state = state;
+
+  // Make suicide a boolean
+  worker.suicide = !!worker.suicide;
+
+  // Remove from workers in the master
+  if (cluster.isMaster) {
+    delete cluster.workers[worker.uniqueID];
+  }
+
+  // Emit events
+  worker.emit(eventName, worker);
+  cluster.emit(eventName, worker);
+}
+
+// Send internal message
+function sendInternalMessage(worker, message/*, handler, callback*/) {
+
+  // Exist callback
+  var callback = arguments[arguments.length - 1];
+  if (typeof callback !== 'function') {
+    callback = undefined;
+  }
+
+  // exist handler
+  var handler = arguments[2] !== callback ? arguments[2] : undefined;
+
+  if (!isInternalMessage(message)) {
+    message = internalMessage(message);
+  }
+
+  // Store callback for later
+  if (callback) {
+    message._requestEcho = worker.uniqueID + ':' + (++queryIds);
+    queryCallbacks[message._requestEcho] = callback;
+  }
+
+
+  worker.send(message, handler);
+}
+
+// Send message to worker or master
+Worker.prototype.send = function() {
+
+  //You could also just use process.send in a worker
+  this.process.send.apply(this.process, arguments);
 };
 
 
-function queryMaster(msg, cb) {
-  assert(cluster.isWorker);
+function closeWorkerChannel(worker, callback) {
+  //Apparently the .close method is async, but do not have a callback
+  worker.process._channel.close();
+  worker.process._channel = null;
+  process.nextTick(callback);
+}
 
-  debug('send ' + JSON.stringify(msg));
+// Kill the worker without restarting
+Worker.prototype.destroy = function() {
+  var self = this;
 
-  // Grab some random queryId
-  msg._queryId = (++queryIds);
-  msg._workerId = workerId;
+  this.suicide = true;
 
-  // Store callback for later. Callback called in _startWorker.
-  if (cb) {
-    queryCallbacks[msg._queryId] = cb;
+  if (cluster.isMaster) {
+    // Stop channel
+    // this way the worker won't need to propagate suicide state to master
+    closeWorkerChannel(this, function() {
+      // Then kill worker
+      self.process.kill();
+    });
+
+  } else {
+    // Channel is open
+    if (this.process._channel !== null) {
+
+      // Inform master that is is suicide and then kill
+      sendInternalMessage(this, {cmd: 'suicide'}, function() {
+        // Kill worker
+        process.exit(0);
+      });
+
+      // When master do a quickDestroy the channel is not necesarily closed
+      // at the point this function runs. For that reason we need to keep
+      // checking that the channel is still open, until a actually callback
+      // from the master is resicved. Also we can't do a timeout and then
+      // just kill, since we don't know if the quickDestroy function was called.
+      setInterval(function() {
+        if (self.process._channel === null) {
+          process.exit(0);
+        }
+      }, 200);
+
+    } else {
+      process.exit(0);
+    }
   }
+};
+
+// Fork a new worker
+cluster.fork = function(env) {
+  // This can only be called from the master.
+  assert(cluster.isMaster);
 
-  // Send message to master.
-  process.send(msg);
+  // Make sure that the master has been initalized
+  startMaster();
+
+  return (new cluster.Worker(env));
+};
+
+// Sync way to quickly kill all cluster workers
+// However the workers may not die instantly
+function quickDestroyCluster() {
+  eachWorker(function(worker) {
+    worker.process.kill();
+  });
 }
 
+// Internal function. Called from src/node.js when worker process starts.
+cluster._setupWorker = function() {
+  // Get worker class
+  var worker = cluster.worker = new Worker();
+
+  // Tell master that the worker is online
+  worker.state = 'online';
+  sendInternalMessage(worker, { cmd: 'online' });
+};
 
-// Internal function. Called by lib/net.js when attempting to bind a
-// server.
-cluster._getServer = function(address, port, addressType, cb) {
+// Internal function. Called by lib/net.js when attempting to bind a server.
+cluster._getServer = function(tcpSelf, address, port, addressType, cb) {
+  // This can only be called from a worker.
   assert(cluster.isWorker);
 
-  queryMaster({
+  // Store tcp instance for later use
+  var key = [address, port, addressType].join(':');
+  serverLisenters[key] = tcpSelf;
+
+  // Send a listening message to the master
+  tcpSelf.once('listening', function() {
+    cluster.worker.state = 'listening';
+    sendInternalMessage(cluster.worker, {
+      cmd: 'listening',
+      address: address,
+      port: port,
+      addressType: addressType
+    });
+  });
+
+  // Request the fd handler from the master process
+  var message = {
     cmd: 'queryServer',
     address: address,
     port: port,
     addressType: addressType
-  }, function(msg, handle) {
+  };
+
+  // The callback will be stored until the master has responed
+  sendInternalMessage(cluster.worker, message, function(msg, handle) {
     cb(handle);
   });
+
 };
index 1262186..5073295 100644 (file)
@@ -750,8 +750,8 @@ Server.prototype._listen2 = function(address, port, addressType) {
 
 
 function listen(self, address, port, addressType) {
-  if (process.env.NODE_WORKER_ID) {
-    require('cluster')._getServer(address, port, addressType, function(handle) {
+  if (process.env.NODE_UNIQUE_ID) {
+    require('cluster')._getServer(self, address, port, addressType, function(handle) {
       self._handle = handle;
       self._listen2(address, port, addressType);
     });
index 15cccb6..1f159d6 100644 (file)
@@ -86,9 +86,9 @@
 
       // If this is a worker in cluster mode, start up the communiction
       // channel.
-      if (process.env.NODE_WORKER_ID) {
+      if (process.env.NODE_UNIQUE_ID) {
         var cluster = NativeModule.require('cluster');
-        cluster._startWorker();
+        cluster._setupWorker();
       }
 
       var Module = NativeModule.require('module');
diff --git a/test/simple/test-child-process-internal.js b/test/simple/test-child-process-internal.js
new file mode 100644 (file)
index 0000000..91e90d4
--- /dev/null
@@ -0,0 +1,58 @@
+// Copyright Joyent, Inc. and other Node contributors.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files (the
+// "Software"), to deal in the Software without restriction, including
+// without limitation the rights to use, copy, modify, merge, publish,
+// distribute, sublicense, and/or sell copies of the Software, and to permit
+// persons to whom the Software is furnished to do so, subject to the
+// following conditions:
+//
+// The above copyright notice and this permission notice shall be included
+// in all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
+// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
+// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
+// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
+// USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+var common = require('../common');
+var assert = require('assert');
+
+//messages
+var PREFIX = 'NODE_';
+var normal = {cmd: 'foo' + PREFIX};
+var internal = {cmd: PREFIX + 'bar'};
+
+if (process.argv[2] === 'child') {
+  //send non-internal message containing PREFIX at a non prefix position
+  process.send(normal);
+
+  //send inernal message
+  process.send(internal);
+
+  process.exit(0);
+
+} else {
+
+  var fork = require('child_process').fork;
+  var child = fork(process.argv[1], ['child']);
+
+  var gotNormal;
+  child.once('message', function(data) {
+    gotNormal = data;
+  });
+
+  var gotInternal;
+  child.once('inernalMessage', function(data) {
+    gotInternal = data;
+  });
+
+  process.on('exit', function() {
+    assert.deepEqual(gotNormal, normal);
+    assert.deepEqual(gotInternal, internal);
+  });
+}
diff --git a/test/simple/test-cluster-basic.js b/test/simple/test-cluster-basic.js
new file mode 100644 (file)
index 0000000..adc7775
--- /dev/null
@@ -0,0 +1,161 @@
+// Copyright Joyent, Inc. and other Node contributors.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files (the
+// "Software"), to deal in the Software without restriction, including
+// without limitation the rights to use, copy, modify, merge, publish,
+// distribute, sublicense, and/or sell copies of the Software, and to permit
+// persons to whom the Software is furnished to do so, subject to the
+// following conditions:
+//
+// The above copyright notice and this permission notice shall be included
+// in all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
+// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
+// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
+// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
+// USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+
+var common = require('../common');
+var assert = require('assert');
+var cluster = require('cluster');
+
+function forEach(obj, fn) {
+  Object.keys(obj).forEach(function(name, index) {
+    fn(obj[name], name, index);
+  });
+}
+
+
+if (cluster.isWorker) {
+  var http = require('http');
+  http.Server(function() {
+
+  }).listen(common.PORT, '127.0.0.1');
+}
+
+else if (cluster.isMaster) {
+
+  assert.equal('NODE_UNIQUE_ID' in process.env, false,
+      'cluster.isMaster should not be true when NODE_UNIQUE_ID is set');
+
+  var checks = {
+    cluster: {
+      events: {
+        fork: false,
+        online: false,
+        listening: false,
+        death: false
+      },
+      equal: {
+        fork: false,
+        online: false,
+        listening: false,
+        death: false
+      }
+    },
+
+    worker: {
+      events: {
+        online: false,
+        listening: false,
+        death: false
+      },
+      equal: {
+        online: false,
+        listening: false,
+        death: false
+      },
+      states: {
+        none: false,
+        online: false,
+        listening: false,
+        dead: false
+      }
+    }
+  };
+
+  var worker;
+  var stateNames = Object.keys(checks.worker.states);
+
+  //Check events, states, and emit arguments
+  forEach(checks.cluster.events, function(bool, name, index) {
+
+    //Listen on event
+    cluster.on(name, function(/* worker */) {
+
+      //Set event
+      checks.cluster.events[name] = true;
+
+      //Check argument
+      checks.cluster.equal[name] = worker === arguments[0];
+
+      //Check state
+      var state = stateNames[index];
+      checks.worker.states[state] = (state === worker.state);
+    });
+  });
+
+  //Kill worker when listening
+  cluster.on('listening', function() {
+    worker.destroy();
+  });
+
+  //Kill process when worker is killed
+  cluster.on('death', function() {
+    process.exit(0);
+  });
+
+  //Create worker
+  worker = cluster.fork();
+  assert.ok(worker instanceof cluster.Worker,
+      'the worker is not a instance of the Worker constructor');
+
+  //Check event
+  forEach(checks.worker.events, function(bool, name, index) {
+    worker.on(name, function() {
+      //Set event
+      checks.worker.events[name] = true;
+
+      //Check argument
+      checks.worker.equal[name] = worker === arguments[0];
+    });
+  });
+
+  //Check all values
+  process.once('exit', function() {
+    //Check cluster events
+    forEach(checks.cluster.events, function(check, name) {
+      assert.ok(check, 'The cluster event "' + name + '" on the cluster ' +
+                'object did not fire');
+    });
+
+    //Check cluster event arguments
+    forEach(checks.cluster.equal, function(check, name) {
+      assert.ok(check, 'The cluster event "' + name + '" did not emit ' +
+                'with corrent argument');
+    });
+
+    //Check worker states
+    forEach(checks.worker.states, function(check, name) {
+      assert.ok(check, 'The worker state "' + name + '" was not set to true');
+    });
+
+    //Check worker events
+    forEach(checks.worker.events, function(check, name) {
+      assert.ok(check, 'The worker event "' + name + '" on the worker object ' +
+                'did not fire');
+    });
+
+    //Check worker event arguments
+    forEach(checks.worker.equal, function(check, name) {
+      assert.ok(check, 'The worker event "' + name + '" did not emit with ' +
+                'corrent argument');
+    });
+  });
+
+}
index f62f804..a0b50ae 100644 (file)
@@ -25,8 +25,7 @@ var assert = require('assert');
 var cluster = require('cluster');
 
 if (cluster.isWorker) {
-  process.send({
-    testcase: true,
+  cluster.worker.send({
     prop: process.env['cluster_test_prop'],
     overwrite: process.env['cluster_test_overwrite']
   });
@@ -38,23 +37,21 @@ if (cluster.isWorker) {
     overwrite: false
   };
 
-  //To check that the cluster extend on the process.env we will overwrite a
-  //property
+  // To check that the cluster extend on the process.env we will overwrite a
+  // property
   process.env['cluster_test_overwrite'] = 'old';
 
-  //Fork worker
+  // Fork worker
   var worker = cluster.fork({
     'cluster_test_prop': 'custom',
     'cluster_test_overwrite': 'new'
   });
 
-  //Checks worker env
+  // Checks worker env
   worker.on('message', function(data) {
-    if (data.testcase) {
-      checks.using = (data.prop === 'custom');
-      checks.overwrite = (data.overwrite === 'new');
-      process.exit(0);
-    }
+    checks.using = (data.prop === 'custom');
+    checks.overwrite = (data.overwrite === 'new');
+    process.exit(0);
   });
 
   process.once('exit', function() {
index c9f2bf2..526869c 100644 (file)
@@ -72,7 +72,7 @@ if (isTestRunner) {
   // Cluster stuff.
   if (cluster.isMaster) {
     var worker = cluster.fork();
-    process.send({ workerPID: worker.pid });
+    process.send({ workerPID: worker.process.pid });
     // should kill the worker too
     throw new Error('kill master');
   } else {
diff --git a/test/simple/test-cluster-message.js b/test/simple/test-cluster-message.js
new file mode 100644 (file)
index 0000000..6bab71d
--- /dev/null
@@ -0,0 +1,130 @@
+// Copyright Joyent, Inc. and other Node contributors.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files (the
+// "Software"), to deal in the Software without restriction, including
+// without limitation the rights to use, copy, modify, merge, publish,
+// distribute, sublicense, and/or sell copies of the Software, and to permit
+// persons to whom the Software is furnished to do so, subject to the
+// following conditions:
+//
+// The above copyright notice and this permission notice shall be included
+// in all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
+// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
+// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
+// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
+// USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+
+var common = require('../common');
+var assert = require('assert');
+var cluster = require('cluster');
+var net = require('net');
+
+function forEach(obj, fn) {
+  Object.keys(obj).forEach(function(name, index) {
+    fn(obj[name], name, index);
+  });
+}
+
+if (cluster.isWorker) {
+  // Create a tcp server
+  // this will be used as cluster-shared-server
+  // and as an alternativ IPC channel
+  var server = net.Server();
+  server.on('connection', function(socket) {
+
+    // Tell master using TCP socket that a message is received
+    process.on('message', function(message) {
+      socket.write(JSON.stringify({
+        code: 'received message',
+        echo: message
+      }));
+    });
+
+    process.send('message from worker');
+  });
+
+  server.listen(common.PORT, '127.0.0.1');
+}
+
+else if (cluster.isMaster) {
+
+  var checks = {
+    master: {
+      'receive': false,
+      'correct': false
+    },
+    worker: {
+      'receive': false,
+      'correct': false
+    }
+  };
+
+
+  var client;
+  var check = function(type, result) {
+    checks[type].receive = true;
+    checks[type].correct = result;
+
+    var missing = false;
+    forEach(checks, function(type) {
+      if (type.receive === false) missing = true;
+    });
+
+    if (missing === false) {
+      client.end();
+    }
+  };
+
+  // Spawn worker
+  var worker = cluster.fork();
+
+  // When a IPC message is resicved form the worker
+  worker.on('message', function(message) {
+    check('master', message === 'message from worker');
+  });
+
+  // When a TCP connection is made with the worker connect to it
+  worker.on('listening', function() {
+
+    client = net.connect(common.PORT, function() {
+
+      //Send message to worker
+      worker.send('message from master');
+    });
+
+    client.on('data', function(data) {
+      // All data is JSON
+      data = JSON.parse(data.toString());
+
+      if (data.code === 'received message') {
+        check('worker', data.echo === 'message from master');
+      } else {
+        throw new Error('worng TCP message recived: ' + data);
+      }
+    });
+
+    // When the connection ends kill worker and shutdown process
+    client.on('end', function() {
+      worker.destroy();
+    });
+
+    worker.on('death', function() {
+      process.exit(0);
+    });
+
+  });
+
+  process.once('exit', function() {
+    forEach(checks, function(check, type) {
+      assert.ok(check.receive, 'The ' + type + ' did not receive any message');
+      assert.ok(check.correct,
+                'The ' + type + ' did not get the correct message');
+    });
+  });
+}