// Keyed on address:port:etc. When a worker dies, we walk over the handles
// and remove() the worker from each one. remove() may do a linear scan
// itself so we might end up with an O(n*m) operation. Ergo, FIXME.
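+ // (Here n is the number of keys in handles and m is the number of workers
+ // each remove() call has to scan.)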
- var handles = {};
+ const handles = require('internal/cluster').handles;
var initialized = false;
cluster.setupMaster = function(options) {
var ids = 0;
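+ // Called whenever a worker exits or disconnects. Once the last worker is
+ // gone, every shared handle must be gone too; anything left in the handle
+ // map at that point is a resource leak.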
+ function removeWorker(worker) {
+ assert(worker);
+
+ delete cluster.workers[worker.id];
+
+ if (Object.keys(cluster.workers).length === 0) {
+ assert(Object.keys(handles).length === 0, 'Resource leak detected.');
+ intercom.emit('disconnect');
+ }
+ }
+
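+ // Strike the worker from the list of workers tracked by each shared
+ // handle. handle.remove() returns true once the last worker has been
+ // removed, at which point the handle itself can be dropped from the map.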
+ function removeHandlesForWorker(worker) {
+ assert(worker);
+
+ for (var key in handles) {
+ var handle = handles[key];
+ if (handle.remove(worker)) delete handles[key];
+ }
+ }
+
cluster.fork = function(env) {
cluster.setupMaster();
const id = ++ids;
worker.on('message', this.emit.bind(this, 'message'));
- function removeWorker(worker) {
- assert(worker);
-
- delete cluster.workers[worker.id];
-
- if (Object.keys(cluster.workers).length === 0) {
- assert(Object.keys(handles).length === 0, 'Resource leak detected.');
- intercom.emit('disconnect');
- }
- }
-
- function removeHandlesForWorker(worker) {
- assert(worker);
-
- for (var key in handles) {
- var handle = handles[key];
- if (handle.remove(worker)) delete handles[key];
- }
- }
-
worker.process.once('exit', function(exitCode, signalCode) {
/*
* Remove the worker from the workers list only
Worker.prototype.disconnect = function() {
this.suicide = true;
send(this, { act: 'disconnect' });
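+ // Drop the handles before dropping the worker: removeWorker() asserts
+ // that no handles remain once the last worker is gone.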
+ removeHandlesForWorker(this);
+ removeWorker(this);
};
Worker.prototype.destroy = function(signo) {
cluster.emit('listening', worker, info);
}
- // Server in worker is closing, remove from list.
+ // Server in worker is closing, remove from list. The handle may have been
+ // removed by a prior call to removeHandlesForWorker() so guard against that.
function close(worker, message) {
var key = message.key;
var handle = handles[key];
- if (handle.remove(worker)) delete handles[key];
+ if (handle && handle.remove(worker)) delete handles[key];
}
function send(worker, message, handle, cb) {
--- /dev/null
+++ b/lib/internal/cluster.js
+'use strict';
+
+// Map of shared server handles, keyed on address:port:etc. Kept in an
+// internal module so that tests can inspect it with --expose_internals.
+// Used in tests.
+exports.handles = {};
'lib/vm.js',
'lib/zlib.js',
'lib/internal/child_process.js',
+ 'lib/internal/cluster.js',
'lib/internal/freelist.js',
'lib/internal/module.js',
'lib/internal/socket_list.js',
--- /dev/null
+++ b/test/parallel/test-cluster-disconnect-handles.js
+/* eslint-disable no-debugger */
+// Flags: --expose_internals
+'use strict';
+
+const common = require('../common');
+const assert = require('assert');
+const cluster = require('cluster');
+const net = require('net');
+
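+// Protocol parses the debug agent's wire format. The test uses it to tell
+// when the agent has responded and to serialize the 'continue' command
+// that unsuspends the worker.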
+const Protocol = require('_debugger').Protocol;
+
+if (common.isWindows) {
+ console.log('1..0 # Skipped: SCHED_RR not reliable on Windows');
+ return;
+}
+
+cluster.schedulingPolicy = cluster.SCHED_RR;
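+
+// With round-robin scheduling the master, not the worker, accepts incoming
+// connections and hands them off; that is what lets a connection sit in
+// the master's pending handle queue.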
+
+// Worker sends back an "I'm here" message, then immediately suspends
+// inside the debugger. The master connects to the debug agent first,
+// connects to the TCP server second, then disconnects the worker and
+// unsuspends it again. The ultimate goal of this tortured exercise
+// is to make sure the connection is still sitting in the master's
+// pending handle queue.
+if (cluster.isMaster) {
+ const handles = require('internal/cluster').handles;
+ // FIXME(bnoordhuis) lib/cluster.js scans the execArgv arguments for
+ // debugger flags and renumbers any port numbers it sees starting
+ // from the default port 5858. Add a '.' that circumvents the
+ // scanner but is ignored by atoi(3). Heinous hack.
+ cluster.setupMaster({ execArgv: [`--debug=${common.PORT}.`] });
+ const worker = cluster.fork();
+ worker.on('message', common.mustCall(message => {
+ assert.strictEqual(Array.isArray(message), true);
+ assert.strictEqual(message[0], 'listening');
+ const address = message[1];
+ const host = address.address;
+ const debugClient = net.connect({ host, port: common.PORT });
+ const protocol = new Protocol();
+ debugClient.setEncoding('utf8');
+ debugClient.on('data', data => protocol.execute(data));
+ debugClient.once('connect', common.mustCall(() => {
+ protocol.onResponse = common.mustCall(res => {
+ protocol.onResponse = () => {};
+ const conn = net.connect({ host, port: address.port });
+ conn.once('connect', common.mustCall(() => {
+ conn.destroy();
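+ // The worker's server is tracked in the master's handle map, and the
+ // connection just made is parked in its pending queue because the
+ // worker is suspended. Disconnecting the worker must clear the map.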
+ assert.notDeepStrictEqual(handles, {});
+ worker.disconnect();
+ assert.deepStrictEqual(handles, {});
+ const req = protocol.serialize({ command: 'continue' });
+ debugClient.write(req);
+ }));
+ });
+ }));
+ }));
+ process.on('exit', () => assert.deepStrictEqual(handles, {}));
+} else {
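+ // The worker runs a bare TCP echo server. The `debugger` statement in
+ // the listen callback suspends it until the master sends 'continue'.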
+ const server = net.createServer(socket => socket.pipe(socket));
+ server.listen(() => {
+ process.send(['listening', server.address()]);
+ debugger;
+ });
+ process.on('disconnect', process.exit);
+}