cluster: add graceful disconnect support
authorAndreas Madsen <amwebdk@gmail.com>
Sat, 10 Mar 2012 15:30:06 +0000 (16:30 +0100)
committerisaacs <i@izs.me>
Mon, 19 Mar 2012 20:29:01 +0000 (13:29 -0700)
This patch add a worker.disconnect() method there will stop the worker from accepting
new connections and then stop the IPC. This allow the worker to die graceful.
When the IPC has been disconnected a 'disconnect' event will emit.

The patch also add a cluster.disconnect() method, this will call worker.disconnect() on
all connected workers. When the workers are disconneted it will then close all server
handlers. This allow the cluster itself to self terminate in a graceful way.

doc/api/cluster.markdown
lib/cluster.js
test/simple/test-cluster-disconnect.js [new file with mode: 0644]
test/simple/test-cluster-worker-disconnect.js [new file with mode: 0644]

index c87217f..b7cfee4 100644 (file)
@@ -118,6 +118,21 @@ where the 'listening' event is emitted.
       console.log("We are now connected");
     });
 
+## Event: 'disconnect'
+
+* `worker` {Worker object}
+
+When a workers IPC channel has disconnected this event is emitted. This will happen
+when the worker die, usually after calling `.destroy()`.
+
+But also when calling `.disconnect()`, in this case it is possible there is delay
+between the `disconnect` and `death` and the event can be used to detect if the
+process is stuck in a cleanup or if there are long living connection.
+
+    cluster.on('disconnect', function(worker) {
+      console.log('The worker #' + worker.uniqueID + ' has disconnected');
+    });
+
 ## Event: 'death'
 
 * `worker` {Worker object}
@@ -179,6 +194,16 @@ Spawn a new worker process. This can only be called from the master process.
 All settings set by the `.setupMaster` is stored in this settings object.
 This object is not supposed to be change or set manually.
 
+## cluster.disconnect([callback])
+
+* `callback` {Function} called when all workers are disconnected and handlers are closed
+
+When calling this method all workers will commit a graceful suicide. When they are
+disconnected all internal handlers will be closed, allowing the master process to
+die graceful if no other event is waiting.
+
+The method takes an optional callback argument there will be called when finished.
+
 ## cluster.workers
 
 * {Object}
@@ -232,9 +257,8 @@ See: [Child Process module](child_process.html)
 
 * {Boolean}
 
-This property is a boolean. It is set when a worker dies, until then it is
-`undefined`.  It is true if the worker was killed using the `.destroy()`
-method, and false otherwise.
+This property is a boolean. It is set when a worker dies after calling `.destroy()`
+or immediately after calling the `.disconnect()` method. Until then it is `undefined`.
 
 ### worker.send(message, [sendHandle])
 
@@ -273,6 +297,55 @@ a suicide boolean is set to true.
     // destroy worker
     worker.destroy();
 
+
+## Worker.disconnect()
+
+When calling this function the worker will no longer accept new connections, but
+they will be handled by any other listening worker. Existing connection will be
+allowed to exit as usual. When no more connections exist, the IPC channel to the worker
+will close allowing it to die graceful. When the IPC channel is closed the `disconnect`
+event will emit, this is then followed by the `death` event, there is emitted when
+the worker finally die.
+
+Because there might be long living connections, it is useful to implement a timeout.
+This example ask the worker to disconnect and after 2 seconds it will destroy the
+server. An alternative wound be to execute `worker.destroy()` after 2 seconds, but
+that would normally not allow the worker to do any cleanup if needed.
+
+    if (cluster.isMaster) {
+      var worker = cluser.fork();
+      var timeout;
+
+      worker.on('listening', function () {
+        worker.disconnect();
+        timeout = setTimeout(function () {
+          worker.send('force kill');
+        }, 2000);
+      });
+
+      worker.on('disconnect', function () {
+        clearTimeout(timeout);
+      });
+
+    } else if (cluster.isWorker) {
+      var net = require('net');
+      var server = net.createServer(function (socket) {
+        // connection never end
+      });
+
+      server.listen(8000);
+
+      server.on('close', function () {
+        // cleanup
+      });
+
+      process.on('message', function (msg) {
+        if (msg === 'force kill') {
+          server.destroy();
+        }
+      });
+    }
+
 ### Event: 'message'
 
 * `message` {Object}
