From cbd4033619cc45abdf878285c412bac9c3f36e4e Mon Sep 17 00:00:00 2001 From: Ben Noordhuis Date: Sat, 20 Aug 2011 03:47:40 +0200 Subject: [PATCH] dgram: integrate libuv UDP support --- doc/api/dgram.markdown | 12 +- lib/{dgram.js => dgram_legacy.js} | 0 lib/dgram_uv.js | 312 +++++++++++++++++++++++++++++++++ src/node.js | 3 + src/node_extensions.h | 1 + src/udp_wrap.cc | 360 ++++++++++++++++++++++++++++++++++++++ wscript | 1 + 7 files changed, 687 insertions(+), 2 deletions(-) rename lib/{dgram.js => dgram_legacy.js} (100%) create mode 100644 lib/dgram_uv.js create mode 100644 src/udp_wrap.cc diff --git a/doc/api/dgram.markdown b/doc/api/dgram.markdown index cc2aea2..2d41594 100644 --- a/doc/api/dgram.markdown +++ b/doc/api/dgram.markdown @@ -31,6 +31,11 @@ Creates a datagram socket of the specified types. Valid types are: Takes an optional callback which is added as a listener for `message` events. +Call `socket.bind` if you want to receive datagrams. `socket.bind()` will bind +to the "all interfaces" address on a random port (it does the right thing for +both `udp4` and `udp6` sockets). You can then retrieve the address and port +with `socket.address().address` and `socket.address().port`. + ### dgram.send(buf, offset, length, path, [callback]) For Unix domain datagram sockets, the destination address is a pathname in the filesystem. @@ -61,6 +66,10 @@ re-used. Note that DNS lookups will delay the time that a send takes place, at least until the next tick. The only way to know for sure that a send has taken place is to use the callback. +If the socket has not been previously bound with a call to `bind`, it's +assigned a random port number and bound to the "all interfaces" address +(0.0.0.0 for IPv4-only systems, ::0 for IPv6 and dual stack systems). + Example of sending a UDP packet to a random port on `localhost`; var dgram = require('dgram'); @@ -142,8 +151,7 @@ Example of a UDP server listening on port 41234: ### dgram.close() -Close the underlying socket and stop listening for data on it. UDP sockets -automatically listen for messages, even if they did not call `bind()`. +Close the underlying socket and stop listening for data on it. ### dgram.address() diff --git a/lib/dgram.js b/lib/dgram_legacy.js similarity index 100% rename from lib/dgram.js rename to lib/dgram_legacy.js diff --git a/lib/dgram_uv.js b/lib/dgram_uv.js new file mode 100644 index 0000000..c06b291 --- /dev/null +++ b/lib/dgram_uv.js @@ -0,0 +1,312 @@ +// Copyright Joyent, Inc. and other Node contributors. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the +// "Software"), to deal in the Software without restriction, including +// without limitation the rights to use, copy, modify, merge, publish, +// distribute, sublicense, and/or sell copies of the Software, and to permit +// persons to whom the Software is furnished to do so, subject to the +// following conditions: +// +// The above copyright notice and this permission notice shall be included +// in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN +// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR +// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE +// USE OR OTHER DEALINGS IN THE SOFTWARE. + +var util = require('util'); +var events = require('events'); + +var UDP = process.binding('udp_wrap').UDP; + +// lazily loaded +var dns = null; +var net = null; + + +// no-op callback +function noop() { +} + + +function isIP(address) { + if (!net) + net = require('net'); + + return net.isIP(address); +} + + +function lookup(address, family, callback) { + // implicit 'bind before send' needs to run on the same tick + var matchedFamily = isIP(address); + if (matchedFamily) + return callback(null, address, matchedFamily); + + if (!dns) + dns = require('dns'); + + return dns.lookup(address, family, callback); +} + + +function lookup4(address, callback) { + return lookup(address || '0.0.0.0', 4, callback); +} + + +function lookup6(address, callback) { + return lookup(address || '::0', 6, callback); +} + + +function newHandle(type) { + if (type == 'udp4') { + var handle = new UDP; + handle.lookup = lookup4; + return handle; + } + + if (type == 'udp6') { + var handle = new UDP; + handle.lookup = lookup6; + handle.bind = handle.bind6; + handle.send = handle.send6; + return handle; + } + + if (type == 'unix_dgram') + throw new Error('unix_dgram sockets are not supported any more.'); + + throw new Error('Bad socket type specified. Valid types are: udp4, udp6'); +} + + +function Socket(type, listener) { + events.EventEmitter.call(this); + + var handle = newHandle(type); + handle.socket = this; + + this._handle = handle; + this._receiving = false; + this._bound = false; + this.type = type; + + if (typeof listener === 'function') + this.on('message', listener); +} +util.inherits(Socket, events.EventEmitter); +exports.Socket = Socket; + + +exports.createSocket = function(type, listener) { + return new Socket(type, listener); +}; + + +Socket.prototype.bind = function(port, address) { + var self = this; + + self._healthCheck(); + + // resolve address first + self._handle.lookup(address, function(err, ip) { + if (!err) { + if (self._handle.bind(ip, port || 0, /*flags=*/0)) { + err = errnoException(errno, 'bind'); + } + else { + self._bound = true; + self.emit('listening'); + self._startReceiving(); + } + } + + if (err) { + // caller may not have had a chance yet to register its + // error event listener so defer the error to the next tick + process.nextTick(function() { + self.emit('error', err); + }); + } + }); +}; + + +// thin wrapper around `send`, here for compatibility with dgram_legacy.js +Socket.prototype.sendto = function(buffer, + offset, + length, + port, + address, + callback) { + if (typeof offset !== 'number' || typeof length !== 'number') + throw new Error('send takes offset and length as args 2 and 3'); + + if (typeof address !== 'string') + throw new Error(this.type + ' sockets must send to port, address'); + + this.send(buffer, offset, length, port, address, callback); +}; + + +Socket.prototype.send = function(buffer, + offset, + length, + port, + address, + callback) { + var self = this; + + callback = callback || noop; + + self._healthCheck(); + self._startReceiving(); + + self._handle.lookup(address, function(err, ip) { + if (err) { + if (callback) callback(err); + self.emit('error', err); + } + else { + var req = self._handle.send(buffer, offset, length, port, ip); + if (req) { + req.oncomplete = afterSend; + req.cb = callback; + } + else { + // don't emit as error, dgram_legacy.js compatibility + callback(errnoException(errno, 'send')); + } + } + }); +}; + + +function afterSend(status, handle, req, buffer) { + var self = handle.socket; + + // CHECKME socket's been closed by user, don't call callback? + if (handle !== self._handle) + void(0); + + if (req.cb) + req.cb(null, buffer.length); // compatibility with dgram_legacy.js +} + + +Socket.prototype.close = function() { + this._healthCheck(); + this._stopReceiving(); + this._handle.close(); + this._handle = null; + this.emit('close'); +}; + + +Socket.prototype.address = function() { + this._healthCheck(); + + var address = this._handle.getsockname(); + if (!address) + throw errnoException(errno, 'getsockname'); + + return address; +}; + + +Socket.prototype.setBroadcast = function(arg) { + throw new Error('not yet implemented'); +}; + + +Socket.prototype.setTTL = function(arg) { + throw new Error('not yet implemented'); +}; + + +Socket.prototype.setMulticastTTL = function(arg) { + throw new Error('not yet implemented'); +}; + + +Socket.prototype.setMulticastLoopback = function(arg) { + throw new Error('not yet implemented'); +}; + + +Socket.prototype.addMembership = function(multicastAddress, + multicastInterface) { + // are we ever going to support this in libuv? + throw new Error('not yet implemented'); +}; + + +Socket.prototype.dropMembership = function(multicastAddress, + multicastInterface) { + // are we ever going to support this in libuv? + throw new Error('not yet implemented'); +}; + + +Socket.prototype._healthCheck = function() { + if (!this._handle) + throw new Error('Not running'); // error message from dgram_legacy.js +}; + + +Socket.prototype._startReceiving = function() { + if (this._receiving) + return; + + if (!this._bound) { + this.bind(); // bind to random port + + // sanity check + if (!this._bound) + throw new Error('implicit bind failed'); + } + + this._handle.onmessage = onMessage; + this._handle.recvStart(); + this._receiving = true; + this.fd = -42; // compatibility hack +}; + + +Socket.prototype._stopReceiving = function() { + if (!this._receiving) + return; + + this._handle.onmessage = null; + this._handle.recvStop(); + this._receiving = false; +}; + + +function onMessage(handle, nread, buf, rinfo) { + var self = handle.socket; + + if (nread == -1) { + self.emit('error', errnoException('recvmsg')); + } + else { + rinfo.size = buf.length; // compatibility + self.emit('message', buf, rinfo); + } +} + + +// TODO share with net_uv and others +function errnoException(errorno, syscall) { + var e = new Error(syscall + ' ' + errorno); + e.errno = e.code = errorno; + e.syscall = syscall; + return e; +} diff --git a/src/node.js b/src/node.js index 4af01ec..aa0592e 100644 --- a/src/node.js +++ b/src/node.js @@ -425,6 +425,9 @@ case 'timers': return process.features.uv ? 'timers_uv' : 'timers_legacy'; + case 'dgram': + return process.features.uv ? 'dgram_uv' : 'dgram_legacy'; + case 'dns': return process.features.uv ? 'dns_uv' : 'dns_legacy'; diff --git a/src/node_extensions.h b/src/node_extensions.h index d33a523..9357ea3 100644 --- a/src/node_extensions.h +++ b/src/node_extensions.h @@ -44,6 +44,7 @@ NODE_EXT_LIST_ITEM(node_os) // libuv rewrite NODE_EXT_LIST_ITEM(node_timer_wrap) NODE_EXT_LIST_ITEM(node_tcp_wrap) +NODE_EXT_LIST_ITEM(node_udp_wrap) NODE_EXT_LIST_ITEM(node_pipe_wrap) NODE_EXT_LIST_ITEM(node_cares_wrap) NODE_EXT_LIST_ITEM(node_stdio_wrap) diff --git a/src/udp_wrap.cc b/src/udp_wrap.cc new file mode 100644 index 0000000..e9d170a --- /dev/null +++ b/src/udp_wrap.cc @@ -0,0 +1,360 @@ +#include +#include + +#include +#include + +// Temporary hack: libuv should provide uv_inet_pton and uv_inet_ntop. +// Clean this up in tcp_wrap.cc too. +#if defined(__MINGW32__) || defined(_MSC_VER) + extern "C" { +# include +# include + } +# define uv_inet_pton ares_inet_pton +# define uv_inet_ntop ares_inet_ntop + +#else // __POSIX__ +# include +# define uv_inet_pton inet_pton +# define uv_inet_ntop inet_ntop +#endif + +using namespace v8; + +namespace node { + +#define UNWRAP \ + assert(!args.Holder().IsEmpty()); \ + assert(args.Holder()->InternalFieldCount() > 0); \ + UDPWrap* wrap = \ + static_cast(args.Holder()->GetPointerFromInternalField(0)); \ + if (!wrap) { \ + SetErrno(UV_EBADF); \ + return scope.Close(Integer::New(-1)); \ + } + +// TODO share with tcp_wrap.cc +Persistent address_symbol; +Persistent port_symbol; +Persistent buffer_sym; + +void AddressToJS(Handle info, + const sockaddr* addr, + int addrlen); + +typedef ReqWrap SendWrap; + +class UDPWrap: public HandleWrap { +public: + static void Initialize(Handle target); + static Handle New(const Arguments& args); + static Handle Bind(const Arguments& args); + static Handle Send(const Arguments& args); + static Handle Bind6(const Arguments& args); + static Handle Send6(const Arguments& args); + static Handle RecvStart(const Arguments& args); + static Handle RecvStop(const Arguments& args); + static Handle GetSockName(const Arguments& args); + +private: + UDPWrap(Handle object); + virtual ~UDPWrap(); + + static uv_buf_t OnAlloc(uv_handle_t* handle, size_t suggested_size); + static void OnSend(uv_udp_send_t* req, int status); + static void OnRecv(uv_udp_t* handle, + ssize_t nread, + uv_buf_t buf, + struct sockaddr* addr, + unsigned flags); + + uv_udp_t handle_; +}; + + +UDPWrap::UDPWrap(Handle object): HandleWrap(object, + (uv_handle_t*)&handle_) { + int r = uv_udp_init(&handle_); + assert(r == 0); // can't fail anyway + handle_.data = reinterpret_cast(this); +} + + +UDPWrap::~UDPWrap() { +} + + +void UDPWrap::Initialize(Handle target) { + HandleWrap::Initialize(target); + + HandleScope scope; + + buffer_sym = NODE_PSYMBOL("buffer"); + port_symbol = NODE_PSYMBOL("port"); + address_symbol = NODE_PSYMBOL("address"); + + Local t = FunctionTemplate::New(New); + t->InstanceTemplate()->SetInternalFieldCount(1); + t->SetClassName(String::NewSymbol("UDP")); + + NODE_SET_PROTOTYPE_METHOD(t, "bind", Bind); + NODE_SET_PROTOTYPE_METHOD(t, "send", Send); + NODE_SET_PROTOTYPE_METHOD(t, "bind6", Bind6); + NODE_SET_PROTOTYPE_METHOD(t, "send6", Send6); + NODE_SET_PROTOTYPE_METHOD(t, "close", Close); + NODE_SET_PROTOTYPE_METHOD(t, "recvStart", RecvStart); + NODE_SET_PROTOTYPE_METHOD(t, "recvStop", RecvStop); + NODE_SET_PROTOTYPE_METHOD(t, "getsockname", GetSockName); + + target->Set(String::NewSymbol("UDP"), + Persistent::New(t)->GetFunction()); +} + + +Handle UDPWrap::New(const Arguments& args) { + HandleScope scope; + + assert(args.IsConstructCall()); + new UDPWrap(args.This()); + + return scope.Close(args.This()); +} + + +Handle UDPWrap::Bind(const Arguments& args) { + HandleScope scope; + + UNWRAP + + // bind(ip, port, flags) + assert(args.Length() == 3); + + String::Utf8Value address(args[0]->ToString()); + const int port = args[1]->Uint32Value(); + const int flags = args[2]->Uint32Value(); + const sockaddr_in addr = uv_ip4_addr(*address, port); + + int r = uv_udp_bind(&wrap->handle_, addr, flags); + if (r) + SetErrno(uv_last_error().code); + + return scope.Close(Integer::New(r)); +} + + +Handle UDPWrap::Bind6(const Arguments& args) { + assert(0 && "implement me"); + return Null(); +} + + +Handle UDPWrap::Send(const Arguments& args) { + HandleScope scope; + + // send(buffer, offset, length, port, address) + assert(args.Length() == 5); + + UNWRAP + + assert(Buffer::HasInstance(args[0])); + Local buffer_obj = args[0]->ToObject(); + + size_t offset = args[1]->Uint32Value(); + size_t length = args[2]->Uint32Value(); + + SendWrap* req_wrap = new SendWrap(); + req_wrap->object_->SetHiddenValue(buffer_sym, buffer_obj); + + uv_buf_t buf = uv_buf_init(Buffer::Data(buffer_obj) + offset, + length); + + const unsigned short port = args[3]->Uint32Value(); + String::Utf8Value address(args[4]->ToString()); + const sockaddr_in addr = uv_ip4_addr(*address, port); + + int r = uv_udp_send(&req_wrap->req_, &wrap->handle_, &buf, 1, addr, OnSend); + req_wrap->Dispatched(); + + if (r) { + SetErrno(uv_last_error().code); + delete req_wrap; + return Null(); + } + else { + return scope.Close(req_wrap->object_); + } +} + + +Handle UDPWrap::Send6(const Arguments& args) { + assert(0 && "implement me"); + return Null(); +} + + +Handle UDPWrap::RecvStart(const Arguments& args) { + HandleScope scope; + + UNWRAP + + // UV_EALREADY means that the socket is already bound but that's okay + int r = uv_udp_recv_start(&wrap->handle_, OnAlloc, OnRecv); + if (r && uv_last_error().code != UV_EALREADY) { + SetErrno(uv_last_error().code); + return False(); + } + + return True(); +} + + +Handle UDPWrap::RecvStop(const Arguments& args) { + HandleScope scope; + + UNWRAP + + int r = uv_udp_recv_stop(&wrap->handle_); + + return scope.Close(Integer::New(r)); +} + + +Handle UDPWrap::GetSockName(const Arguments& args) { + HandleScope scope; + struct sockaddr_storage address; + + UNWRAP + + int addrlen = sizeof(address); + int r = uv_getsockname(reinterpret_cast(&wrap->handle_), + reinterpret_cast(&address), + &addrlen); + + if (r == 0) { + Local sockname = Object::New(); + AddressToJS(sockname, reinterpret_cast(&address), addrlen); + return scope.Close(sockname); + } + else { + SetErrno(uv_last_error().code); + return Null(); + } +} + + +// TODO share with StreamWrap::AfterWrite() in stream_wrap.cc +void UDPWrap::OnSend(uv_udp_send_t* req, int status) { + HandleScope scope; + + assert(req != NULL); + + SendWrap* req_wrap = reinterpret_cast(req->data); + UDPWrap* wrap = reinterpret_cast(req->handle->data); + + assert(req_wrap->object_.IsEmpty() == false); + assert(wrap->object_.IsEmpty() == false); + + if (status) { + SetErrno(uv_last_error().code); + } + + Local argv[4] = { + Integer::New(status), + Local::New(wrap->object_), + Local::New(req_wrap->object_), + req_wrap->object_->GetHiddenValue(buffer_sym), + }; + + MakeCallback(req_wrap->object_, "oncomplete", 4, argv); + delete req_wrap; +} + + +uv_buf_t UDPWrap::OnAlloc(uv_handle_t* handle, size_t suggested_size) { + // FIXME switch to slab allocation, share with stream_wrap.cc + return uv_buf_init(new char[suggested_size], suggested_size); +} + + +static void ReleaseMemory(char* data, void* arg) { + delete[] data; // data == buf.base +} + + +void UDPWrap::OnRecv(uv_udp_t* handle, + ssize_t nread, + uv_buf_t buf, + struct sockaddr* addr, + unsigned flags) { + if (nread == 0) { + ReleaseMemory(buf.base, NULL); + return; + } + + HandleScope scope; + + UDPWrap* wrap = reinterpret_cast(handle->data); + + Handle argv[4] = { + wrap->object_, + Integer::New(nread), + Null(), + Null() + }; + + if (nread == -1) { + SetErrno(uv_last_error().code); + } + else { + Local rinfo = Object::New(); + AddressToJS(rinfo, addr, sizeof *addr); + argv[2] = Buffer::New(buf.base, nread, ReleaseMemory, NULL)->handle_; + argv[3] = rinfo; + } + + MakeCallback(wrap->object_, "onmessage", ARRAY_SIZE(argv), argv); +} + + +void AddressToJS(Handle info, + const sockaddr* addr, + int addrlen) { + char ip[INET6_ADDRSTRLEN]; + const sockaddr_in *a4; + const sockaddr_in6 *a6; + int port; + + assert(addr != NULL); + + if (addrlen == 0) { + info->Set(address_symbol, String::Empty()); + return; + } + + switch (addr->sa_family) { + case AF_INET6: + a6 = reinterpret_cast(addr); + inet_ntop(AF_INET6, &a6->sin6_addr, ip, sizeof ip); + port = ntohs(a6->sin6_port); + info->Set(address_symbol, String::New(ip)); + info->Set(port_symbol, Integer::New(port)); + break; + + case AF_INET: + a4 = reinterpret_cast(addr); + inet_ntop(AF_INET, &a4->sin_addr, ip, sizeof ip); + port = ntohs(a4->sin_port); + info->Set(address_symbol, String::New(ip)); + info->Set(port_symbol, Integer::New(port)); + break; + + default: + info->Set(address_symbol, String::Empty()); + } +} + + +} // namespace node + +NODE_MODULE(node_udp_wrap, node::UDPWrap::Initialize); diff --git a/wscript b/wscript index 05cdd22..44a2071 100644 --- a/wscript +++ b/wscript @@ -865,6 +865,7 @@ def build(bld): src/handle_wrap.cc src/stream_wrap.cc src/tcp_wrap.cc + src/udp_wrap.cc src/pipe_wrap.cc src/cares_wrap.cc src/stdio_wrap.cc -- 2.7.4