cluster: remove handles when disconnecting worker
authorBen Noordhuis <info@bnoordhuis.nl>
Tue, 3 Nov 2015 18:06:50 +0000 (19:06 +0100)
committerJames M Snell <jasnell@gmail.com>
Wed, 23 Dec 2015 16:38:33 +0000 (08:38 -0800)
Due to the race window between the master's "disconnect" message and the
worker's "handle received" message, connections sometimes got stuck in
the pending handles queue when calling `worker.disconnect()` in the
master process.

The observable effect from the client's perspective was a TCP or HTTP
connection that simply stalled.  This commit fixes that by closing open
handles in the master when the "disconnect" message is sent.

Fixes: https://github.com/nodejs/node/issues/3551
PR-URL: https://github.com/nodejs/node/pull/3677
Reviewed-By: Colin Ihrig <cjihrig@gmail.com>
Reviewed-By: Fedor Indutny <fedor@indutny.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
lib/cluster.js
lib/internal/cluster.js [new file with mode: 0644]
node.gyp
test/parallel/test-cluster-disconnect-handles.js [new file with mode: 0644]

index bbb6188..c4c2c0d 100644 (file)
@@ -217,7 +217,7 @@ function masterInit() {
   // Keyed on address:port:etc. When a worker dies, we walk over the handles
   // and remove() the worker from each one. remove() may do a linear scan
   // itself so we might end up with an O(n*m) operation. Ergo, FIXME.
-  var handles = {};
+  const handles = require('internal/cluster').handles;
 
   var initialized = false;
   cluster.setupMaster = function(options) {
@@ -308,6 +308,26 @@ function masterInit() {
 
   var ids = 0;
 
+  function removeWorker(worker) {
+    assert(worker);
+
+    delete cluster.workers[worker.id];
+
+    if (Object.keys(cluster.workers).length === 0) {
+      assert(Object.keys(handles).length === 0, 'Resource leak detected.');
+      intercom.emit('disconnect');
+    }
+  }
+
+  function removeHandlesForWorker(worker) {
+    assert(worker);
+
+    for (var key in handles) {
+      var handle = handles[key];
+      if (handle.remove(worker)) delete handles[key];
+    }
+  }
+
   cluster.fork = function(env) {
     cluster.setupMaster();
     const id = ++ids;
@@ -319,26 +339,6 @@ function masterInit() {
 
     worker.on('message', this.emit.bind(this, 'message'));
 
-    function removeWorker(worker) {
-      assert(worker);
-
-      delete cluster.workers[worker.id];
-
-      if (Object.keys(cluster.workers).length === 0) {
-        assert(Object.keys(handles).length === 0, 'Resource leak detected.');
-        intercom.emit('disconnect');
-      }
-    }
-
-    function removeHandlesForWorker(worker) {
-      assert(worker);
-
-      for (var key in handles) {
-        var handle = handles[key];
-        if (handle.remove(worker)) delete handles[key];
-      }
-    }
-
     worker.process.once('exit', function(exitCode, signalCode) {
       /*
        * Remove the worker from the workers list only
@@ -404,6 +404,8 @@ function masterInit() {
   Worker.prototype.disconnect = function() {
     this.suicide = true;
     send(this, { act: 'disconnect' });
+    removeHandlesForWorker(this);
+    removeWorker(this);
   };
 
   Worker.prototype.destroy = function(signo) {
@@ -490,11 +492,12 @@ function masterInit() {
     cluster.emit('listening', worker, info);
   }
 
-  // Server in worker is closing, remove from list.
+  // Server in worker is closing, remove from list.  The handle may have been
+  // removed by a prior call to removeHandlesForWorker() so guard against that.
   function close(worker, message) {
     var key = message.key;
     var handle = handles[key];
-    if (handle.remove(worker)) delete handles[key];
+    if (handle && handle.remove(worker)) delete handles[key];
   }
 
   function send(worker, message, handle, cb) {
diff --git a/lib/internal/cluster.js b/lib/internal/cluster.js
new file mode 100644 (file)
index 0000000..8380ea7
--- /dev/null
@@ -0,0 +1,4 @@
+'use strict';
+
+// Used in tests.
+exports.handles = {};
index 386074b..a587edc 100644 (file)
--- a/node.gyp
+++ b/node.gyp
@@ -69,6 +69,7 @@
       'lib/vm.js',
       'lib/zlib.js',
       'lib/internal/child_process.js',
+      'lib/internal/cluster.js',
       'lib/internal/freelist.js',
       'lib/internal/module.js',
       'lib/internal/socket_list.js',
diff --git a/test/parallel/test-cluster-disconnect-handles.js b/test/parallel/test-cluster-disconnect-handles.js
new file mode 100644 (file)
index 0000000..5fa9884
--- /dev/null
@@ -0,0 +1,65 @@
+/* eslint-disable no-debugger */
+// Flags: --expose_internals
+'use strict';
+
+const common = require('../common');
+const assert = require('assert');
+const cluster = require('cluster');
+const net = require('net');
+
+const Protocol = require('_debugger').Protocol;
+
+if (common.isWindows) {
+  console.log('1..0 # Skipped: SCHED_RR not reliable on Windows');
+  return;
+}
+
+cluster.schedulingPolicy = cluster.SCHED_RR;
+
+// Worker sends back a "I'm here" message, then immediately suspends
+// inside the debugger.  The master connects to the debug agent first,
+// connects to the TCP server second, then disconnects the worker and
+// unsuspends it again.  The ultimate goal of this tortured exercise
+// is to make sure the connection is still sitting in the master's
+// pending handle queue.
+if (cluster.isMaster) {
+  const handles = require('internal/cluster').handles;
+  // FIXME(bnoordhuis) lib/cluster.js scans the execArgv arguments for
+  // debugger flags and renumbers any port numbers it sees starting
+  // from the default port 5858.  Add a '.' that circumvents the
+  // scanner but is ignored by atoi(3).  Heinous hack.
+  cluster.setupMaster({ execArgv: [`--debug=${common.PORT}.`] });
+  const worker = cluster.fork();
+  worker.on('message', common.mustCall(message => {
+    assert.strictEqual(Array.isArray(message), true);
+    assert.strictEqual(message[0], 'listening');
+    const address = message[1];
+    const host = address.address;
+    const debugClient = net.connect({ host, port: common.PORT });
+    const protocol = new Protocol();
+    debugClient.setEncoding('utf8');
+    debugClient.on('data', data => protocol.execute(data));
+    debugClient.once('connect', common.mustCall(() => {
+      protocol.onResponse = common.mustCall(res => {
+        protocol.onResponse = () => {};
+        const conn = net.connect({ host, port: address.port });
+        conn.once('connect', common.mustCall(() => {
+          conn.destroy();
+          assert.notDeepStrictEqual(handles, {});
+          worker.disconnect();
+          assert.deepStrictEqual(handles, {});
+          const req = protocol.serialize({ command: 'continue' });
+          debugClient.write(req);
+        }));
+      });
+    }));
+  }));
+  process.on('exit', () => assert.deepStrictEqual(handles, {}));
+} else {
+  const server = net.createServer(socket => socket.pipe(socket));
+  server.listen(() => {
+    process.send(['listening', server.address()]);
+    debugger;
+  });
+  process.on('disconnect', process.exit);
+}