@@ -342,6 +415,17 @@ on the specified worker.
       // Worker is listening
     };
 
+## Event: 'disconnect'
+
+* `worker` {Worker object}
+
+Same as the `cluster.on('disconnect')` event, but emits only when the state change
+on the specified worker.
+
+    cluster.fork().on('disconnect', function (worker) {
+      // Worker has disconnected
+    };
+
 ## Event: 'death'
 
 * `worker` {Worker object}
index cd90219..977f1dd 100644 (file)
@@ -77,6 +77,19 @@ function eachWorker(cb) {
   }
 }
 
+// Extremely simple progress tracker
+function ProgressTracker(missing, callback) {
+  this.missing = missing;
+  this.callback = callback;
+}
+ProgressTracker.prototype.done = function() {
+  this.missing -= 1;
+  this.check();
+};
+ProgressTracker.prototype.check = function() {
+  if (this.missing === 0) this.callback();
+};
+
 cluster.setupMaster = function(options) {
   // This can only be called from the master.
   assert(cluster.isMaster);
@@ -239,7 +252,10 @@ if (cluster.isMaster) {
 // Messages to a worker will be handled using this methods
 else if (cluster.isWorker) {
 
-  // TODO: the disconnect step will use this
+  // Handle worker.disconnect from master
+  messageHandingObject.disconnect = function(message, worker) {
+    worker.disconnect();
+  };
 }
 
 function toDecInt(value) {
@@ -293,9 +309,11 @@ function Worker(customEnv) {
     });
   }
 
-  // handle internalMessage and exit event
+  // handle internalMessage, exit and disconnect event
   this.process.on('internalMessage', handleMessage.bind(null, this));
   this.process.on('exit', prepareDeath.bind(null, this, 'dead', 'death'));
+  this.process.on('disconnect',
+                  prepareDeath.bind(null, this, 'disconnected', 'disconnect'));
 
   // relay message and error
   this.process.on('message', this.emit.bind(this, 'message'));
@@ -356,14 +374,6 @@ Worker.prototype.send = function() {
   this.process.send.apply(this.process, arguments);
 };
 
-
-function closeWorkerChannel(worker, callback) {
-  //Apparently the .close method is async, but do not have a callback
-  worker.process._channel.close();
-  worker.process._channel = null;
-  process.nextTick(callback);
-}
-
 // Kill the worker without restarting
 Worker.prototype.destroy = function() {
   var self = this;
@@ -373,9 +383,14 @@ Worker.prototype.destroy = function() {
   if (cluster.isMaster) {
     // Disconnect IPC channel
     // this way the worker won't need to propagate suicide state to master
-    closeWorkerChannel(this, function() {
+    if (self.process.connected) {
+      self.process.once('disconnect', function() {
+        self.process.kill();
+      });
+      self.process.disconnect();
+    } else {
       self.process.kill();
-    });
+    }
 
   } else {
     // Channel is open
@@ -403,6 +418,59 @@ Worker.prototype.destroy = function() {
   }
 };
 
+// The .disconnect function will close all server and then disconnect
+// the IPC channel.
+if (cluster.isMaster) {
+  // Used in master
+  Worker.prototype.disconnect = function() {
+    this.suicide = true;
+
+    sendInternalMessage(this, {cmd: 'disconnect'});
+  };
+
+} else {
+  // Used in workers
+  Worker.prototype.disconnect = function() {
+    var self = this;
+
+    this.suicide = true;
+
+    // keep track of open servers
+    var servers = Object.keys(serverLisenters).length;
+    var progress = new ProgressTracker(servers, function() {
+      // there are no more servers open so we will close the IPC channel.
+      // Closeing the IPC channel will emit emit a disconnect event
+      // in both master and worker on the process object.
+      // This event will be handled by prepearDeath.
+      self.process.disconnect();
+    });
+
+    // depending on where this function was called from (master or worker)
+    // the suicide state has allready been set.
+    // But it dosn't really matter if we set it again.
+    sendInternalMessage(this, {cmd: 'suicide'}, function() {
+      // in case there are no servers
+      progress.check();
+
+      // closeing all servers graceful
+      var server;
+      for (var key in serverLisenters) {
+        server = serverLisenters[key];
+
+        // in case the server is closed we wont close it again
+        if (server._handle === null) {
+          progress.done();
+          continue;
+        }
+
+        server.on('close', progress.done.bind(progress));
+        server.close();
+      }
+    });
+
+  };
+}
+
 // Fork a new worker
 cluster.fork = function(env) {
   // This can only be called from the master.
@@ -414,6 +482,33 @@ cluster.fork = function(env) {
   return (new cluster.Worker(env));
 };
 
+// execute .disconnect on all workers and close handlers when done
+cluster.disconnect = function(callback) {
+  // This can only be called from the master.
+  assert(cluster.isMaster);
+
+  // Close all TCP handlers when all workers are disconnected
+  var workers = Object.keys(cluster.workers).length;
+  var progress = new ProgressTracker(workers, function() {
+    for (var key in serverHandlers) {
+      serverHandlers[key].close();
+      delete serverHandlers[key];
+    }
+
+    // call callback when done
+    if (callback) callback();
+  });
+
+  // begin disconnecting all workers
+  eachWorker(function(worker) {
+    worker.once('disconnect', progress.done.bind(progress));
+    worker.disconnect();
+  });
+
+  // in case there wasn't any workers
+  progress.check();
+};
+
 // Sync way to quickly kill all cluster workers
 // However the workers may not die instantly
 function quickDestroyCluster() {
diff --git a/test/simple/test-cluster-disconnect.js b/test/simple/test-cluster-disconnect.js
new file mode 100644 (file)
index 0000000..4ea6afc
--- /dev/null
@@ -0,0 +1,122 @@
+// Copyright Joyent, Inc. and other Node contributors.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files (the
+// "Software"), to deal in the Software without restriction, including
+// without limitation the rights to use, copy, modify, merge, publish,
+// distribute, sublicense, and/or sell copies of the Software, and to permit
+// persons to whom the Software is furnished to do so, subject to the
+// following conditions:
+//
+// The above copyright notice and this permission notice shall be included
+// in all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
+// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
+// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
+// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
+// USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+
+var common = require('../common');
+var assert = require('assert');
+var cluster = require('cluster');
+var net = require('net');
+
+if (cluster.isWorker) {
+  net.createServer(function(socket) {
+    socket.end('echo');
+  }).listen(common.PORT, '127.0.0.1');
+
+  net.createServer(function(socket) {
+    socket.end('echo');
+  }).listen(common.PORT + 1, '127.0.0.1');
+
+} else if (cluster.isMaster) {
+
+  // test a single TCP server
+  var testConnection = function(port, cb) {
+    var socket = net.connect(port, '127.0.0.1', function() {
+      // buffer result
+      var result = '';
+      socket.on('data', function(chunk) { result += chunk; });
+
+      // check result
+      socket.on('end', function() {
+        cb(result === 'echo');
+      });
+    });
+  };
+
+  // test both servers created in the cluster
+  var testCluster = function(cb) {
+    var servers = 2;
+    var done = 0;
+
+    for (var i = 0, l = servers; i < l; i++) {
+      testConnection(common.PORT + i, function(sucess) {
+        assert.ok(sucess);
+        done += 1;
+        if (done === servers) {
+          cb();
+        }
+      });
+    }
+  };
+
+  // start two workers and execute callback when both is listening
+  var startCluster = function(cb) {
+    var workers = 2;
+    var online = 0;
+
+    for (var i = 0, l = workers; i < l; i++) {
+
+      var worker = cluster.fork();
+      worker.on('listening', function() {
+        online += 1;
+        if (online === workers) {
+          cb();
+        }
+      });
+    }
+  };
+
+
+  var results = {
+    start: 0,
+    test: 0,
+    disconnect: 0
+  };
+
+  var test = function(again) {
+    //1. start cluster
+    startCluster(function() {
+      results.start += 1;
+
+      //2. test cluster
+      testCluster(function() {
+        results.test += 1;
+
+        //3. disconnect cluster
+        cluster.disconnect(function() {
+          results.disconnect += 1;
+
+          // run test again to confirm cleanup
+          if (again) {
+            test();
+          }
+        });
+      });
+    });
+  };
+
+  test(true);
+
+  process.once('exit', function() {
+    assert.equal(results.start, 2);
+    assert.equal(results.test, 2);
+    assert.equal(results.disconnect, 2);
+  });
+}
diff --git a/test/simple/test-cluster-worker-disconnect.js b/test/simple/test-cluster-worker-disconnect.js
new file mode 100644 (file)
index 0000000..af2bf70
--- /dev/null
@@ -0,0 +1,110 @@
+// Copyright Joyent, Inc. and other Node contributors.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files (the
+// "Software"), to deal in the Software without restriction, including
+// without limitation the rights to use, copy, modify, merge, publish,
+// distribute, sublicense, and/or sell copies of the Software, and to permit
+// persons to whom the Software is furnished to do so, subject to the
+// following conditions:
+//
+// The above copyright notice and this permission notice shall be included
+// in all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
+// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
+// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
+// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
+// USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+
+var common = require('../common');
+var assert = require('assert');
+var cluster = require('cluster');
+
+if (cluster.isWorker) {
+  var http = require('http');
+  http.Server(function() {
+
+  }).listen(common.PORT, '127.0.0.1');
+
+} else if (cluster.isMaster) {
+
+  var checks = {
+    cluster: {
+      emitDisconnect: false,
+      emitDeath: false,
+      callback: false
+    },
+    worker: {
+      emitDisconnect: false,
+      emitDeath: false,
+      state: false,
+      suicideMode: false,
+      died: false
+    }
+  };
+
+  // helper function to check if a process is alive
+  var alive = function(pid) {
+    try {
+      process.kill(pid, 0);
+      return true;
+    } catch (e) {
+      return false;
+    }
+  };
+
+  // start worker
+  var worker = cluster.fork();
+
+  // Disconnect worker when it is ready
+  worker.once('listening', function() {
+    worker.disconnect();
+  });
+
+  // Check cluster events
+  cluster.once('disconnect', function() {
+    checks.cluster.emitDisconnect = true;
+  });
+  cluster.once('death', function() {
+    checks.cluster.emitDeath = true;
+  });
+
+  // Check worker eventes and properties
+  worker.once('disconnect', function() {
+    checks.worker.emitDisconnect = true;
+    checks.worker.suicideMode = worker.suicide;
+    checks.worker.state = worker.state;
+  });
+
+  // Check that the worker died
+  worker.once('death', function() {
+    checks.worker.emitDeath = true;
+    checks.worker.died = !alive(worker.process.pid);
+    process.nextTick(function() {
+      process.exit(0);
+    });
+  });
+
+  process.once('exit', function() {
+
+    var w = checks.worker;
+    var c = checks.cluster;
+
+    // events
+    assert.ok(w.emitDisconnect, 'Disconnect event did not emit');
+    assert.ok(c.emitDisconnect, 'Disconnect event did not emit');
+    assert.ok(w.emitDeath, 'Death event did not emit');
+    assert.ok(c.emitDeath, 'Death event did not emit');
+
+    // flags
+    assert.equal(w.state, 'disconnected', 'The state property was not set');
+    assert.equal(w.suicideMode, true, 'Suicide mode was not set');
+
+    // is process alive
+    assert.ok(w.died, 'The worker did not die');
+  });
+}