implement getaddrinfo
authorRyan Dahl <ry@tinyclouds.org>
Fri, 18 Dec 2009 13:52:02 +0000 (14:52 +0100)
committerRyan Dahl <ry@tinyclouds.org>
Tue, 29 Dec 2009 20:12:31 +0000 (21:12 +0100)
lib/net.js
src/node_net2.cc
test-net-server.js
wscript

index 16826a5..cf2a8bf 100644 (file)
@@ -7,6 +7,7 @@ function debug (x) {
 }
 
 
+var assert      = process.assert;
 var socket      = process.socket;
 var bind        = process.bind;
 var connect     = process.connect;
@@ -18,6 +19,9 @@ var read        = process.read;
 var write       = process.write;
 var toRead      = process.toRead;
 var socketError = process.socketError;
+var getsockname = process.getsockname;
+var getaddrinfo = process.getaddrinfo;
+var needsLookup = process.needsLookup;
 var EINPROGRESS = process.EINPROGRESS;
 
 
@@ -28,11 +32,8 @@ function Stream (peerInfo) {
 
   // Allocated on demand.
   self.recvBuffer = null;
-  self.sendQueue = [];
 
   self.readWatcher = new process.IOWatcher(function () {
-    debug('\n' + self.fd + ' readable');
-
     // If this is the first recv (recvBuffer doesn't exist) or we've used up
     // most of the recvBuffer, allocate a new one.
     if (!self.recvBuffer || 
@@ -59,14 +60,19 @@ function Stream (peerInfo) {
       self.emit('receive', slice);
     }
   });
+  self.readable = false;
 
-  self._onWriteFlush = function () {
-    self.flush();
+  self.sendQueue = [];    // queue of buffers that need to be written to socket
+                          // XXX use link list? 
+  self.sendQueueSize = 0; // in bytes, not to be confused with sendQueue.length!
+  self._doFlush = function () {
+    assert(self.sendQueueSize > 0);
+    if (self.flush()) {
+      assert(self.sendQueueSize == 0);
+      self.emit("drain");
+    }
   };
-
-  self.writeWatcher = new process.IOWatcher(self._onWriteFlush);
-
-  self.readable = false;
+  self.writeWatcher = new process.IOWatcher(self._doFlush);
   self.writable = false;
 
   if (peerInfo) {
@@ -76,8 +82,9 @@ function Stream (peerInfo) {
 
     self.readWatcher.set(self.fd, true, false);
     self.readWatcher.start();
-    self.writeWatcher.set(self.fd, false, true);
     self.readable = true;
+
+    self.writeWatcher.set(self.fd, false, true);
     self.writable = true;
   }
 };
@@ -85,6 +92,13 @@ process.inherits(Stream, process.EventEmitter);
 exports.Stream = Stream;
 
 
+exports.createConnection = function (port, host) {
+  var s = new Stream();
+  s.connect(port, host);
+  return s;
+};
+
+
 Stream.prototype._allocateNewRecvBuf = function () {
   var self = this;
 
@@ -122,6 +136,7 @@ Stream.prototype._allocateSendBuffer = function () {
 
 Stream.prototype._sendString = function (data, encoding) {
   var self = this;
+  if (!self.writable) throw new Error('Stream is not writable');
   var buffer;
   if (self.sendQueue.length == 0) {
     buffer = self._allocateSendBuffer();
@@ -144,22 +159,26 @@ Stream.prototype._sendString = function (data, encoding) {
   encoding = encoding || 'ascii'; // default to ascii since it's faster
 
   var charsWritten;
+  var bytesWritten;
 
   if (encoding.toLowerCase() == 'utf8') {
     charsWritten = buffer.utf8Write(data,
                                     buffer.used,
                                     buffer.length - buffer.used);
-    buffer.used += process.Buffer.utf8Length(data.slice(0, charsWritten));
+    bytesWritten = process.Buffer.utf8Length(data.slice(0, charsWritten));
   } else {
     // ascii
     charsWritten = buffer.asciiWrite(data,
                                      buffer.used,
                                      buffer.length - buffer.used);
-    buffer.used += charsWritten;
-    debug('ascii charsWritten ' + charsWritten);
-    debug('ascii buffer.used ' + buffer.used);
+    bytesWritten = charsWritten;
   }
+  
+  buffer.used += bytesWritten;
+  self.sendQueueSize += bytesWritten;
 
+  debug('charsWritten ' + charsWritten);
+  debug('buffer.used ' + buffer.used);
 
   // If we didn't finish, then recurse with the rest of the string.
   if (charsWritten < data.length) {
@@ -169,8 +188,12 @@ Stream.prototype._sendString = function (data, encoding) {
 };
 
 
+// Returns true if all the data was flushed to socket. Returns false if
+// something was queued. If data was queued, then the "drain" event will
+// signal when it has been finally flushed to socket.
 Stream.prototype.send = function (data, encoding) {
   var self = this;
+  if (!self.writable) throw new Error('Stream is not writable');
   if (typeof(data) == 'string') {
     self._sendString(data, encoding);
   } else {
@@ -189,15 +212,18 @@ Stream.prototype.send = function (data, encoding) {
     }
 
     if (!inserted) self.sendQueue.push(data);
+
+    self.sendQueueSize += data.used;
   }
-  this.flush();
+  return this.flush();
 };
 
 
-// returns true if flushed without getting EAGAIN
-// false if it got EAGAIN
+// Flushes the write buffer out. Emits "drain" if the buffer is empty.
 Stream.prototype.flush = function () {
   var self = this;
+  if (!self.writable) throw new Error('Stream is not writable');
+
   var bytesWritten;
   while (self.sendQueue.length > 0) {
     var b = self.sendQueue[0];
@@ -213,13 +239,16 @@ Stream.prototype.flush = function () {
                          b.sent,
                          b.used - b.sent);
     if (bytesWritten === null) {
-      this.writeWatcher.start();
+      // could not flush everything
+      self.writeWatcher.start();
+      assert(self.sendQueueSize > 0);
       return false;
     }
     b.sent += bytesWritten;
+    self.sendQueueSize -= bytesWritten;
     debug('bytes sent: ' + b.sent);
   }
-  this.writeWatcher.stop();
+  self.writeWatcher.stop();
   return true;
 };
 
@@ -261,17 +290,15 @@ Stream.prototype.connect = function () {
     var errno = socketError(self.fd);
     if (errno == 0) {
       // connection established
-      self.emit('connect');
       self.readWatcher.start();
       self.readable = true;
       self.writable = true;
-      self.writeWatcher.callback = self._onWriteFlush;
+      self.writeWatcher.callback = self._doFlush;
+      self.emit('connect');
     } else if (errno != EINPROGRESS) {
       var e = new Error('connection error');
       e.errno = errno;
-      self.readWatcher.stop();
-      self.writeWatcher.stop();
-      close(self.fd);
+      self.forceClose(e);
     }
   };
 };
@@ -292,17 +319,35 @@ Stream.prototype.forceClose = function (exception) {
 };
 
 
-Stream.prototype.close = function () {
-  if (this.readable && this.writable) {
+Stream.prototype._shutdown = function () {
+  if (this.writable) {
     this.writable = false;
     shutdown(this.fd, "write");
-  } else if (!this.readable && this.writable) {
+  }
+};
+
+
+Stream.prototype.close = function () {
+  var self = this;
+  var closeMethod;
+  if (self.readable && self.writable) {
+    closeMethod = self._shutdown;
+  } else if (!self.readable && self.writable) {
     // already got EOF
-    this.forceClose(this.fd);
+    closeMethod = self.forceClose;
   }
   // In the case we've already shutdown write side, 
   // but haven't got EOF: ignore. In the case we're
   // fully closed already: ignore.
+
+  if (closeMethod) {
+    if (self.sendQueueSize == 0) {
+      // no queue. just shut down the socket.
+      closeMethod();
+    } else {
+      self.addListener("drain", closeMethod);
+    }
+  }
 };
 
 
@@ -327,30 +372,43 @@ process.inherits(Server, process.EventEmitter);
 exports.Server = Server;
 
 
+exports.createServer = function (listener) {
+  return new Server(listener);
+};
+
+
 Server.prototype.listen = function () {
   var self = this;
   if (self.fd) throw new Error('Server already opened');
 
-  var backlogIndex;
   if (typeof(arguments[0]) == 'string' && arguments.length == 1) {
     // the first argument specifies a path
     self.fd = process.socket('UNIX');
+    self.type = 'UNIX';
     // TODO unlink sockfile if exists?
     // if (lstat(SOCKFILE, &tstat) == 0) {
     //   assert(S_ISSOCK(tstat.st_mode));
     //   unlink(SOCKFILE);
     // }
     bind(self.fd, arguments[0]);
-    backlogIndex = 1;
+  } else if (arguments.length == 0) {
+    self.fd = process.socket('TCP');
+    self.type = 'TCP';
+    // Don't bind(). OS will assign a port with INADDR_ANY. The port will be
+    // passed to the 'listening' event.
   } else {
     // the first argument is the port, the second an IP
     self.fd = process.socket('TCP');
+    self.type = 'TCP';
+    if (needsLookup(arguments[1])) {
+      getaddrinfo(arguments[1], function (ip) {
+      });
+    }
     // TODO dns resolution on arguments[1]
     bind(self.fd, arguments[0], arguments[1]);
-    backlogIndex = typeof(arguments[1]) == 'string' ? 2 : 1;
   }
-  listen(self.fd, arguments[backlogIndex] ? arguments[backlogIndex] : 128);
 
+  listen(self.fd, 128);
   self.emit("listening");
 
   self.watcher.set(self.fd, true, false); 
@@ -358,10 +416,15 @@ Server.prototype.listen = function () {
 };
 
 
+Server.prototype.sockName = function () {
+  return getsockname(self.fd);
+};
+
+
 Server.prototype.close = function () {
-  var self = this;
-  if (!self.fd) throw new Error('Not running');
-  self.watcher.stop();
-  close(self.fd);
-  self.fd = null;
+  if (!this.fd) throw new Error('Not running');
+  this.watcher.stop();
+  close(this.fd);
+  this.fd = null;
+  this.emit("close");
 };
index af96216..3457cdb 100644 (file)
@@ -5,6 +5,7 @@
 #include <node_buffer.h>
 
 #include <string.h>
+#include <stdlib.h>
 
 #include <sys/types.h>
 #include <sys/socket.h>
@@ -32,6 +33,8 @@ static Persistent<String> syscall_symbol;
 static Persistent<String> fd_symbol;
 static Persistent<String> remote_address_symbol;
 static Persistent<String> remote_port_symbol;
+static Persistent<String> address_symbol;
+static Persistent<String> port_symbol;
 
 #define FD_ARG(a)                                                     \
   if (!(a)->IsInt32()) {                                              \
@@ -181,11 +184,11 @@ static inline Handle<Value> ParseAddressArgs(Handle<Value> first,
       char ipv6[255] = "::FFFF:";
 
       if (inet_pton(AF_INET, *ip, &(in6.sin6_addr)) > 0) {
-        // If this is an IPv4 address then we need to change it 
-        // to the IPv4-mapped-on-IPv6 format which looks like 
+        // If this is an IPv4 address then we need to change it
+        // to the IPv4-mapped-on-IPv6 format which looks like
         //        ::FFFF:<IPv4  address>
-        // For more information see "Address Format" ipv6(7) and 
-        // "BUGS" in inet_pton(3) 
+        // For more information see "Address Format" ipv6(7) and
+        // "BUGS" in inet_pton(3)
         strcat(ipv6, *ip);
       } else {
         strcpy(ipv6, *ip);
@@ -313,6 +316,38 @@ static Handle<Value> Connect(const Arguments& args) {
 }
 
 
+static Handle<Value> GetSockName(const Arguments& args) {
+  HandleScope scope;
+
+  FD_ARG(args[0])
+
+  struct sockaddr_storage address_storage;
+  socklen_t len = sizeof(struct sockaddr_storage);
+
+  int r = getsockname(fd, (struct sockaddr *) &address_storage, &len);
+
+  if (r < 0) {
+    return ThrowException(ErrnoException(errno, "getsockname"));
+  }
+
+  Local<Object> info = Object::New();
+
+  if (address_storage.ss_family == AF_INET6) {
+    struct sockaddr_in6 *a = (struct sockaddr_in6*)&address_storage;
+
+    char ip[INET6_ADDRSTRLEN];
+    inet_ntop(AF_INET6, &(a->sin6_addr), ip, INET6_ADDRSTRLEN);
+
+    int port = ntohs(a->sin6_port);
+
+    info->Set(address_symbol, String::New(ip));
+    info->Set(port_symbol, Integer::New(port));
+  }
+
+  return scope.Close(info);
+}
+
+
 static Handle<Value> Listen(const Arguments& args) {
   HandleScope scope;
 
@@ -323,6 +358,7 @@ static Handle<Value> Listen(const Arguments& args) {
     return ThrowException(ErrnoException(errno, "listen"));
   }
 
+
   return Undefined();
 }
 
@@ -499,6 +535,167 @@ static Handle<Value> ToRead(const Arguments& args) {
 }
 
 
+// G E T A D D R I N F O
+
+struct resolve_request {
+  Persistent<Function> cb;
+  int ai_family; // AF_INET or AF_INET6
+  char hostname[1];
+};
+
+
+static int AfterResolve(eio_req *req) {
+  ev_unref(EV_DEFAULT_UC);
+
+  struct resolve_request * rreq = (struct resolve_request *)(req->data);
+
+  struct addrinfo *address = NULL,
+                  *address_list = static_cast<struct addrinfo *>(req->ptr2);
+
+  HandleScope scope;
+  Local<Value> argv[1];
+
+  if (req->result != 0) {
+    argv[0] = ErrnoException(errno, "getaddrinfo");
+  } else {
+    int n = 0;
+    for (address = address_list; address; address = address->ai_next) { n++; }
+
+    Local<Array> results = Array::New(n);
+
+    char ip[INET6_ADDRSTRLEN];
+
+    n = 0;
+    address = address_list;
+    while (address) {
+      HandleScope scope;
+      assert(address->ai_family == AF_INET || address->ai_family == AF_INET6);
+      assert(address->ai_socktype == SOCK_STREAM);
+      const char *c = inet_ntop(address->ai_family, &(address->ai_addr), ip, INET6_ADDRSTRLEN);
+      Local<String> s = String::New(c);
+      results->Set(Integer::New(n), s);
+
+      n++;
+      address = address->ai_next;
+    }
+
+    argv[0] = results;
+  }
+
+  TryCatch try_catch;
+
+  rreq->cb->Call(Context::GetCurrent()->Global(), 1, argv);
+
+  if (try_catch.HasCaught()) {
+    FatalException(try_catch);
+  }
+
+  rreq->cb.Dispose(); // Dispose of the persistent handle
+  free(rreq);
+  freeaddrinfo(address_list);
+}
+
+static int Resolve(eio_req *req) {
+  // Note: this function is executed in the thread pool! CAREFUL
+  struct resolve_request * rreq = (struct resolve_request *) req->data;
+  struct addrinfo *address_list = NULL;
+
+  struct addrinfo hints;
+  memset(&hints, 0, sizeof(struct addrinfo));
+  hints.ai_family = rreq->ai_family;
+  hints.ai_socktype = SOCK_STREAM;
+
+  req->result = getaddrinfo((char*)rreq->hostname, NULL, &hints, &address_list);
+  req->ptr2 = address_list;
+  return 0;
+}
+
+
+static Handle<Value> GetAddrInfo(const Arguments& args) {
+  HandleScope scope;
+
+  String::Utf8Value hostname(args[0]->ToString());
+
+  int type = args[1]->Int32Value();
+  int fam = AF_INET;
+  switch (type) {
+    case 4:
+      fam = AF_INET;
+      break;
+    case 6:
+      fam = AF_INET6;
+      break;
+    default:
+      return ThrowException(Exception::TypeError(
+            String::New("Second argument must be an integer 4 or 6")));
+  }
+
+  if (!args[2]->IsFunction()) {
+    return ThrowException(Exception::TypeError(
+          String::New("Thrid argument must be a callback")));
+  }
+
+  Local<Function> cb = Local<Function>::Cast(args[2]);
+
+  struct resolve_request *rreq = (struct resolve_request *)
+    malloc(sizeof(struct resolve_request) + hostname.length());
+
+  if (!rreq) {
+    V8::LowMemoryNotification();
+    return ThrowException(Exception::Error(
+          String::New("Could not allocate enough memory")));
+  }
+
+  strcpy(rreq->hostname, *hostname);
+  rreq->cb = Persistent<Function>::New(cb);
+  rreq->ai_family = fam;
+
+  // For the moment I will do DNS lookups in the eio thread pool. This is
+  // sub-optimal and cannot handle massive numbers of requests.
+  //
+  // (One particularly annoying problem is that the pthread stack size needs
+  // to be increased dramatically to handle getaddrinfo() see X_STACKSIZE in
+  // wscript ).
+  //
+  // In the future I will move to a system using c-ares:
+  // http://lists.schmorp.de/pipermail/libev/2009q1/000632.html
+  eio_custom(Resolve, EIO_PRI_DEFAULT, AfterResolve, rreq);
+
+  // There will not be any active watchers from this object on the event
+  // loop while getaddrinfo() runs. If the only thing happening in the
+  // script was this hostname resolution, then the event loop would drop
+  // out. Thus we need to add ev_ref() until AfterResolve().
+  ev_ref(EV_DEFAULT_UC);
+
+  return Undefined();
+}
+
+
+static Handle<Value> NeedsLookup(const Arguments& args) {
+  HandleScope scope;
+
+  if (args[0]->IsNull() || args[0]->IsUndefined()) return False();
+
+  String::Utf8Value s(args[0]->ToString());
+
+  // avoiding buffer overflows in the following strcat
+  // 2001:0db8:85a3:08d3:1319:8a2e:0370:7334
+  // 39 = max ipv6 address.
+  if (s.length() > INET6_ADDRSTRLEN) return True();
+
+  struct sockaddr_in6 a;
+
+  if (inet_pton(AF_INET, *s, &(a.sin6_addr)) > 0) return False();
+  if (inet_pton(AF_INET6, *s, &(a.sin6_addr)) > 0) return False();
+
+  char ipv6[255] = "::FFFF:";
+  strcat(ipv6, *s);
+  if (inet_pton(AF_INET6, ipv6, &(a.sin6_addr)) > 0) return False();
+
+  return True();
+}
+
+
 void InitNet2(Handle<Object> target) {
   HandleScope scope;
 
@@ -517,7 +714,9 @@ void InitNet2(Handle<Object> target) {
   NODE_SET_METHOD(target, "accept", Accept);
   NODE_SET_METHOD(target, "socketError", SocketError);
   NODE_SET_METHOD(target, "toRead", ToRead);
-
+  NODE_SET_METHOD(target, "getsocksame", GetSockName);
+  NODE_SET_METHOD(target, "getaddrinfo", GetAddrInfo);
+  NODE_SET_METHOD(target, "needsLookup", NeedsLookup);
 
   target->Set(String::NewSymbol("EINPROGRESS"), Integer::New(EINPROGRESS));
   target->Set(String::NewSymbol("EINTR"), Integer::New(EINTR));
@@ -525,12 +724,14 @@ void InitNet2(Handle<Object> target) {
   target->Set(String::NewSymbol("EPERM"), Integer::New(EPERM));
   target->Set(String::NewSymbol("EADDRINUSE"), Integer::New(EADDRINUSE));
   target->Set(String::NewSymbol("ECONNREFUSED"), Integer::New(ECONNREFUSED));
+
   errno_symbol          = NODE_PSYMBOL("errno");
   syscall_symbol        = NODE_PSYMBOL("syscall");
   fd_symbol             = NODE_PSYMBOL("fd");
   remote_address_symbol = NODE_PSYMBOL("remoteAddress");
   remote_port_symbol    = NODE_PSYMBOL("remotePort");
+  address_symbol        = NODE_PSYMBOL("address");
+  port_symbol           = NODE_PSYMBOL("port");
 }
 
 }  // namespace node
index aabd7f2..6968eae 100644 (file)
@@ -19,6 +19,10 @@ var server = new net.Server(function (stream) {
     stream.send("pong utf8\r\n", "utf8");
   });
 
+  stream.addListener('drain', function () {
+    sys.puts("server-side socket drain");
+  });
+
   stream.addListener("eof", function () {
     sys.puts("server peer eof");
     stream.close();
@@ -28,15 +32,17 @@ server.listen(8000);
 sys.puts("server fd: " + server.fd);
 
 
-var stream = new net.Stream();
-stream.addListener('connect', function () {
+var c = net.createConnection(8000);
+c.addListener('connect', function () {
   sys.puts("!!!client connected");
-  stream.send("hello\n");
+  c.send("hello\n");
 });
 
-stream.addListener('receive', function (d) {
-  sys.puts("!!!client got: " + JSON.stringify(d.toString()));
+c.addListener('drain', function () {
+  sys.puts("!!!client drain");
 });
 
-stream.connect(8000);
+c.addListener('receive', function (d) {
+  sys.puts("!!!client got: " + JSON.stringify(d.toString()));
+});
 
diff --git a/wscript b/wscript
index d2e13c9..43a7a53 100644 (file)
--- a/wscript
+++ b/wscript
@@ -139,7 +139,7 @@ def configure(conf):
 
   conf.define("HAVE_CONFIG_H", 1)
 
-  conf.env.append_value("CCFLAGS", "-DX_STACKSIZE=%d" % (1024*64))
+  conf.env.append_value("CCFLAGS", "-DX_STACKSIZE=%d" % (2*1024*1024))
 
   # LFS
   conf.env.append_value('CCFLAGS',  '-D_LARGEFILE_SOURCE')