child_process: support sending dgram socket
authorAndreas Madsen <amwebdk@gmail.com>
Wed, 27 Feb 2013 18:31:24 +0000 (19:31 +0100)
committerBen Noordhuis <info@bnoordhuis.nl>
Thu, 7 Mar 2013 16:51:17 +0000 (17:51 +0100)
child.send can send net servers and sockets. Now that we have support
for dgram clusters this functionality should be extended to include
dgram sockets.

doc/api/child_process.markdown
lib/child_process.js
lib/dgram.js
test/simple/test-child-process-fork-dgram.js [new file with mode: 0644]

index 76d42d5..b652e88 100644 (file)
@@ -200,6 +200,10 @@ And the child would the receive the server object as:
 Note that the server is now shared between the parent and child, this means
 that some connections will be handled by the parent and some by the child.
 
+For `dgram` servers the workflow is exactly the same.  Here you listen on
+a `message` event instead of `connection` and use `server.bind` instead of
+`server.listen`.
+
 #### Example: sending socket object
 
 Here is an example of sending a socket. It will spawn two children and handle
index a4a5d2b..80735d4 100644 (file)
@@ -21,6 +21,7 @@
 
 var EventEmitter = require('events').EventEmitter;
 var net = require('net');
+var dgram = require('dgram');
 var Process = process.binding('process_wrap').Process;
 var util = require('util');
 var constants; // if (!constants) constants = process.binding('constants');
@@ -167,6 +168,24 @@ var handleConversion = {
     got: function(message, handle, emit) {
       emit(handle);
     }
+  },
+
+  'dgram.Socket': {
+    simultaneousAccepts: false,
+
+    send: function(message, socket) {
+      message.dgramType = socket.type;
+
+      return socket._handle;
+    },
+
+    got: function(message, handle, emit) {
+      var socket = new dgram.Socket(message.dgramType);
+
+      socket.bind(handle, function() {
+        emit(socket);
+      });
+    }
   }
 };
 
@@ -377,6 +396,8 @@ function setupChannel(target, channel) {
       } else if (handle instanceof process.binding('tcp_wrap').TCP ||
                  handle instanceof process.binding('pipe_wrap').Pipe) {
         message.type = 'net.Native';
+      } else if (handle instanceof dgram.Socket) {
+        message.type = 'dgram.Socket';
       } else if (handle instanceof process.binding('udp_wrap').UDP) {
         message.type = 'dgram.Native';
       } else {
index 91c2243..e13f066 100644 (file)
@@ -141,8 +141,20 @@ function startListening(socket) {
   socket.emit('listening');
 }
 
+function replaceHandle(self, newHandle) {
 
-Socket.prototype.bind = function(port, address, callback) {
+  // Set up the handle that we got from master.
+  newHandle.lookup = self._handle.lookup;
+  newHandle.bind = self._handle.bind;
+  newHandle.send = self._handle.send;
+  newHandle.owner = self;
+
+  // Replace the existing handle by the handle we got from master.
+  self._handle.close();
+  self._handle = newHandle;
+}
+
+Socket.prototype.bind = function(/*port, address, callback*/) {
   var self = this;
 
   self._healthCheck();
@@ -152,8 +164,18 @@ Socket.prototype.bind = function(port, address, callback) {
 
   this._bindState = BIND_STATE_BINDING;
 
-  if (typeof callback === 'function')
-    self.once('listening', callback);
+  if (typeof arguments[arguments.length - 1] === 'function')
+    self.once('listening', arguments[arguments.length - 1]);
+
+  var UDP = process.binding('udp_wrap').UDP;
+  if (arguments[0] instanceof UDP) {
+    replaceHandle(self, arguments[0]);
+    startListening(self);
+    return;
+  }
+
+  var port = arguments[0];
+  var address = arguments[1];
 
   // resolve address first
   self._handle.lookup(address, function(err, ip) {
@@ -172,16 +194,7 @@ Socket.prototype.bind = function(port, address, callback) {
           // handle has been closed in the mean time.
           return handle.close();
 
-        // Set up the handle that we got from master.
-        handle.lookup = self._handle.lookup;
-        handle.bind = self._handle.bind;
-        handle.send = self._handle.send;
-        handle.owner = self;
-
-        // Replace the existing handle by the handle we got from master.
-        self._handle.close();
-        self._handle = handle;
-
+        replaceHandle(self, handle);
         startListening(self);
       });
 
diff --git a/test/simple/test-child-process-fork-dgram.js b/test/simple/test-child-process-fork-dgram.js
new file mode 100644 (file)
index 0000000..039310f
--- /dev/null
@@ -0,0 +1,103 @@
+// Copyright Joyent, Inc. and other Node contributors.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files (the
+// "Software"), to deal in the Software without restriction, including
+// without limitation the rights to use, copy, modify, merge, publish,
+// distribute, sublicense, and/or sell copies of the Software, and to permit
+// persons to whom the Software is furnished to do so, subject to the
+// following conditions:
+//
+// The above copyright notice and this permission notice shall be included
+// in all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
+// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
+// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
+// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
+// USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+var dgram = require('dgram');
+var fork = require('child_process').fork;
+var assert = require('assert');
+var common = require('../common');
+
+if (process.argv[2] === 'child') {
+  var childCollected = 0;
+  var server;
+
+  process.on('message', function removeMe(msg, clusterServer) {
+    if (msg === 'server') {
+      server = clusterServer;
+
+      server.on('message', function () {
+        childCollected += 1;
+      });
+
+    } else if (msg === 'stop') {
+      server.close();
+      process.send(childCollected);
+      process.removeListener('message', removeMe);
+    }
+  });
+
+} else {
+  var server = dgram.createSocket('udp4');
+  var client = dgram.createSocket('udp4');
+  var child = fork(__filename, ['child']);
+  var msg = new Buffer('Some bytes');
+
+  var parentCollected = 0;
+  var childCollected = 0;
+  server.on('message', function (msg, rinfo) {
+    parentCollected += 1;
+  });
+
+  server.on('listening', function () {
+    child.send('server', server);
+
+    sendMessages();
+  });
+
+  var sendMessages = function () {
+    var wait = 0;
+    var send = 0;
+    var total = 100;
+
+    var timer = setInterval(function () {
+      send += 1;
+      if (send === total) {
+        clearInterval(timer);
+      }
+
+      client.send(msg, 0, msg.length, common.PORT, '127.0.0.1', function(err) {
+          if (err) throw err;
+
+          wait += 1;
+          if (wait === total) {
+            shutdown();
+          }
+        }
+      );
+    }, 1);
+  };
+
+  var shutdown = function () {
+    child.send('stop');
+    child.once('message', function (collected) {
+      childCollected = collected;
+    });
+
+    server.close();
+    client.close();
+  };
+
+  server.bind(common.PORT, '127.0.0.1');
+
+  process.once('exit', function () {
+    assert(childCollected > 0);
+    assert(parentCollected > 0);
+  });
+}