child_process: do not keep list of sent sockets
authorFedor Indutny <fedor.indutny@gmail.com>
Mon, 14 Jan 2013 17:08:20 +0000 (21:08 +0400)
committerisaacs <i@izs.me>
Thu, 17 Jan 2013 21:46:31 +0000 (13:46 -0800)
Keeping list of all sockets that were sent to child process causes memory
leak and thus unacceptable (see #4587). However `server.close()` should
still work properly.

This commit introduces two options:

* child.send(socket, { track: true }) - will send socket and track its status.
  You should use it when you want `server.connections` to be a reliable
  number, and receive `close` event on sent sockets.
* child.send(socket) - will send socket without tracking it status. This
  performs much better, because of smaller number of RTT between master and
  child.

With both of these options `server.close()` will wait for all sent
sockets to get closed.

doc/api/child_process.markdown
doc/api/net.markdown
lib/child_process.js
lib/net.js
test/simple/test-child-process-fork-getconnections.js [new file with mode: 0644]
test/simple/test-child-process-fork-net2.js
test/simple/test-child-process-fork-track.js [new file with mode: 0644]

index d803cff..343030d 100644 (file)
@@ -124,10 +124,11 @@ process may not actually kill it.  `kill` really just sends a signal to a proces
 
 See `kill(2)`
 
-### child.send(message, [sendHandle])
+### child.send(message, [sendHandle], [options])
 
 * `message` {Object}
 * `sendHandle` {Handle object}
+* `options` {Object}
 
 When using `child_process.fork()` you can write to the child using
 `child.send(message, [sendHandle])` and messages are received by
@@ -166,6 +167,11 @@ The `sendHandle` option to `child.send()` is for sending a TCP server or
 socket object to another process. The child will receive the object as its
 second argument to the `message` event.
 
+The `options` object may have the following properties:
+
+ * `track` - Notify master process when `sendHandle` will be closed in child
+   process. (`false` by default)
+
 **send server object**
 
 Here is an example of sending a server:
index 2b54dbe..804935e 100644 (file)
@@ -231,10 +231,19 @@ with `child_process.fork()`.
 
 The number of concurrent connections on the server.
 
-This becomes `null` when sending a socket to a child with `child_process.fork()`.
+This becomes `null` when sending a socket to a child with
+`child_process.fork()`. To poll forks and get current number of active
+connections use asynchronous `server.getConnections` instead.
 
 `net.Server` is an [EventEmitter][] with the following events:
 
+### server.getConnections(callback)
+
+Asynchronously get the number of concurrent connections on the server. Works
+when sockets were sent to forks.
+
+Callback should take two arguments `err` and `count`.
+
 ### Event: 'listening'
 
 Emitted when the server has been bound after calling `server.listen`.
index f69c47d..1130af7 100644 (file)
@@ -107,29 +107,31 @@ var handleConversion = {
   },
 
   'net.Socket': {
-    send: function(message, socket) {
-      // pause socket so no data is lost, will be resumed later
-
-      // if the socket wsa created by net.Server
+    send: function(message, socket, options) {
+      // if the socket was created by net.Server
       if (socket.server) {
         // the slave should keep track of the socket
         message.key = socket.server._connectionKey;
 
         var firstTime = !this._channel.sockets.send[message.key];
-
-        // add socket to connections list
         var socketList = getSocketList('send', this, message.key);
-        socketList.add(socket);
 
-        // the server should no longer expose a .connection property
-        // and when asked to close it should query the socket status from slaves
-        if (firstTime) {
-          socket.server._setupSlave(socketList);
+        if (options && options.track) {
+          // Keep track of socket's status
+          message.id = socketList.add(socket);
+        } else {
+          // the server should no longer expose a .connection property
+          // and when asked to close it should query the socket status from
+          // the slaves
+          if (firstTime) socket.server._setupSlave(socketList);
+
+          // Act like socket is detached
+          socket.server._connections--;
         }
       }
 
       // remove handle from socket object, it will be closed when the socket
-      // has been send
+      // will be sent
       var handle = socket._handle;
       handle.onread = function() {};
       socket._handle = null;
@@ -137,6 +139,11 @@ var handleConversion = {
       return handle;
     },
 
+    postSend: function(handle) {
+      // Close the Socket handle after sending it
+      handle.close();
+    },
+
     got: function(message, handle, emit) {
       var socket = new net.Socket({handle: handle});
       socket.readable = socket.writable = true;
@@ -146,7 +153,10 @@ var handleConversion = {
 
         // add socket to connections list
         var socketList = getSocketList('got', this, message.key);
-        socketList.add(socket);
+        socketList.add({
+          id: message.id,
+          socket: socket
+        });
       }
 
       emit(socket);
@@ -161,39 +171,98 @@ function SocketListSend(slave, key) {
   var self = this;
 
   this.key = key;
-  this.list = [];
   this.slave = slave;
 
+  // These two arrays are used to store the list of sockets and the freelist of
+  // indexes in this list. After insertion, item will have persistent index
+  // until it'll be removed. This way we can use this index as an identifier for
+  // sockets.
+  this.list = [];
+  this.freelist = [];
+
   slave.once('disconnect', function() {
     self.flush();
   });
 
   this.slave.on('internalMessage', function(msg) {
     if (msg.cmd !== 'NODE_SOCKET_CLOSED' || msg.key !== self.key) return;
-    self.flush();
+    self.remove(msg.id);
   });
 }
 util.inherits(SocketListSend, EventEmitter);
 
 SocketListSend.prototype.add = function(socket) {
-  this.list.push(socket);
+  var index;
+
+  // Pick one of free indexes, or insert in the end of the list
+  if (this.freelist.length > 0) {
+    index = this.freelist.pop();
+    this.list[index] = socket;
+  } else {
+    index = this.list.push(socket) - 1;
+  }
+
+  return index;
+};
+
+SocketListSend.prototype.remove = function(index) {
+  var socket = this.list[index];
+  if (!socket) return;
+
+  // Create a hole in the list and move index to the freelist
+  this.list[index] = null;
+  this.freelist.push(index);
+
+  socket.destroy();
 };
 
 SocketListSend.prototype.flush = function() {
   var list = this.list;
   this.list = [];
+  this.freelist = [];
 
   list.forEach(function(socket) {
-    socket.destroy();
+    if (socket) socket.destroy();
   });
 };
 
-SocketListSend.prototype.update = function() {
-  if (this.slave.connected === false) return;
+SocketListSend.prototype._request = function request(msg, cmd, callback) {
+  var self = this;
+
+  if (!this.slave.connected) return onslaveclose();
+  this.slave.send(msg);
+
+  function onclose() {
+    self.slave.removeListener('internalMessage', onreply);
+    callback(new Error('Slave closed before reply'));
+  };
+
+  function onreply(msg) {
+    if (msg.cmd !== cmd || msg.key !== self.key) return;
+    self.slave.removeListener('disconnect', onclose);
+    self.slave.removeListener('internalMessage', onreply);
+
+    callback(null, msg);
+  };
+
+  this.slave.once('disconnect', onclose);
+  this.slave.on('internalMessage', onreply);
+};
+
+SocketListSend.prototype.close = function close(callback) {
+  this._request({
+    cmd: 'NODE_SOCKET_NOTIFY_CLOSE',
+    key: this.key
+  }, 'NODE_SOCKET_ALL_CLOSED', callback);
+};
 
-  this.slave.send({
-    cmd: 'NODE_SOCKET_FETCH',
+SocketListSend.prototype.getConnections = function getConnections(callback) {
+  this._request({
+    cmd: 'NODE_SOCKET_GET_COUNT',
     key: this.key
+  }, 'NODE_SOCKET_COUNT', function(err, msg) {
+    if (err) return callback(err);
+    callback(null, msg.count);
   });
 };
 
@@ -203,45 +272,59 @@ function SocketListReceive(slave, key) {
 
   var self = this;
 
+  this.connections = 0;
   this.key = key;
-  this.list = [];
   this.slave = slave;
 
-  slave.on('internalMessage', function(msg) {
-    if (msg.cmd !== 'NODE_SOCKET_FETCH' || msg.key !== self.key) return;
-
-    if (self.list.length === 0) {
-      self.flush();
-      return;
-    }
+  function onempty() {
+    if (!self.slave.connected) return;
 
-    self.on('itemRemoved', function removeMe() {
-      if (self.list.length !== 0) return;
-      self.removeListener('itemRemoved', removeMe);
-      self.flush();
+    self.slave.send({
+      cmd: 'NODE_SOCKET_ALL_CLOSED',
+      key: self.key
     });
+  }
+
+  this.slave.on('internalMessage', function(msg) {
+    if (msg.key !== self.key) return;
+
+    if (msg.cmd === 'NODE_SOCKET_NOTIFY_CLOSE') {
+      // Already empty
+      if (self.connections === 0) return onempty();
+
+      // Wait for sockets to get closed
+      self.once('empty', onempty);
+    } else if (msg.cmd === 'NODE_SOCKET_GET_COUNT') {
+      if (!self.slave.connected) return;
+      self.slave.send({
+        cmd: 'NODE_SOCKET_COUNT',
+        key: self.key,
+        count: self.connections
+      });
+    }
   });
 }
 util.inherits(SocketListReceive, EventEmitter);
 
-SocketListReceive.prototype.flush = function() {
-  this.list = [];
+SocketListReceive.prototype.add = function(obj) {
+  var self = this;
 
-  if (this.slave.connected) {
-    this.slave.send({
-      cmd: 'NODE_SOCKET_CLOSED',
-      key: this.key
-    });
-  }
-};
+  this.connections++;
 
-SocketListReceive.prototype.add = function(socket) {
-  var self = this;
-  this.list.push(socket);
+  // Notify previous owner of socket about its state change
+  obj.socket.once('close', function() {
+    self.connections--;
 
-  socket.on('close', function() {
-    self.list.splice(self.list.indexOf(socket), 1);
-    self.emit('itemRemoved');
+    if (obj.id !== undefined && self.slave.connected) {
+      // Master wants to keep eye on socket status
+      self.slave.send({
+        cmd: 'NODE_SOCKET_CLOSED',
+        key: self.key,
+        id: obj.id
+      });
+    }
+
+    if (self.connections === 0) self.emit('empty');
   });
 };
 
@@ -366,17 +449,16 @@ function setupChannel(target, channel) {
     var string = JSON.stringify(message) + '\n';
     var writeReq = channel.writeUtf8String(string, handle);
 
-    // Close the Socket handle after sending it
-    if (message && message.type === 'net.Socket') {
-      handle.close();
-    }
-
     if (!writeReq) {
       var er = errnoException(errno, 'write', 'cannot write to IPC channel.');
       this.emit('error', er);
     }
 
-    writeReq.oncomplete = nop;
+    if (obj && obj.postSend) {
+      writeReq.oncomplete = obj.postSend.bind(null, handle);
+    } else {
+      writeReq.oncomplete = nop;
+    }
 
     /* If the master is > 2 read() calls behind, please stop sending. */
     return channel.writeQueueSize < (65536 * 2);
@@ -656,6 +738,7 @@ function ChildProcess() {
 
   this._closesNeeded = 1;
   this._closesGot = 0;
+  this.connected = false;
 
   this.signalCode = null;
   this.exitCode = null;
index 41e2819..4c79161 100644 (file)
@@ -874,8 +874,6 @@ function Server(/* [ options, ] listener */) {
 
   this._connections = 0;
 
-  // when server is using slaves .connections is not reliable
-  // so null will be return if thats the case
   Object.defineProperty(this, 'connections', {
     get: function() {
       if (self._usingSlaves) {
@@ -890,6 +888,8 @@ function Server(/* [ options, ] listener */) {
   });
 
   this._handle = null;
+  this._usingSlaves = false;
+  this._slaves = [];
 
   this.allowHalfOpen = options.allowHalfOpen || false;
 }
@@ -1122,7 +1122,37 @@ function onconnection(clientHandle) {
 }
 
 
+Server.prototype.getConnections = function(cb) {
+  if (!this._usingSlaves) return cb(null, this.connections);
+
+  // Poll slaves
+  var left = this._slaves.length,
+      total = this._connections;
+
+  function oncount(err, count) {
+    if (err) {
+      left = -1;
+      return cb(err);
+    }
+
+    total += count;
+    if (--left === 0) return cb(null, total);
+  }
+
+  this._slaves.forEach(function(slave) {
+    slave.getConnections(oncount);
+  });
+};
+
+
 Server.prototype.close = function(cb) {
+  function onSlaveClose() {
+    if (--left !== 0) return;
+
+    self._connections = 0;
+    self._emitCloseIfDrained();
+  }
+
   if (!this._handle) {
     // Throw error. Follows net_legacy behaviour.
     throw new Error('Not running');
@@ -1133,14 +1163,21 @@ Server.prototype.close = function(cb) {
   }
   this._handle.close();
   this._handle = null;
-  this._emitCloseIfDrained();
 
-  // fetch new socket lists
   if (this._usingSlaves) {
-    this._slaves.forEach(function(socketList) {
-      if (socketList.list.length === 0) return;
-      socketList.update();
+    var self = this,
+        left = this._slaves.length;
+
+    // Increment connections to be sure that, even if all sockets will be closed
+    // during polling of slaves, `close` event will be emitted only once.
+    this._connections++;
+
+    // Poll slaves
+    this._slaves.forEach(function(slave) {
+      slave.close(onSlaveClose);
     });
+  } else {
+    this._emitCloseIfDrained();
   }
 
   return this;
@@ -1167,12 +1204,8 @@ Server.prototype.listenFD = util.deprecate(function(fd, type) {
   return this.listen({ fd: fd });
 }, 'listenFD is deprecated. Use listen({fd: <number>}).');
 
-// when sending a socket using fork IPC this function is executed
 Server.prototype._setupSlave = function(socketList) {
-  if (!this._usingSlaves) {
-    this._usingSlaves = true;
-    this._slaves = [];
-  }
+  this._usingSlaves = true;
   this._slaves.push(socketList);
 };
 
diff --git a/test/simple/test-child-process-fork-getconnections.js b/test/simple/test-child-process-fork-getconnections.js
new file mode 100644 (file)
index 0000000..476dbc3
--- /dev/null
@@ -0,0 +1,107 @@
+// Copyright Joyent, Inc. and other Node contributors.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files (the
+// "Software"), to deal in the Software without restriction, including
+// without limitation the rights to use, copy, modify, merge, publish,
+// distribute, sublicense, and/or sell copies of the Software, and to permit
+// persons to whom the Software is furnished to do so, subject to the
+// following conditions:
+//
+// The above copyright notice and this permission notice shall be included
+// in all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
+// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
+// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
+// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
+// USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+var assert = require('assert');
+var common = require('../common');
+var fork = require('child_process').fork;
+var net = require('net');
+var count = 12;
+
+if (process.argv[2] === 'child') {
+  var sockets = [];
+  var id = process.argv[3];
+
+  process.on('message', function(m, socket) {
+    if (socket) {
+      sockets.push(socket);
+    }
+
+    if (m.cmd === 'close') {
+      sockets[m.id].once('close', function() {
+        process.send({ id: m.id, status: 'closed' });
+      });
+      sockets[m.id].destroy();
+    }
+  });
+} else {
+  var child = fork(process.argv[1], ['child']);
+
+  var server = net.createServer();
+  var sockets = [];
+  var sent = 0;
+
+  server.on('connection', function(socket) {
+    child.send({ cmd: 'new' }, socket, { track: false });
+    sockets.push(socket);
+
+    if (sockets.length === count) {
+      closeSockets();
+      server.close();
+    }
+  });
+
+  var disconnected = 0;
+  server.on('listening', function() {
+
+    var j = count, client;
+    while (j--) {
+      client = net.connect(common.PORT, '127.0.0.1');
+      client.on('close', function() {
+        console.error('[m] CLIENT: close event');
+        disconnected += 1;
+      });
+      // XXX This resume() should be unnecessary.
+      // a stream high water mark should be enough to keep
+      // consuming the input.
+      client.resume();
+    }
+  });
+
+  function closeSockets(i) {
+    if (!i) i = 0;
+    if (i === count) return;
+
+    sent++;
+    child.send({ id: i, cmd: 'close' });
+    child.once('message', function(m) {
+      assert(m.status === 'closed');
+      server.getConnections(function(err, num) {
+        closeSockets(i + 1);
+      });
+    });
+  };
+
+  var closeEmitted = false;
+  server.on('close', function() {
+    console.error('[m] server close');
+    closeEmitted = true;
+
+    child.kill();
+  });
+
+  server.listen(common.PORT, '127.0.0.1');
+
+  process.on('exit', function() {
+    assert.equal(sent, count);
+    assert.equal(disconnected, count);
+    assert.ok(closeEmitted);
+  });
+}
index be749fe..a8e8039 100644 (file)
@@ -26,13 +26,13 @@ var net = require('net');
 var count = 12;
 
 if (process.argv[2] === 'child') {
-
   var needEnd = [];
+  var id = process.argv[3];
 
   process.on('message', function(m, socket) {
     if (!socket) return;
 
-    console.error('got socket', m);
+    console.error('[%d] got socket', id, m);
 
     // will call .end('end') or .write('write');
     socket[m](m);
@@ -40,11 +40,11 @@ if (process.argv[2] === 'child') {
     socket.resume();
 
     socket.on('data', function() {
-      console.error('%d socket.data', process.pid, m);
+      console.error('[%d] socket.data', id, m);
     });
 
     socket.on('end', function() {
-      console.error('%d socket.end', process.pid, m);
+      console.error('[%d] socket.end', id, m);
     });
 
     // store the unfinished socket
@@ -53,58 +53,62 @@ if (process.argv[2] === 'child') {
     }
 
     socket.on('close', function() {
-      console.error('%d socket.close', process.pid, m);
+      console.error('[%d] socket.close', id, m);
     });
 
     socket.on('finish', function() {
-      console.error('%d socket finished', process.pid, m);
+      console.error('[%d] socket finished', id, m);
     });
   });
 
   process.on('message', function(m) {
     if (m !== 'close') return;
-    console.error('got close message');
+    console.error('[%d] got close message', id);
     needEnd.forEach(function(endMe, i) {
-      console.error('%d ending %d', process.pid, i);
+      console.error('[%d] ending %d', id, i);
       endMe.end('end');
     });
   });
 
   process.on('disconnect', function() {
-    console.error('%d process disconnect, ending', process.pid);
+    console.error('[%d] process disconnect, ending', id);
     needEnd.forEach(function(endMe, i) {
-      console.error('%d ending %d', process.pid, i);
+      console.error('[%d] ending %d', id, i);
       endMe.end('end');
     });
-    endMe = null;
   });
 
 } else {
 
-  var child1 = fork(process.argv[1], ['child']);
-  var child2 = fork(process.argv[1], ['child']);
-  var child3 = fork(process.argv[1], ['child']);
+  var child1 = fork(process.argv[1], ['child', '1']);
+  var child2 = fork(process.argv[1], ['child', '2']);
+  var child3 = fork(process.argv[1], ['child', '3']);
 
   var server = net.createServer();
 
-  var connected = 0;
+  var connected = 0,
+      closed = 0;
   server.on('connection', function(socket) {
     switch (connected % 6) {
       case 0:
-        child1.send('end', socket); break;
+        child1.send('end', socket, { track: false }); break;
       case 1:
-        child1.send('write', socket); break;
+        child1.send('write', socket, { track: true }); break;
       case 2:
-        child2.send('end', socket); break;
+        child2.send('end', socket, { track: true }); break;
       case 3:
-        child2.send('write', socket); break;
+        child2.send('write', socket, { track: false }); break;
       case 4:
-        child3.send('end', socket); break;
+        child3.send('end', socket, { track: false }); break;
       case 5:
-        child3.send('write', socket); break;
+        child3.send('write', socket, { track: false }); break;
     }
     connected += 1;
 
+    socket.once('close', function() {
+      console.log('[m] socket closed, total %d', ++closed);
+    });
+
     if (connected === count) {
       closeServer();
     }
@@ -117,7 +121,7 @@ if (process.argv[2] === 'child') {
     while (j--) {
       client = net.connect(common.PORT, '127.0.0.1');
       client.on('close', function() {
-        console.error('CLIENT: close event in master');
+        console.error('[m] CLIENT: close event');
         disconnected += 1;
       });
       // XXX This resume() should be unnecessary.
@@ -129,7 +133,7 @@ if (process.argv[2] === 'child') {
 
   var closeEmitted = false;
   server.on('close', function() {
-    console.error('server close');
+    console.error('[m] server close');
     closeEmitted = true;
 
     child1.kill();
@@ -141,18 +145,19 @@ if (process.argv[2] === 'child') {
 
   var timeElasped = 0;
   var closeServer = function() {
-    console.error('closeServer');
+    console.error('[m] closeServer');
     var startTime = Date.now();
     server.on('close', function() {
-      console.error('emit(close)');
+      console.error('[m] emit(close)');
       timeElasped = Date.now() - startTime;
     });
 
-    console.error('calling server.close');
+    console.error('[m] calling server.close');
     server.close();
 
     setTimeout(function() {
-      console.error('sending close to children');
+      assert(!closeEmitted);
+      console.error('[m] sending close to children');
       child1.send('close');
       child2.send('close');
       child3.disconnect();
diff --git a/test/simple/test-child-process-fork-track.js b/test/simple/test-child-process-fork-track.js
new file mode 100644 (file)
index 0000000..2382f64
--- /dev/null
@@ -0,0 +1,110 @@
+// Copyright Joyent, Inc. and other Node contributors.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files (the
+// "Software"), to deal in the Software without restriction, including
+// without limitation the rights to use, copy, modify, merge, publish,
+// distribute, sublicense, and/or sell copies of the Software, and to permit
+// persons to whom the Software is furnished to do so, subject to the
+// following conditions:
+//
+// The above copyright notice and this permission notice shall be included
+// in all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
+// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
+// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
+// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
+// USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+var assert = require('assert');
+var common = require('../common');
+var fork = require('child_process').fork;
+var net = require('net');
+var count = 12;
+
+if (process.argv[2] === 'child') {
+  var sockets = [];
+  var id = process.argv[3];
+
+  process.on('message', function(m, socket) {
+    if (socket) {
+      sockets.push(socket);
+    }
+
+    if (m.cmd === 'close') {
+      sockets[m.id].once('close', function() {
+        process.send({ id: m.id, status: 'closed' });
+      });
+      sockets[m.id].destroy();
+    }
+  });
+} else {
+  var child = fork(process.argv[1], ['child']);
+
+  var server = net.createServer();
+  var sockets = [];
+  var closed = 0;
+
+  server.on('connection', function(socket) {
+    child.send({ cmd: 'new' }, socket, { track: true });
+    sockets.push(socket);
+
+    socket.once('close', function() {
+      console.error('[m] socket closed');
+      closed++;
+      assert.equal(closed + server.connections, count);
+      if (server.connections === 0) server.close();
+    });
+
+    if (sockets.length === count) {
+      closeSockets();
+    }
+  });
+
+  var disconnected = 0;
+  server.on('listening', function() {
+
+    var j = count, client;
+    while (j--) {
+      client = net.connect(common.PORT, '127.0.0.1');
+      client.on('close', function() {
+        console.error('[m] CLIENT: close event');
+        disconnected += 1;
+      });
+      // XXX This resume() should be unnecessary.
+      // a stream high water mark should be enough to keep
+      // consuming the input.
+      client.resume();
+    }
+  });
+
+  function closeSockets(i) {
+    if (!i) i = 0;
+    if (i === count) return;
+
+    child.send({ id: i, cmd: 'close' });
+    child.once('message', function(m) {
+      assert(m.status === 'closed');
+      closeSockets(i + 1);
+    });
+  };
+
+  var closeEmitted = false;
+  server.on('close', function() {
+    console.error('[m] server close');
+    closeEmitted = true;
+
+    child.kill();
+  });
+
+  server.listen(common.PORT, '127.0.0.1');
+
+  process.on('exit', function() {
+    assert.equal(disconnected, count);
+    assert.equal(closed, count);
+    assert.ok(closeEmitted);
+  });
+}