cluster: support datagram sockets
authorBert Belder <bertbelder@gmail.com>
Tue, 22 Jan 2013 22:52:20 +0000 (23:52 +0100)
committerBert Belder <bertbelder@gmail.com>
Mon, 28 Jan 2013 21:12:21 +0000 (22:12 +0100)
lib/child_process.js
lib/cluster.js
lib/dgram.js
src/handle_wrap.h
src/stream_wrap.cc
src/udp_wrap.cc
src/udp_wrap.h
test/simple/test-cluster-dgram-1.js [new file with mode: 0644]
test/simple/test-cluster-dgram-2.js [new file with mode: 0644]

index 1ac2b91..ef8c800 100644 (file)
@@ -155,6 +155,18 @@ var handleConversion = {
 
       emit(socket);
     }
+  },
+
+  'dgram.Native': {
+    simultaneousAccepts: false,
+
+    send: function(message, handle) {
+      return handle;
+    },
+
+    got: function(message, handle, emit) {
+      emit(handle);
+    }
   }
 };
 
@@ -355,18 +367,20 @@ function setupChannel(target, channel) {
       // this message will be handled by an internalMessage event handler
       message = {
         cmd: 'NODE_HANDLE',
-        type: 'net.',
         msg: message
       };
 
-      switch (handle.constructor.name) {
-        case 'Socket':
-          message.type += 'Socket'; break;
-        case 'Server':
-          message.type += 'Server'; break;
-        case 'Pipe':
-        case 'TCP':
-          message.type += 'Native'; break;
+      if (handle instanceof net.Socket) {
+        message.type = 'net.Socket';
+      } else if (handle instanceof net.Server) {
+        message.type = 'net.Server';
+      } else if (handle instanceof process.binding('tcp_wrap').TCP ||
+                 handle instanceof process.binding('pipe_wrap').Pipe) {
+        message.type = 'net.Native';
+      } else if (handle instanceof process.binding('udp_wrap').UDP) {
+        message.type = 'dgram.Native';
+      } else {
+        throw new TypeError("This handle type can't be sent");
       }
 
       var obj = handleConversion[message.type];
index b2e7625..6c0f8f4 100644 (file)
@@ -227,8 +227,14 @@ if (cluster.isMaster) {
 
     if (serverHandlers.hasOwnProperty(key)) {
       handler = serverHandlers[key];
+    } else if (message.addressType === 'udp4' ||
+               message.addressType === 'udp6') {
+      var dgram = require('dgram');
+      handler = dgram._createSocketHandle.apply(net, args);
+      serverHandlers[key] = handler;
     } else {
-      handler = serverHandlers[key] = net._createServerHandle.apply(net, args);
+      handler = net._createServerHandle.apply(net, args);
+      serverHandlers[key] = handler;
     }
 
     // echo callback with the fd handler associated with it
@@ -259,9 +265,9 @@ if (cluster.isMaster) {
   messageHandler.suicide = function(message, worker) {
     worker.suicide = true;
   };
-
 }
 
+
 // Messages to a worker will be handled using these methods
 else if (cluster.isWorker) {
 
@@ -541,7 +547,8 @@ cluster._setupWorker = function() {
   sendInternalMessage(worker, { cmd: 'online' });
 };
 
-// Internal function. Called by lib/net.js when attempting to bind a server.
+// Internal function. Called by net.js and dgram.js when attempting to bind a
+// TCP server or UDP socket.
 cluster._getServer = function(tcpSelf, address, port, addressType, fd, cb) {
   // This can only be called from a worker.
   assert(cluster.isWorker);
index 65e080f..242c197 100644 (file)
@@ -19,6 +19,7 @@
 // OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
 // USE OR OTHER DEALINGS IN THE SOFTWARE.
 
+var assert = require('assert');
 var util = require('util');
 var events = require('events');
 
@@ -29,6 +30,7 @@ var BIND_STATE_BINDING = 1;
 var BIND_STATE_BOUND = 2;
 
 // lazily loaded
+var cluster = null;
 var dns = null;
 var net = null;
 
@@ -86,6 +88,24 @@ function newHandle(type) {
 }
 
 
+exports._createSocketHandle = function(address, port, addressType, fd) {
+  // Opening an existing fd is not supported for UDP handles.
+  assert(typeof fd !== 'number' || fd < 0);
+
+  var handle = newHandle(addressType);
+
+  if (port || address) {
+    var r = handle.bind(address, port || 0, 0);
+    if (r == -1) {
+      handle.close();
+      handle = null;
+    }
+  }
+
+  return handle;
+};
+
+
 function Socket(type, listener) {
   events.EventEmitter.call(this);
 
@@ -110,41 +130,75 @@ exports.createSocket = function(type, listener) {
 };
 
 
+function startListening(socket) {
+  socket._handle.onmessage = onMessage;
+  // Todo: handle errors
+  socket._handle.recvStart();
+  socket._receiving = true;
+  socket._bindState = BIND_STATE_BOUND;
+  socket.fd = -42; // compatibility hack
+
+  socket.emit('listening');
+}
+
+
 Socket.prototype.bind = function(port, address, callback) {
   var self = this;
 
   self._healthCheck();
 
+  if (this._bindState != BIND_STATE_UNBOUND)
+    throw new Error('Socket is already bound');
+
+  this._bindState = BIND_STATE_BINDING;
+
   if (typeof callback === 'function')
     self.once('listening', callback);
 
   // resolve address first
   self._handle.lookup(address, function(err, ip) {
-    self._bindState = BIND_STATE_UNBOUND;
-
-    if (!self._handle)
-      return; // handle has been closed in the mean time
-
     if (err) {
+      self._bindState = BIND_STATE_UNBOUND;
       self.emit('error', err);
       return;
     }
 
-    if (self._handle.bind(ip, port || 0, /*flags=*/ 0)) {
-      self.emit('error', errnoException(errno, 'bind'));
-      return;
-    }
-
-    self._handle.onmessage = onMessage;
-    self._handle.recvStart();
-    self._receiving = true;
-    self._bindState = BIND_STATE_BOUND;
-    self.fd = -42; // compatibility hack
+    if (!cluster)
+      cluster = require('cluster');
+
+    if (cluster.isWorker) {
+      cluster._getServer(self, ip, port, self.type, -1, function(handle) {
+        if (!self._handle)
+          // handle has been closed in the mean time.
+          return handle.close();
+
+        // Set up the handle that we got from master.
+        handle.lookup = self._handle.lookup;
+        handle.bind = self._handle.bind;
+        handle.send = self._handle.send;
+        handle.owner = self;
+
+        // Replace the existing handle by the handle we got from master.
+        self._handle.close();
+        self._handle = handle;
+
+        startListening(self);
+      });
+
+    } else {
+      if (!self._handle)
+        return; // handle has been closed in the mean time
+
+      if (self._handle.bind(ip, port || 0, /*flags=*/ 0)) {
+        self.emit('error', errnoException(errno, 'bind'));
+        self._bindState = BIND_STATE_UNBOUND;
+        // Todo: close?
+        return;
+      }
 
-    self.emit('listening');
+      startListening(self);
+    }
   });
-
-  self._bindState = BIND_STATE_BINDING;
 };
 
 
index a358e81..951b938 100644 (file)
@@ -53,6 +53,8 @@ class HandleWrap {
     static v8::Handle<v8::Value> Ref(const v8::Arguments& args);
     static v8::Handle<v8::Value> Unref(const v8::Arguments& args);
 
+    inline uv_handle_t* GetHandle() { return handle__; };
+
   protected:
     HandleWrap(v8::Handle<v8::Object> object, uv_handle_t* handle);
     virtual ~HandleWrap();
index 910e94b..e6756e1 100644 (file)
@@ -27,6 +27,7 @@
 #include "pipe_wrap.h"
 #include "tcp_wrap.h"
 #include "req_wrap.h"
+#include "udp_wrap.h"
 #include "node_counters.h"
 
 #include <stdlib.h> // abort()
@@ -118,7 +119,7 @@ StreamWrap::StreamWrap(Handle<Object> object, uv_stream_t* stream)
 
 void StreamWrap::SetHandle(uv_handle_t* h) {
   HandleWrap::SetHandle(h);
-  stream_ = (uv_stream_t*)h;
+  stream_ = reinterpret_cast<uv_stream_t*>(h);
   stream_->data = this;
 }
 
@@ -173,6 +174,28 @@ uv_buf_t StreamWrap::OnAlloc(uv_handle_t* handle, size_t suggested_size) {
 }
 
 
+template <class WrapType, class UVType>
+static Local<Object> AcceptHandle(uv_stream_t* pipe) {
+  HandleScope scope;
+  Local<Object> wrap_obj;
+  WrapType* wrap;
+  UVType* handle;
+
+  wrap_obj = WrapType::Instantiate();
+  if (wrap_obj.IsEmpty())
+    return Local<Object>();
+
+  wrap = static_cast<WrapType*>(
+      wrap_obj->GetAlignedPointerFromInternalField(0));
+  handle = wrap->UVHandle();
+
+  if (uv_accept(pipe, reinterpret_cast<uv_stream_t*>(handle)))
+    abort();
+
+  return scope.Close(wrap_obj);
+}
+
+
 void StreamWrap::OnReadCommon(uv_stream_t* handle, ssize_t nread,
     uv_buf_t buf, uv_handle_type pending) {
   HandleScope scope;
@@ -212,19 +235,16 @@ void StreamWrap::OnReadCommon(uv_stream_t* handle, ssize_t nread,
 
   Local<Object> pending_obj;
   if (pending == UV_TCP) {
-    pending_obj = TCPWrap::Instantiate();
+    pending_obj = AcceptHandle<TCPWrap, uv_tcp_t>(handle);
   } else if (pending == UV_NAMED_PIPE) {
-    pending_obj = PipeWrap::Instantiate();
+    pending_obj = AcceptHandle<PipeWrap, uv_pipe_t>(handle);
+  } else if (pending == UV_UDP) {
+    pending_obj = AcceptHandle<UDPWrap, uv_udp_t>(handle);
   } else {
-    // We only support sending UV_TCP and UV_NAMED_PIPE right now.
     assert(pending == UV_UNKNOWN_HANDLE);
   }
 
   if (!pending_obj.IsEmpty()) {
-    assert(pending_obj->InternalFieldCount() > 0);
-    StreamWrap* pending_wrap = static_cast<StreamWrap*>(
-        pending_obj->GetAlignedPointerFromInternalField(0));
-    if (uv_accept(handle, pending_wrap->GetStream())) abort();
     argv[3] = pending_obj;
     argc++;
   }
@@ -246,7 +266,7 @@ void StreamWrap::OnRead(uv_stream_t* handle, ssize_t nread, uv_buf_t buf) {
 
 void StreamWrap::OnRead2(uv_pipe_t* handle, ssize_t nread, uv_buf_t buf,
     uv_handle_type pending) {
-  OnReadCommon((uv_stream_t*)handle, nread, buf, pending);
+  OnReadCommon(reinterpret_cast<uv_stream_t*>(handle), nread, buf, pending);
 }
 
 
@@ -404,14 +424,14 @@ Handle<Value> StreamWrap::WriteStringImpl(const Arguments& args) {
                  StreamWrap::AfterWrite);
 
   } else {
-    uv_stream_t* send_stream = NULL;
+    uv_handle_t* send_handle = NULL;
 
     if (args[1]->IsObject()) {
-      Local<Object> send_stream_obj = args[1]->ToObject();
-      assert(send_stream_obj->InternalFieldCount() > 0);
-      StreamWrap* send_stream_wrap = static_cast<StreamWrap*>(
-          send_stream_obj->GetAlignedPointerFromInternalField(0));
-      send_stream = send_stream_wrap->GetStream();
+      Local<Object> send_handle_obj = args[1]->ToObject();
+      assert(send_handle_obj->InternalFieldCount() > 0);
+      HandleWrap* send_handle_wrap = static_cast<HandleWrap*>(
+          send_handle_obj->GetAlignedPointerFromInternalField(0));
+      send_handle = send_handle_wrap->GetHandle();
 
       // Reference StreamWrap instance to prevent it from being garbage
       // collected before `AfterWrite` is called.
@@ -419,14 +439,14 @@ Handle<Value> StreamWrap::WriteStringImpl(const Arguments& args) {
         handle_sym = NODE_PSYMBOL("handle");
       }
       assert(!req_wrap->object_.IsEmpty());
-      req_wrap->object_->Set(handle_sym, send_stream_obj);
+      req_wrap->object_->Set(handle_sym, send_handle_obj);
     }
 
     r = uv_write2(&req_wrap->req_,
                   wrap->stream_,
                   &buf,
                   1,
-                  send_stream,
+                  reinterpret_cast<uv_stream_t*>(send_handle),
                   StreamWrap::AfterWrite);
   }
 
index d476149..6d01ebd 100644 (file)
@@ -40,6 +40,7 @@ typedef ReqWrap<uv_udp_send_t> SendWrap;
 // see tcp_wrap.cc
 Local<Object> AddressToJS(const sockaddr* addr);
 
+static Persistent<Function> constructor;
 static Persistent<String> buffer_sym;
 static Persistent<String> oncomplete_sym;
 static Persistent<String> onmessage_sym;
@@ -98,8 +99,9 @@ void UDPWrap::Initialize(Handle<Object> target) {
   NODE_SET_PROTOTYPE_METHOD(t, "ref", HandleWrap::Ref);
   NODE_SET_PROTOTYPE_METHOD(t, "unref", HandleWrap::Unref);
 
-  target->Set(String::NewSymbol("UDP"),
-              Persistent<FunctionTemplate>::New(t)->GetFunction());
+  constructor = Persistent<Function>::New(
+      Persistent<FunctionTemplate>::New(t)->GetFunction());
+  target->Set(String::NewSymbol("UDP"), constructor);
 }
 
 
@@ -392,6 +394,17 @@ UDPWrap* UDPWrap::Unwrap(Local<Object> obj) {
 }
 
 
+Local<Object> UDPWrap::Instantiate() {
+  // If this assert fires then Initialize hasn't been called yet.
+  assert(constructor.IsEmpty() == false);
+
+  HandleScope scope;
+  Local<Object> obj = constructor->NewInstance();
+
+  return scope.Close(obj);
+}
+
+
 uv_udp_t* UDPWrap::UVHandle() {
   return &handle_;
 }
index 9ca2eae..527346f 100644 (file)
@@ -33,6 +33,7 @@ class UDPWrap: public HandleWrap {
   static Handle<Value> SetTTL(const Arguments& args);
   static UDPWrap* Unwrap(Local<Object> obj);
 
+  static Local<Object> Instantiate();
   uv_udp_t* UVHandle();
 
  private:
diff --git a/test/simple/test-cluster-dgram-1.js b/test/simple/test-cluster-dgram-1.js
new file mode 100644 (file)
index 0000000..c6dc095
--- /dev/null
@@ -0,0 +1,115 @@
+// Copyright Joyent, Inc. and other Node contributors.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files (the
+// "Software"), to deal in the Software without restriction, including
+// without limitation the rights to use, copy, modify, merge, publish,
+// distribute, sublicense, and/or sell copies of the Software, and to permit
+// persons to whom the Software is furnished to do so, subject to the
+// following conditions:
+//
+// The above copyright notice and this permission notice shall be included
+// in all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
+// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
+// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
+// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
+// USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+var NUM_WORKERS = 4;
+var PACKETS_PER_WORKER = 10;
+
+var assert = require('assert');
+var cluster = require('cluster');
+var common = require('../common');
+var dgram = require('dgram');
+
+
+if (process.platform === 'win32') {
+  console.warn("dgram clustering is currently not supported on windows.");
+  process.exit(0);
+}
+
+if (cluster.isMaster)
+  master();
+else
+  worker();
+
+
+function master() {
+  var listening = 0;
+
+  // Fork 4 workers.
+  for (var i = 0; i < NUM_WORKERS; i++)
+    cluster.fork();
+
+  // Wait until all workers are listening.
+  cluster.on('listening', function() {
+    if (++listening < NUM_WORKERS)
+      return;
+
+    // Start sending messages.
+    var buf = new Buffer('hello world');
+    var socket = dgram.createSocket('udp4');
+    var sent = 0;
+    doSend();
+
+    function doSend() {
+      socket.send(buf, 0, buf.length, common.PORT, '127.0.0.1', afterSend);
+    }
+
+    function afterSend() {
+      sent++;
+      if (sent < NUM_WORKERS * PACKETS_PER_WORKER) {
+        doSend();
+      } else {
+        console.log('master sent %d packets', sent);
+        socket.close();
+      }
+    }
+  });
+
+  // Set up event handlers for every worker. Each worker sends a message when
+  // it has received the expected number of packets. After that it disconnects.
+  for (var key in cluster.workers) {
+    if (cluster.workers.hasOwnProperty(key))
+      setupWorker(cluster.workers[key]);
+  }
+
+  function setupWorker(worker) {
+    var received = 0;
+
+    worker.on('message', function(msg) {
+      received = msg.received;
+      console.log('worker %d received %d packets', worker.id, received);
+    });
+
+    worker.on('disconnect', function() {
+      assert(received === PACKETS_PER_WORKER);
+      console.log('worker %d disconnected', worker.id);
+    });
+  }
+}
+
+
+function worker() {
+  var received = 0;
+
+  // Create udp socket and start listening.
+  var socket = dgram.createSocket('udp4');
+
+  socket.on('message', function(data, info) {
+    received++;
+
+    // Every 10 messages, notify the master.
+    if (received == PACKETS_PER_WORKER) {
+      process.send({received: received});
+      process.disconnect();
+    }
+  });
+
+  socket.bind(common.PORT);
+}
diff --git a/test/simple/test-cluster-dgram-2.js b/test/simple/test-cluster-dgram-2.js
new file mode 100644 (file)
index 0000000..d06ad79
--- /dev/null
@@ -0,0 +1,81 @@
+// Copyright Joyent, Inc. and other Node contributors.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files (the
+// "Software"), to deal in the Software without restriction, including
+// without limitation the rights to use, copy, modify, merge, publish,
+// distribute, sublicense, and/or sell copies of the Software, and to permit
+// persons to whom the Software is furnished to do so, subject to the
+// following condonitions:
+//
+// The above copyright notice and this permission notice shall be included
+// in all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
+// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
+// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
+// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
+// USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+var NUM_WORKERS = 4;
+var PACKETS_PER_WORKER = 10;
+
+var assert = require('assert');
+var cluster = require('cluster');
+var common = require('../common');
+var dgram = require('dgram');
+
+
+if (process.platform === 'win32') {
+  console.warn("dgram clustering is currently not supported on windows.");
+  process.exit(0);
+}
+
+if (cluster.isMaster)
+  master();
+else
+  worker();
+
+
+function master() {
+  var i;
+  var received = 0;
+
+  // Start listening on a socket.
+  var socket = dgram.createSocket('udp4');
+  socket.bind(common.PORT);
+
+  // Disconnect workers when the expected number of messages have been
+  // received.
+  socket.on('message', function(data, info) {
+    received++;
+
+    if (received == PACKETS_PER_WORKER * NUM_WORKERS) {
+      console.log('master received %d packets', received);
+
+      // Close the socket.
+      socket.close();
+
+      // Disconnect all workers.
+      cluster.disconnect();
+    }
+  });
+
+  // Fork workers.
+  for (var i = 0; i < NUM_WORKERS; i++)
+    cluster.fork();
+}
+
+
+function worker() {
+  // Create udp socket and send packets to master.
+  var socket = dgram.createSocket('udp4');
+  var buf = new Buffer('hello world');
+
+  for (var i = 0; i < PACKETS_PER_WORKER; i++)
+    socket.send(buf, 0, buf.length, common.PORT, '127.0.0.1');
+
+  console.log('worker %d sent %d packets', cluster.worker.id, PACKETS_PER_WORKER);
+}