From: Andreas Madsen Date: Tue, 20 Dec 2011 09:42:48 +0000 (+0100) Subject: cluster improvements: Worker class and isolate internal messages X-Git-Tag: v0.7.0~60 X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=5f08c3cfa1ee3ece6032d1ffeda6f403e54dd5c0;p=platform%2Fupstream%2Fnodejs.git cluster improvements: Worker class and isolate internal messages Fixes #2388 --- diff --git a/doc/api/child_processes.markdown b/doc/api/child_processes.markdown index 8a1f171..e34a4e9 100644 --- a/doc/api/child_processes.markdown +++ b/doc/api/child_processes.markdown @@ -229,6 +229,12 @@ And then the child script, `'sub.js'` might look like this: In the child the `process` object will have a `send()` method, and `process` will emit objects each time it receives a message on its channel. +There is a special case when seinding a `{cmd: 'NODE_foo'}` message. All messages +containging a `NODE_` prefix in its `cmd` property will not be emitted in +the `message` event, since this are internal messages used by node core. +Messages contain the prefix are emitted in the `internalMessage` event, you +should by all means avoid using this feature, it may change without warranty. + By default the spawned Node process will have the stdout, stderr associated with the parent's. To change this behavior set the `silent` property in the `options` object to `true`. diff --git a/doc/api/cluster.markdown b/doc/api/cluster.markdown index b63ad26..5f77f54 100644 --- a/doc/api/cluster.markdown +++ b/doc/api/cluster.markdown @@ -21,8 +21,9 @@ all share server ports. console.log('worker ' + worker.pid + ' died'); }); } else { - // Worker processes have a http server. - http.Server(function(req, res) { + // Workers can share any TCP connection + // In this case its a HTTP server + http.createServer(function(req, res) { res.writeHead(200); res.end("hello world\n"); }).listen(8000); @@ -34,66 +35,221 @@ Running node will now share port 8000 between the workers: Worker 2438 online Worker 2437 online -The difference between `cluster.fork()` and `child_process.fork()` is simply -that cluster allows TCP servers to be shared between workers. `cluster.fork` -is implemented on top of `child_process.fork`. The message passing API that -is available with `child_process.fork` is available with `cluster` as well. -As an example, here is a cluster which keeps count of the number of requests -in the master process via message passing: + +### cluster.isMaster + +This boolean flag is true if the process is a master. This is determined +by the `process.env.NODE_UNIQUE_ID`. If `process.env.NODE_UNIQUE_ID` is +undefined `isMaster` is `true`. + +### cluster.isWorker + +This boolean flag is true if the process is a worker forked from a master. +If the `process.env.NODE_UNIQUE_ID` is set to a value different efined +`isWorker` is `true`. + +### Event: 'fork' + +When a new worker is forked the cluster module will emit a 'fork' event. +This can be used to log worker activity, and create you own timeout. + + var timeouts = []; + var errorMsg = function () { + console.error("Something must be wrong with the connection ..."); + }); + + cluster.on('fork', function (worker) { + timeouts[worker.uniqueID] = setTimeout(errorMsg, 2000); + }); + cluster.on('listening', function (worker) { + clearTimeout(timeouts[worker.uniqueID]); + }); + cluster.on('death', function (worker) { + clearTimeout(timeouts[worker.uniqueID]); + errorMsg(); + }); + +### Event: 'online' + +After forking a new worker, the worker should respond with a online message. +When the master receives a online message it will emit such event. +The difference between 'fork' and 'online' is that fork is emitted when the +master tries to fork a worker, and 'online' is emitted when the worker is being +executed. + + cluster.on('online', function (worker) { + console.log("Yay, the worker responded after it was forked"); + }); + +### Event: 'listening' + +When calling `listen()` from a worker, a 'listening' event is automatically assigned +to the server instance. When the server is listening a message is send to the master +where the 'listening' event is emitted. + + cluster.on('listening', function (worker) { + console.log("We are now connected"); + }); + +### Event: 'death' + +When any of the workers die the cluster module will emit the 'death' event. +This can be used to restart the worker by calling `fork()` again. + + cluster.on('death', function(worker) { + console.log('worker ' + worker.pid + ' died. restart...'); + cluster.fork(); + }); + +### cluster.fork([env]) + +Spawn a new worker process. This can only be called from the master process. +The function takes an optional `env` object. The properties in this object +will be added to the process environment in the worker. + +### cluster.workers + +In the cluster all living worker objects are stored in this object by there +`uniqueID` as the key. This makes it easy to loop thouge all liveing workers. + + // Go througe all workers + function eachWorker(callback) { + for (var uniqueID in cluster.workers) { + callback(cluster.workers[uniqueID]); + } + } + eachWorker(function (worker) { + worker.send('big announcement to all workers'); + }); + +Should you wich to reference a worker over a communication channel this unsing +there `uniqueID` this is also the easies way to find the worker. + + socket.on('data', function (uniqueID) { + var worker = cluster.workers[uniqueID]; + }); + +## Worker + +This object contains all public information and method about a worker. +In the master it can be obtainedusing `cluster.workers`. In a worker +it can be obtained ained using `cluster.worker`. + +### Worker.uniqueID + +Each new worker is given its own unique id, this id i stored in the `uniqueID`. + +### Worker.process + +All workers are created using `child_process.fork()`, the returned object from this +function is stored in process. + +### Worker.send(message, [sendHandle]) + +This function is equal to the send methods provided by `child_process.fork()`. +In the master you should use this function to send a message to a specific worker. +However in a worker you can also use `process.send(message)`, since this is the same +function. + +This example will echo back all messages from the master: + + if (cluster.isMaster) { + var worker = cluster.fork(); + worker.send('hi there'); + + } else if (cluster.isWorker) { + process.on('message', function (msg) { + process.send(msg); + }); + } + +### Worker.destroy() + +This function will kill the worker, and inform the master to not spawn a new worker. +To know the difference between suicide and accidently death a suicide boolean is set to true. + + cluster.on('death', function (worker) { + if (worker.suicide === true) { + console.log('Oh, it was just suicide' – no need to worry'). + } + }); + + // destroy worker + worker.destroy(); + +### Worker.suicide + +This property is a boolean. It is set when a worker dies, until then it is `undefined`. +It is true if the worker was killed using the `.destroy()` method, and false otherwise. + +### Event: message + +This event is the same as the one provided by `child_process.fork()`. +In the master you should use this event, however in a worker you can also use +`process.on('message')` + +As an example, here is a cluster that keeps count of the number of requests +in the master process using the message system: var cluster = require('cluster'); var http = require('http'); - var numReqs = 0; if (cluster.isMaster) { - // Fork workers. - for (var i = 0; i < 2; i++) { - var worker = cluster.fork(); - - worker.on('message', function(msg) { - if (msg.cmd && msg.cmd == 'notifyRequest') { - numReqs++; - } - }); - } + // Keep track of http requests + var numReqs = 0; setInterval(function() { console.log("numReqs =", numReqs); }, 1000); + + // Count requestes + var messageHandler = function (msg) { + if (msg.cmd && msg.cmd == 'notifyRequest') { + numReqs += 1; + } + }; + + // Start workers and listen for messages containing notifyRequest + cluster.autoFork(); + Object.keys(cluster.workers).forEach(function (uniqueID) { + cluster.workers[uniqueID].on('message', messageHandler); + }); + } else { + // Worker processes have a http server. http.Server(function(req, res) { res.writeHead(200); res.end("hello world\n"); - // Send message to master process + + // notify master about the request process.send({ cmd: 'notifyRequest' }); }).listen(8000); } +### Event: online +Same as the `cluster.on('online')` event, but emits only when the state change +on the specified worker. -### cluster.fork([env]) + cluster.fork().on('online', function (worker) { + // Worker is online + }; -Spawn a new worker process. This can only be called from the master process. -The function takes an optional `env` object. The propertyies in this object -will be added to the process environment in the worker. +### Event: listening -### cluster.isMaster -### cluster.isWorker +Same as the `cluster.on('listening')` event, but emits only when the state change +on the specified worker. -Boolean flags to determine if the current process is a master or a worker -process in a cluster. A process `isMaster` if `process.env.NODE_WORKER_ID` -is undefined. + cluster.fork().on('listening', function (worker) { + // Worker is listening + }; -### Event: 'death' +### Event: death -When any of the workers die the cluster module will emit the 'death' event. -This can be used to restart the worker by calling `fork()` again. - - cluster.on('death', function(worker) { - console.log('worker ' + worker.pid + ' died. restart...'); - cluster.fork(); - }); +Same as the `cluster.on('death')` event, but emits only when the state change +on the specified worker. -Different techniques can be used to restart the worker depending on the -application. + cluster.fork().on('death', function (worker) { + // Worker has died + }; diff --git a/lib/child_process.js b/lib/child_process.js index b3e6383..2e39166 100644 --- a/lib/child_process.js +++ b/lib/child_process.js @@ -95,11 +95,25 @@ function setupChannel(target, channel) { jsonBuffer += pool.toString('ascii', offset, offset + length); var i, start = 0; + + //Linebreak is used as a message end sign while ((i = jsonBuffer.indexOf('\n', start)) >= 0) { var json = jsonBuffer.slice(start, i); var message = JSON.parse(json); - target.emit('message', message, recvHandle); + //Filter out internal messages + //if cmd property begin with "_NODE" + if (message !== null && + typeof message === 'object' && + typeof message.cmd === 'string' && + message.cmd.indexOf('NODE_') === 0) { + target.emit('inernalMessage', message, recvHandle); + } + //Non-internal message + else { + target.emit('message', message, recvHandle); + } + start = i + 1; } jsonBuffer = jsonBuffer.slice(start); diff --git a/lib/cluster.js b/lib/cluster.js index 5892481..90da833 100644 --- a/lib/cluster.js +++ b/lib/cluster.js @@ -23,22 +23,24 @@ var assert = require('assert'); var fork = require('child_process').fork; var net = require('net'); var EventEmitter = require('events').EventEmitter; +var util = require('util'); function isObject(o) { return (typeof o === 'object' && o !== null); } function extendObject(origin, add) { + // Don't do anything if add isn't an object + if (!add) return origin; + var keys = Object.keys(add), i = keys.length; - while(i--) { + while (i--) { origin[keys[i]] = add[keys[i]]; } return origin; } -var cluster = module.exports = new EventEmitter(); - var debug; if (process.env.NODE_DEBUG && /cluster/.test(process.env.NODE_DEBUG)) { debug = function(x) { @@ -50,23 +52,42 @@ if (process.env.NODE_DEBUG && /cluster/.test(process.env.NODE_DEBUG)) { debug = function() { }; } +// cluster object: +function cluster() {} +util.inherits(cluster, EventEmitter); +var cluster = module.exports = new cluster(); // Used in the master: var masterStarted = false; var ids = 0; -var workers = []; -var servers = {}; +var serverHandlers = {}; var workerFilename; var workerArgs; // Used in the worker: -var workerId = 0; +var serverLisenters = {}; var queryIds = 0; var queryCallbacks = {}; -cluster.isWorker = 'NODE_WORKER_ID' in process.env; +// Define isWorker and isMaster +cluster.isWorker = 'NODE_UNIQUE_ID' in process.env; cluster.isMaster = ! cluster.isWorker; +// The worker object is only used in a worker +cluster.worker = cluster.isWorker ? {} : null; +// The workers array is oly used in the naster +cluster.workers = cluster.isMaster ? {} : null; + +// Simple function there call a function on each worker +function eachWorker(cb) { + // Go througe all workers + for (var id in cluster.workers) { + if (cluster.workers.hasOwnProperty(id)) { + cb(cluster.workers[id]); + } + } +} + // Call this from the master process. It will start child workers. // // options.workerFilename @@ -90,155 +111,375 @@ function startMaster() { workerArgs = process.argv.slice(2); process.on('uncaughtException', function(e) { - // Quickly try to kill all the workers. - // TODO: be session leader - will cause auto SIGHUP to the children. - eachWorker(function(worker) { - debug('kill worker ' + worker.pid); - worker.kill(); - }); - console.error('Exception in cluster master process: ' + e.message + '\n' + e.stack); + + quickDestroyCluster(); process.exit(1); }); } +// Check if a message is internal only +var INTERNAL_PREFIX = 'NODE_CLUTER_'; +function isInternalMessage(message) { + return (isObject(message) && + typeof message.cmd === 'string' && + message.cmd.indexOf(INTERNAL_PREFIX) === 0); +} -function handleWorkerMessage(worker, message) { - // This can only be called from the master. - assert(cluster.isMaster); +// Modyfi message object to be internal +function internalMessage(inMessage) { + var outMessage = extendObject({}, inMessage); + + // Add internal prefix to cmd + outMessage.cmd = INTERNAL_PREFIX + (outMessage.cmd || ''); - debug('recv ' + JSON.stringify(message)); - - switch (message.cmd) { - case 'online': - debug('Worker ' + worker.pid + ' online'); - worker.online = true; - break; - - case 'queryServer': - var key = message.address + ':' + - message.port + ':' + - message.addressType; - var response = { _queryId: message._queryId }; - - if (!(key in servers)) { - // Create a new server. - debug('create new server ' + key); - servers[key] = net._createServerHandle(message.address, - message.port, - message.addressType); - } - worker.send(response, servers[key]); - break; - - default: - // Ignore. - break; + return outMessage; +} + +// Handle callback messges +function handleResponse(outMessage, outHandle, inMessage, inHandle, worker) { + + // The message there will be send + var message = internalMessage(outMessage); + + // callback id - will be undefined if not set + message._queryEcho = inMessage._requestEcho; + + // Call callback if a query echo is received + if (inMessage.hasOwnProperty('_queryEcho')) { + queryCallbacks[inMessage._queryEcho](inMessage.content, inHandle); + delete queryCallbacks[inMessage._queryEcho]; + } + + // Send if outWrap do contain something useful + if (!(outMessage === undefined && message._queryEcho === undefined)) { + sendInternalMessage(worker, message, outHandle); } } +// Handle messages from both master and workers +var messageHandingObject = {}; +function handleMessage(inMessage, inHandle, worker) { -function eachWorker(cb) { - // This can only be called from the master. - assert(cluster.isMaster); + //Remove internal prefix + var message = extendObject({}, inMessage); + message.cmd = inMessage.cmd.substr(INTERNAL_PREFIX.length); - for (var id in workers) { - if (workers[id]) { - cb(workers[id]); - } + var respondUsed = false; + var respond = function(outMessage, outHandler) { + respondUsed = true; + handleResponse(outMessage, outHandler, inMessage, inHandle, worker); + }; + + // Run handler if it exist + if (messageHandingObject[message.cmd]) { + messageHandingObject[message.cmd](message, worker, respond); + } + + // Send respond if it wasn't done + if (respondUsed === false) { + respond(); } } +// Messages to the master will be handled using this methods +if (cluster.isMaster) { -cluster.fork = function(env) { - // This can only be called from the master. - assert(cluster.isMaster); + // Handle online messages from workers + messageHandingObject.online = function(message, worker) { + worker.state = 'online'; + debug('Worker ' + worker.process.pid + ' online'); + worker.emit('online', worker); + cluster.emit('online', worker); + }; - // Lazily start the master process stuff. - startMaster(); + // Handle queryServer messages form workers + messageHandingObject.queryServer = function(message, worker, send) { + + // This sequence of infomation is unique to the connection but not + // to the worker + var args = [message.address, message.port, message.addressType]; + var key = args.join(':'); + var handler; + + if (serverHandlers.hasOwnProperty(key)) { + handler = serverHandlers[key]; + } else { + handler = serverHandlers[key] = net._createServerHandle.apply(net, args); + } + + // echo callback with the fd handler associated with it + send({}, handler); + }; + + // Handle listening messages from workers + messageHandingObject.listening = function(message, worker) { + + worker.state = 'listening'; + + // Emit listining, now that we know the worker is listning + worker.emit('listening', worker, { + address: message.address, + port: message.port, + addressType: message.addressType + }); + cluster.emit('listening', worker, { + address: message.address, + port: message.port, + addressType: message.addressType + }); + }; + + // Handle suicide messages from workers + messageHandingObject.suicide = function(message, worker) { + worker.suicide = true; + }; + +} + +// Messages to a worker will be handled using this methods +else if (cluster.isWorker) { + + // TODO: the disconnect step will use this +} + +function toDecInt(value) { + value = parseInt(value, 10); + return isNaN(value) ? null : value; +} + +// Create a worker object, there works both for master and worker +function Worker(customEnv) { + if (!(this instanceof Worker)) return new Worker(); + + var self = this; + var env = process.env; + + // Assign uniqueID, default null + this.uniqueID = cluster.isMaster ? ++ids : toDecInt(env.NODE_UNIQUE_ID); + + // Assign state + this.state = 'none'; - var id = ++ids; + // Create or get process + if (cluster.isMaster) { - //Create env object - var envCopy = extendObject({}, process.env); - envCopy['NODE_WORKER_ID'] = id; - if (isObject(env)) { - envCopy = extendObject(envCopy, env); + // Create env object + // first: copy and add uniqueID + var envCopy = extendObject({}, env); + envCopy['NODE_UNIQUE_ID'] = this.uniqueID; + // second: extend envCopy with the env argument + if (isObject(customEnv)) { + envCopy = extendObject(envCopy, customEnv); + } + + // fork worker + this.process = fork(workerFilename, workerArgs, { + 'env': envCopy + }); + + } else { + this.process = process; } - //fork worker - var worker = fork(workerFilename, workerArgs, { - 'env': envCopy - }); + if (cluster.isMaster) { + // Save worker in the cluster.workers array + cluster.workers[this.uniqueID] = this; - workers[id] = worker; + // Emit a fork event, on next tick + // There is no worker.fork event since this has no real purpose + process.nextTick(function() { + cluster.emit('fork', self); + }); + } - worker.on('message', function(message) { - handleWorkerMessage(worker, message); - }); + // Internal message: handle message + this.process.on('inernalMessage', function(message, handle) { + debug('recived: ', message); - worker.on('exit', function() { - debug('worker id=' + id + ' died'); - delete workers[id]; - cluster.emit('death', worker); + // relay to handleMessage + handleMessage(message, handle, self); + return; }); - return worker; -}; + // Non-internal message: relay to Worker object + this.process.on('message', function(message, handle) { + self.emit('message', message, handle); + }); + // Handle exit + self.process.on('exit', function() { + debug('worker id=' + self.uniqueID + ' died'); -// Internal function. Called from src/node.js when worker process starts. -cluster._startWorker = function() { - assert(cluster.isWorker); - workerId = parseInt(process.env.NODE_WORKER_ID, 10); - - queryMaster({ cmd: 'online' }); - - // Make callbacks from queryMaster() - process.on('message', function(msg, handle) { - debug('recv ' + JSON.stringify(msg)); - if (msg._queryId && msg._queryId in queryCallbacks) { - var cb = queryCallbacks[msg._queryId]; - if (typeof cb == 'function') { - cb(msg, handle); - } - delete queryCallbacks[msg._queryId]; - } + // Prepare worker to die and emit events + prepareDeath(self, 'dead', 'death'); }); + +} +util.inherits(Worker, EventEmitter); +cluster.Worker = Worker; + +function prepareDeath(worker, state, eventName) { + + // set state to disconnect + worker.state = state; + + // Make suicide a boolean + worker.suicide = !!worker.suicide; + + // Remove from workers in the master + if (cluster.isMaster) { + delete cluster.workers[worker.uniqueID]; + } + + // Emit events + worker.emit(eventName, worker); + cluster.emit(eventName, worker); +} + +// Send internal message +function sendInternalMessage(worker, message/*, handler, callback*/) { + + // Exist callback + var callback = arguments[arguments.length - 1]; + if (typeof callback !== 'function') { + callback = undefined; + } + + // exist handler + var handler = arguments[2] !== callback ? arguments[2] : undefined; + + if (!isInternalMessage(message)) { + message = internalMessage(message); + } + + // Store callback for later + if (callback) { + message._requestEcho = worker.uniqueID + ':' + (++queryIds); + queryCallbacks[message._requestEcho] = callback; + } + + + worker.send(message, handler); +} + +// Send message to worker or master +Worker.prototype.send = function() { + + //You could also just use process.send in a worker + this.process.send.apply(this.process, arguments); }; -function queryMaster(msg, cb) { - assert(cluster.isWorker); +function closeWorkerChannel(worker, callback) { + //Apparently the .close method is async, but do not have a callback + worker.process._channel.close(); + worker.process._channel = null; + process.nextTick(callback); +} - debug('send ' + JSON.stringify(msg)); +// Kill the worker without restarting +Worker.prototype.destroy = function() { + var self = this; - // Grab some random queryId - msg._queryId = (++queryIds); - msg._workerId = workerId; + this.suicide = true; - // Store callback for later. Callback called in _startWorker. - if (cb) { - queryCallbacks[msg._queryId] = cb; + if (cluster.isMaster) { + // Stop channel + // this way the worker won't need to propagate suicide state to master + closeWorkerChannel(this, function() { + // Then kill worker + self.process.kill(); + }); + + } else { + // Channel is open + if (this.process._channel !== null) { + + // Inform master that is is suicide and then kill + sendInternalMessage(this, {cmd: 'suicide'}, function() { + // Kill worker + process.exit(0); + }); + + // When master do a quickDestroy the channel is not necesarily closed + // at the point this function runs. For that reason we need to keep + // checking that the channel is still open, until a actually callback + // from the master is resicved. Also we can't do a timeout and then + // just kill, since we don't know if the quickDestroy function was called. + setInterval(function() { + if (self.process._channel === null) { + process.exit(0); + } + }, 200); + + } else { + process.exit(0); + } } +}; + +// Fork a new worker +cluster.fork = function(env) { + // This can only be called from the master. + assert(cluster.isMaster); - // Send message to master. - process.send(msg); + // Make sure that the master has been initalized + startMaster(); + + return (new cluster.Worker(env)); +}; + +// Sync way to quickly kill all cluster workers +// However the workers may not die instantly +function quickDestroyCluster() { + eachWorker(function(worker) { + worker.process.kill(); + }); } +// Internal function. Called from src/node.js when worker process starts. +cluster._setupWorker = function() { + // Get worker class + var worker = cluster.worker = new Worker(); + + // Tell master that the worker is online + worker.state = 'online'; + sendInternalMessage(worker, { cmd: 'online' }); +}; -// Internal function. Called by lib/net.js when attempting to bind a -// server. -cluster._getServer = function(address, port, addressType, cb) { +// Internal function. Called by lib/net.js when attempting to bind a server. +cluster._getServer = function(tcpSelf, address, port, addressType, cb) { + // This can only be called from a worker. assert(cluster.isWorker); - queryMaster({ + // Store tcp instance for later use + var key = [address, port, addressType].join(':'); + serverLisenters[key] = tcpSelf; + + // Send a listening message to the master + tcpSelf.once('listening', function() { + cluster.worker.state = 'listening'; + sendInternalMessage(cluster.worker, { + cmd: 'listening', + address: address, + port: port, + addressType: addressType + }); + }); + + // Request the fd handler from the master process + var message = { cmd: 'queryServer', address: address, port: port, addressType: addressType - }, function(msg, handle) { + }; + + // The callback will be stored until the master has responed + sendInternalMessage(cluster.worker, message, function(msg, handle) { cb(handle); }); + }; diff --git a/lib/net.js b/lib/net.js index 1262186..5073295 100644 --- a/lib/net.js +++ b/lib/net.js @@ -750,8 +750,8 @@ Server.prototype._listen2 = function(address, port, addressType) { function listen(self, address, port, addressType) { - if (process.env.NODE_WORKER_ID) { - require('cluster')._getServer(address, port, addressType, function(handle) { + if (process.env.NODE_UNIQUE_ID) { + require('cluster')._getServer(self, address, port, addressType, function(handle) { self._handle = handle; self._listen2(address, port, addressType); }); diff --git a/src/node.js b/src/node.js index 15cccb6..1f159d6 100644 --- a/src/node.js +++ b/src/node.js @@ -86,9 +86,9 @@ // If this is a worker in cluster mode, start up the communiction // channel. - if (process.env.NODE_WORKER_ID) { + if (process.env.NODE_UNIQUE_ID) { var cluster = NativeModule.require('cluster'); - cluster._startWorker(); + cluster._setupWorker(); } var Module = NativeModule.require('module'); diff --git a/test/simple/test-child-process-internal.js b/test/simple/test-child-process-internal.js new file mode 100644 index 0000000..91e90d4 --- /dev/null +++ b/test/simple/test-child-process-internal.js @@ -0,0 +1,58 @@ +// Copyright Joyent, Inc. and other Node contributors. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the +// "Software"), to deal in the Software without restriction, including +// without limitation the rights to use, copy, modify, merge, publish, +// distribute, sublicense, and/or sell copies of the Software, and to permit +// persons to whom the Software is furnished to do so, subject to the +// following conditions: +// +// The above copyright notice and this permission notice shall be included +// in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN +// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR +// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE +// USE OR OTHER DEALINGS IN THE SOFTWARE. + +var common = require('../common'); +var assert = require('assert'); + +//messages +var PREFIX = 'NODE_'; +var normal = {cmd: 'foo' + PREFIX}; +var internal = {cmd: PREFIX + 'bar'}; + +if (process.argv[2] === 'child') { + //send non-internal message containing PREFIX at a non prefix position + process.send(normal); + + //send inernal message + process.send(internal); + + process.exit(0); + +} else { + + var fork = require('child_process').fork; + var child = fork(process.argv[1], ['child']); + + var gotNormal; + child.once('message', function(data) { + gotNormal = data; + }); + + var gotInternal; + child.once('inernalMessage', function(data) { + gotInternal = data; + }); + + process.on('exit', function() { + assert.deepEqual(gotNormal, normal); + assert.deepEqual(gotInternal, internal); + }); +} diff --git a/test/simple/test-cluster-basic.js b/test/simple/test-cluster-basic.js new file mode 100644 index 0000000..adc7775 --- /dev/null +++ b/test/simple/test-cluster-basic.js @@ -0,0 +1,161 @@ +// Copyright Joyent, Inc. and other Node contributors. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the +// "Software"), to deal in the Software without restriction, including +// without limitation the rights to use, copy, modify, merge, publish, +// distribute, sublicense, and/or sell copies of the Software, and to permit +// persons to whom the Software is furnished to do so, subject to the +// following conditions: +// +// The above copyright notice and this permission notice shall be included +// in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN +// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR +// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE +// USE OR OTHER DEALINGS IN THE SOFTWARE. + + +var common = require('../common'); +var assert = require('assert'); +var cluster = require('cluster'); + +function forEach(obj, fn) { + Object.keys(obj).forEach(function(name, index) { + fn(obj[name], name, index); + }); +} + + +if (cluster.isWorker) { + var http = require('http'); + http.Server(function() { + + }).listen(common.PORT, '127.0.0.1'); +} + +else if (cluster.isMaster) { + + assert.equal('NODE_UNIQUE_ID' in process.env, false, + 'cluster.isMaster should not be true when NODE_UNIQUE_ID is set'); + + var checks = { + cluster: { + events: { + fork: false, + online: false, + listening: false, + death: false + }, + equal: { + fork: false, + online: false, + listening: false, + death: false + } + }, + + worker: { + events: { + online: false, + listening: false, + death: false + }, + equal: { + online: false, + listening: false, + death: false + }, + states: { + none: false, + online: false, + listening: false, + dead: false + } + } + }; + + var worker; + var stateNames = Object.keys(checks.worker.states); + + //Check events, states, and emit arguments + forEach(checks.cluster.events, function(bool, name, index) { + + //Listen on event + cluster.on(name, function(/* worker */) { + + //Set event + checks.cluster.events[name] = true; + + //Check argument + checks.cluster.equal[name] = worker === arguments[0]; + + //Check state + var state = stateNames[index]; + checks.worker.states[state] = (state === worker.state); + }); + }); + + //Kill worker when listening + cluster.on('listening', function() { + worker.destroy(); + }); + + //Kill process when worker is killed + cluster.on('death', function() { + process.exit(0); + }); + + //Create worker + worker = cluster.fork(); + assert.ok(worker instanceof cluster.Worker, + 'the worker is not a instance of the Worker constructor'); + + //Check event + forEach(checks.worker.events, function(bool, name, index) { + worker.on(name, function() { + //Set event + checks.worker.events[name] = true; + + //Check argument + checks.worker.equal[name] = worker === arguments[0]; + }); + }); + + //Check all values + process.once('exit', function() { + //Check cluster events + forEach(checks.cluster.events, function(check, name) { + assert.ok(check, 'The cluster event "' + name + '" on the cluster ' + + 'object did not fire'); + }); + + //Check cluster event arguments + forEach(checks.cluster.equal, function(check, name) { + assert.ok(check, 'The cluster event "' + name + '" did not emit ' + + 'with corrent argument'); + }); + + //Check worker states + forEach(checks.worker.states, function(check, name) { + assert.ok(check, 'The worker state "' + name + '" was not set to true'); + }); + + //Check worker events + forEach(checks.worker.events, function(check, name) { + assert.ok(check, 'The worker event "' + name + '" on the worker object ' + + 'did not fire'); + }); + + //Check worker event arguments + forEach(checks.worker.equal, function(check, name) { + assert.ok(check, 'The worker event "' + name + '" did not emit with ' + + 'corrent argument'); + }); + }); + +} diff --git a/test/simple/test-cluster-fork-env.js b/test/simple/test-cluster-fork-env.js index f62f804..a0b50ae 100644 --- a/test/simple/test-cluster-fork-env.js +++ b/test/simple/test-cluster-fork-env.js @@ -25,8 +25,7 @@ var assert = require('assert'); var cluster = require('cluster'); if (cluster.isWorker) { - process.send({ - testcase: true, + cluster.worker.send({ prop: process.env['cluster_test_prop'], overwrite: process.env['cluster_test_overwrite'] }); @@ -38,23 +37,21 @@ if (cluster.isWorker) { overwrite: false }; - //To check that the cluster extend on the process.env we will overwrite a - //property + // To check that the cluster extend on the process.env we will overwrite a + // property process.env['cluster_test_overwrite'] = 'old'; - //Fork worker + // Fork worker var worker = cluster.fork({ 'cluster_test_prop': 'custom', 'cluster_test_overwrite': 'new' }); - //Checks worker env + // Checks worker env worker.on('message', function(data) { - if (data.testcase) { - checks.using = (data.prop === 'custom'); - checks.overwrite = (data.overwrite === 'new'); - process.exit(0); - } + checks.using = (data.prop === 'custom'); + checks.overwrite = (data.overwrite === 'new'); + process.exit(0); }); process.once('exit', function() { diff --git a/test/simple/test-cluster-kill-workers.js b/test/simple/test-cluster-kill-workers.js index c9f2bf2..526869c 100644 --- a/test/simple/test-cluster-kill-workers.js +++ b/test/simple/test-cluster-kill-workers.js @@ -72,7 +72,7 @@ if (isTestRunner) { // Cluster stuff. if (cluster.isMaster) { var worker = cluster.fork(); - process.send({ workerPID: worker.pid }); + process.send({ workerPID: worker.process.pid }); // should kill the worker too throw new Error('kill master'); } else { diff --git a/test/simple/test-cluster-message.js b/test/simple/test-cluster-message.js new file mode 100644 index 0000000..6bab71d --- /dev/null +++ b/test/simple/test-cluster-message.js @@ -0,0 +1,130 @@ +// Copyright Joyent, Inc. and other Node contributors. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the +// "Software"), to deal in the Software without restriction, including +// without limitation the rights to use, copy, modify, merge, publish, +// distribute, sublicense, and/or sell copies of the Software, and to permit +// persons to whom the Software is furnished to do so, subject to the +// following conditions: +// +// The above copyright notice and this permission notice shall be included +// in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN +// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR +// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE +// USE OR OTHER DEALINGS IN THE SOFTWARE. + + +var common = require('../common'); +var assert = require('assert'); +var cluster = require('cluster'); +var net = require('net'); + +function forEach(obj, fn) { + Object.keys(obj).forEach(function(name, index) { + fn(obj[name], name, index); + }); +} + +if (cluster.isWorker) { + // Create a tcp server + // this will be used as cluster-shared-server + // and as an alternativ IPC channel + var server = net.Server(); + server.on('connection', function(socket) { + + // Tell master using TCP socket that a message is received + process.on('message', function(message) { + socket.write(JSON.stringify({ + code: 'received message', + echo: message + })); + }); + + process.send('message from worker'); + }); + + server.listen(common.PORT, '127.0.0.1'); +} + +else if (cluster.isMaster) { + + var checks = { + master: { + 'receive': false, + 'correct': false + }, + worker: { + 'receive': false, + 'correct': false + } + }; + + + var client; + var check = function(type, result) { + checks[type].receive = true; + checks[type].correct = result; + + var missing = false; + forEach(checks, function(type) { + if (type.receive === false) missing = true; + }); + + if (missing === false) { + client.end(); + } + }; + + // Spawn worker + var worker = cluster.fork(); + + // When a IPC message is resicved form the worker + worker.on('message', function(message) { + check('master', message === 'message from worker'); + }); + + // When a TCP connection is made with the worker connect to it + worker.on('listening', function() { + + client = net.connect(common.PORT, function() { + + //Send message to worker + worker.send('message from master'); + }); + + client.on('data', function(data) { + // All data is JSON + data = JSON.parse(data.toString()); + + if (data.code === 'received message') { + check('worker', data.echo === 'message from master'); + } else { + throw new Error('worng TCP message recived: ' + data); + } + }); + + // When the connection ends kill worker and shutdown process + client.on('end', function() { + worker.destroy(); + }); + + worker.on('death', function() { + process.exit(0); + }); + + }); + + process.once('exit', function() { + forEach(checks, function(check, type) { + assert.ok(check.receive, 'The ' + type + ' did not receive any message'); + assert.ok(check.correct, + 'The ' + type + ' did not get the correct message'); + }); + }); +}