net,dgram: workers can listen on exclusive ports
authorcjihrig <cjihrig@gmail.com>
Fri, 22 Aug 2014 20:51:53 +0000 (16:51 -0400)
committerTrevor Norris <trev.norris@gmail.com>
Wed, 3 Sep 2014 22:16:16 +0000 (15:16 -0700)
Allow cluster workers to listen on exclusive ports for TCP and UDP,
instead of forcing all calls to go through the cluster master.

Fixes: #3856
Reviewed-by: Trevor Norris <trev.norris@gmail.com>
Reviewed-by: Fedor Indutny <fedor@indutny.com>
doc/api/dgram.markdown
doc/api/net.markdown
lib/dgram.js
lib/net.js
test/simple/test-dgram-bind-shared-ports.js [new file with mode: 0644]
test/simple/test-net-listen-shared-ports.js [new file with mode: 0644]

index b659e6b..891c0c7 100644 (file)
@@ -188,6 +188,32 @@ Example of a UDP server listening on port 41234:
     // server listening 0.0.0.0:41234
 
 
+### socket.bind(options, [callback])
+
+* `options` {Object} - Required. Supports the following properties:
+  * `port` {Number} - Required.
+  * `address` {String} - Optional.
+  * `exclusive` {Boolean} - Optional.
+* `callback` {Function} - Optional.
+
+The `port` and `address` properties of `options`, as well as the optional
+callback function, behave as they do on a call to
+[socket.bind(port, \[address\], \[callback\])
+](#dgram_socket_bind_port_address_callback).
+
+If `exclusive` is `false` (default), then cluster workers will use the same
+underlying handle, allowing connection handling duties to be shared. When
+`exclusive` is `true`, the handle is not shared, and attempted port sharing
+results in an error. An example which listens on an exclusive port is
+shown below.
+
+    socket.bind({
+      address: 'localhost',
+      port: 8000,
+      exclusive: true
+    });
+
+
 ### socket.close()
 
 Close the underlying socket and stop listening for data on it.
index 08ffe02..eb4988a 100644 (file)
@@ -208,6 +208,34 @@ This function is asynchronous.  When the server has been bound,
 the last parameter `callback` will be added as an listener for the
 ['listening'][] event.
 
+### server.listen(options, [callback])
+
+* `options` {Object} - Required. Supports the following properties:
+  * `port` {Number} - Optional.
+  * `host` {String} - Optional.
+  * `backlog` {Number} - Optional.
+  * `path` {String} - Optional.
+  * `exclusive` {Boolean} - Optional.
+* `callback` {Function} - Optional.
+
+The `port`, `host`, and `backlog` properties of `options`, as well as the
+optional callback function, behave as they do on a call to
+[server.listen(port, \[host\], \[backlog\], \[callback\])
+](#net_server_listen_port_host_backlog_callback). Alternatively, the `path`
+option can be used to specify a UNIX socket.
+
+If `exclusive` is `false` (default), then cluster workers will use the same
+underlying handle, allowing connection handling duties to be shared. When
+`exclusive` is `true`, the handle is not shared, and attempted port sharing
+results in an error. An example which listens on an exclusive port is
+shown below.
+
+    server.listen({
+      host: 'localhost',
+      port: 80,
+      exclusive: true
+    });
+
 ### server.close([callback])
 
 Stops the server from accepting new connections and keeps existing
index 574d9f4..aae2f51 100644 (file)
@@ -150,7 +150,7 @@ function replaceHandle(self, newHandle) {
   self._handle = newHandle;
 }
 
-Socket.prototype.bind = function(/*port, address, callback*/) {
+Socket.prototype.bind = function(port /*, address, callback*/) {
   var self = this;
 
   self._healthCheck();
@@ -164,15 +164,23 @@ Socket.prototype.bind = function(/*port, address, callback*/) {
     self.once('listening', arguments[arguments.length - 1]);
 
   var UDP = process.binding('udp_wrap').UDP;
-  if (arguments[0] instanceof UDP) {
-    replaceHandle(self, arguments[0]);
+  if (port instanceof UDP) {
+    replaceHandle(self, port);
     startListening(self);
     return;
   }
 
-  var port = arguments[0];
-  var address = arguments[1];
-  if (util.isFunction(address)) address = '';  // a.k.a. "any address"
+  var address;
+  var exclusive;
+
+  if (util.isObject(port)) {
+    address = port.address || '';
+    exclusive = !!port.exclusive;
+    port = port.port;
+  } else {
+    address = util.isFunction(arguments[1]) ? '' : arguments[1];
+    exclusive = false;
+  }
 
   // resolve address first
   self._handle.lookup(address, function(err, ip) {
@@ -185,7 +193,7 @@ Socket.prototype.bind = function(/*port, address, callback*/) {
     if (!cluster)
       cluster = require('cluster');
 
-    if (cluster.isWorker) {
+    if (cluster.isWorker && !exclusive) {
       cluster._getServer(self, ip, port, self.type, -1, function(err, handle) {
         if (err) {
           self.emit('error', errnoException(err, 'bind'));
index ddace95..478c04a 100644 (file)
@@ -1144,10 +1144,12 @@ Server.prototype._listen2 = function(address, port, addressType, backlog, fd) {
 };
 
 
-function listen(self, address, port, addressType, backlog, fd) {
+function listen(self, address, port, addressType, backlog, fd, exclusive) {
+  exclusive = !!exclusive;
+
   if (!cluster) cluster = require('cluster');
 
-  if (cluster.isMaster) {
+  if (cluster.isMaster || exclusive) {
     self._listen2(address, port, addressType, backlog, fd);
     return;
   }
@@ -1195,24 +1197,34 @@ Server.prototype.listen = function() {
 
   var TCP = process.binding('tcp_wrap').TCP;
 
-  if (arguments.length == 0 || util.isFunction(arguments[0])) {
+  if (arguments.length === 0 || util.isFunction(arguments[0])) {
     // Bind to a random port.
     listen(self, null, 0, null, backlog);
-
-  } else if (arguments[0] && util.isObject(arguments[0])) {
+  } else if (util.isObject(arguments[0])) {
     var h = arguments[0];
-    if (h._handle) {
-      h = h._handle;
-    } else if (h.handle) {
-      h = h.handle;
-    }
+    h = h._handle || h.handle || h;
+
     if (h instanceof TCP) {
       self._handle = h;
       listen(self, null, -1, -1, backlog);
     } else if (util.isNumber(h.fd) && h.fd >= 0) {
       listen(self, null, null, null, backlog, h.fd);
     } else {
-      throw new Error('Invalid listen argument: ' + h);
+      // The first argument is a configuration object
+      if (h.backlog)
+        backlog = h.backlog;
+
+      if (util.isNumber(h.port)) {
+        if (h.host)
+          listenAfterLookup(h.port, h.host, backlog, h.exclusive);
+        else
+          listen(self, null, h.port, 4, backlog, undefined, h.exclusive);
+      } else if (h.path && isPipeName(h.path)) {
+        var pipeName = self._pipeName = h.path;
+        listen(self, pipeName, -1, -1, backlog, undefined, h.exclusive);
+      } else {
+        throw new Error('Invalid listen argument: ' + h);
+      }
     }
   } else if (isPipeName(arguments[0])) {
     // UNIX socket or Windows pipe.
@@ -1227,14 +1239,20 @@ Server.prototype.listen = function() {
 
   } else {
     // The first argument is the port, the second an IP.
-    require('dns').lookup(arguments[1], function(err, ip, addressType) {
+    listenAfterLookup(port, arguments[1], backlog);
+  }
+
+  function listenAfterLookup(port, address, backlog, exclusive) {
+    require('dns').lookup(address, function(err, ip, addressType) {
       if (err) {
         self.emit('error', err);
       } else {
-        listen(self, ip, port, ip ? addressType : 4, backlog);
+        addressType = ip ? addressType : 4;
+        listen(self, ip, port, addressType, backlog, undefined, exclusive);
       }
     });
   }
+
   return self;
 };
 
diff --git a/test/simple/test-dgram-bind-shared-ports.js b/test/simple/test-dgram-bind-shared-ports.js
new file mode 100644 (file)
index 0000000..709d3ad
--- /dev/null
@@ -0,0 +1,66 @@
+// Copyright Joyent, Inc. and other Node contributors.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files (the
+// "Software"), to deal in the Software without restriction, including
+// without limitation the rights to use, copy, modify, merge, publish,
+// distribute, sublicense, and/or sell copies of the Software, and to permit
+// persons to whom the Software is furnished to do so, subject to the
+// following conditions:
+//
+// The above copyright notice and this permission notice shall be included
+// in all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
+// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
+// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
+// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
+// USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+var common = require('../common');
+var assert = require('assert');
+var cluster = require('cluster');
+var dgram = require('dgram');
+
+function noop() {}
+
+if (cluster.isMaster) {
+  var worker1 = cluster.fork();
+
+  worker1.on('message', function(msg) {
+    assert.equal(msg, 'success');
+    var worker2 = cluster.fork();
+
+    worker2.on('message', function(msg) {
+      assert.equal(msg, 'socket2:EADDRINUSE');
+      worker1.kill();
+      worker2.kill();
+    });
+  });
+} else {
+  var socket1 = dgram.createSocket('udp4', noop);
+  var socket2 = dgram.createSocket('udp4', noop);
+
+  socket1.on('error', function(err) {
+    // no errors expected
+    process.send('socket1:' + err.code);
+  });
+
+  socket2.on('error', function(err) {
+    // an error is expected on the second worker
+    process.send('socket2:' + err.code);
+  });
+
+  socket1.bind({
+    address: 'localhost',
+    port: common.PORT,
+    exclusive: false
+  }, function() {
+    socket2.bind({port: common.PORT + 1, exclusive: true}, function() {
+      // the first worker should succeed
+      process.send('success');
+    });
+  });
+}
diff --git a/test/simple/test-net-listen-shared-ports.js b/test/simple/test-net-listen-shared-ports.js
new file mode 100644 (file)
index 0000000..7422e49
--- /dev/null
@@ -0,0 +1,66 @@
+// Copyright Joyent, Inc. and other Node contributors.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files (the
+// "Software"), to deal in the Software without restriction, including
+// without limitation the rights to use, copy, modify, merge, publish,
+// distribute, sublicense, and/or sell copies of the Software, and to permit
+// persons to whom the Software is furnished to do so, subject to the
+// following conditions:
+//
+// The above copyright notice and this permission notice shall be included
+// in all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
+// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
+// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
+// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
+// USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+var common = require('../common');
+var assert = require('assert');
+var cluster = require('cluster');
+var net = require('net');
+
+function noop() {}
+
+if (cluster.isMaster) {
+  var worker1 = cluster.fork();
+
+  worker1.on('message', function(msg) {
+    assert.equal(msg, 'success');
+    var worker2 = cluster.fork();
+
+    worker2.on('message', function(msg) {
+      assert.equal(msg, 'server2:EADDRINUSE');
+      worker1.kill();
+      worker2.kill();
+    });
+  });
+} else {
+  var server1 = net.createServer(noop);
+  var server2 = net.createServer(noop);
+
+  server1.on('error', function(err) {
+    // no errors expected
+    process.send('server1:' + err.code);
+  });
+
+  server2.on('error', function(err) {
+    // an error is expected on the second worker
+    process.send('server2:' + err.code);
+  });
+
+  server1.listen({
+    host: 'localhost',
+    port: common.PORT,
+    exclusive: false
+  }, function() {
+    server2.listen({port: common.PORT + 1, exclusive: true}, function() {
+      // the first worker should succeed
+      process.send('success');
+    });
+  });
+}