IOWatcher callback isn't internal, fix bug in Accept
authorRyan Dahl <ry@tinyclouds.org>
Tue, 15 Dec 2009 16:17:45 +0000 (17:17 +0100)
committerRyan Dahl <ry@tinyclouds.org>
Tue, 29 Dec 2009 20:12:30 +0000 (21:12 +0100)
src/node_io_watcher.cc
src/node_io_watcher.h
src/node_net2.cc
tcp.js

index 7017d3e..3c60a7b 100644 (file)
@@ -1,6 +1,9 @@
 // Copyright 2009 Ryan Dahl <ry@tinyclouds.org>
 #include <node_io_watcher.h>
 
+#include <node.h>
+#include <v8.h>
+
 #include <assert.h>
 
 namespace node {
@@ -8,19 +11,23 @@ namespace node {
 using namespace v8;
 
 Persistent<FunctionTemplate> IOWatcher::constructor_template;
+Persistent<String> callback_symbol;
 
 void IOWatcher::Initialize(Handle<Object> target) {
   HandleScope scope;
 
   Local<FunctionTemplate> t = FunctionTemplate::New(IOWatcher::New);
   constructor_template = Persistent<FunctionTemplate>::New(t);
-  constructor_template->InstanceTemplate()->SetInternalFieldCount(2);
+  constructor_template->InstanceTemplate()->SetInternalFieldCount(1);
   constructor_template->SetClassName(String::NewSymbol("IOWatcher"));
 
   NODE_SET_PROTOTYPE_METHOD(constructor_template, "start", IOWatcher::Start);
   NODE_SET_PROTOTYPE_METHOD(constructor_template, "stop", IOWatcher::Stop);
+  NODE_SET_PROTOTYPE_METHOD(constructor_template, "set", IOWatcher::Set);
 
   target->Set(String::NewSymbol("IOWatcher"), constructor_template->GetFunction());
+
+  callback_symbol = NODE_PSYMBOL("callback");
 }
 
 
@@ -29,7 +36,7 @@ void IOWatcher::Callback(EV_P_ ev_io *w, int revents) {
   assert(w == &io->watcher_);
   HandleScope scope;
 
-  Local<Value> callback_v = io->handle_->GetInternalField(1);
+  Local<Value> callback_v = io->handle_->Get(callback_symbol);
   assert(callback_v->IsFunction());
   Local<Function> callback = Local<Function>::Cast(callback_v);
 
@@ -48,13 +55,49 @@ void IOWatcher::Callback(EV_P_ ev_io *w, int revents) {
 
 
 // 
-//  var io = new process.IOWatcher(fd, true, true, function (readable, writable) {
+//  var io = new process.IOWatcher(function (readable, writable) {
 //    
 //  });
+//  io.set(fd, true, false);
+//  io.start();
 //
 Handle<Value> IOWatcher::New(const Arguments& args) {
   HandleScope scope;
 
+  if (!args[0]->IsFunction()) {
+    return ThrowException(Exception::TypeError(
+          String::New("First arg should a callback.")));
+  }
+
+  Local<Function> callback = Local<Function>::Cast(args[0]);
+
+  IOWatcher *s = new IOWatcher();
+
+  s->Wrap(args.This());
+
+  s->handle_->Set(callback_symbol, callback);
+
+  return args.This();
+}
+
+
+Handle<Value> IOWatcher::Start(const Arguments& args) {
+  HandleScope scope;
+
+  IOWatcher *io = ObjectWrap::Unwrap<IOWatcher>(args.Holder());
+
+  ev_io_start(EV_DEFAULT_UC_ &io->watcher_);
+
+  io->Ref();
+
+  return Undefined();
+}
+
+Handle<Value> IOWatcher::Set(const Arguments& args) {
+  HandleScope scope;
+
+  IOWatcher *io = ObjectWrap::Unwrap<IOWatcher>(args.Holder());
+
   if (!args[0]->IsInt32()) {
     return ThrowException(Exception::TypeError(
           String::New("First arg should be a file descriptor.")));
@@ -78,35 +121,11 @@ Handle<Value> IOWatcher::New(const Arguments& args) {
 
   if (args[2]->IsTrue()) events |= EV_WRITE;
 
-  if (!args[3]->IsFunction()) {
-    return ThrowException(Exception::TypeError(
-          String::New("Fourth arg should a callback.")));
-  }
-
-  Local<Function> callback = Local<Function>::Cast(args[3]);
-
-  IOWatcher *s = new IOWatcher(fd, events);
-
-  s->Wrap(args.This());
-  s->handle_->SetInternalField(1, callback);
-
-  return args.This();
-}
-
-
-Handle<Value> IOWatcher::Start(const Arguments& args) {
-  HandleScope scope;
-
-  IOWatcher *io = ObjectWrap::Unwrap<IOWatcher>(args.Holder());
-
-  ev_io_start(EV_DEFAULT_UC_ &io->watcher_);
-
-  io->Ref();
+  ev_io_set(&io->watcher_, fd, events);
 
   return Undefined();
 }
 
-
 Handle<Value> IOWatcher::Stop(const Arguments& args) {
   HandleScope scope;
   IOWatcher *io = ObjectWrap::Unwrap<IOWatcher>(args.Holder());
@@ -117,6 +136,7 @@ Handle<Value> IOWatcher::Stop(const Arguments& args) {
 
 void IOWatcher::Stop () {
   if (watcher_.active) {
+    HandleScope scope;
     ev_io_stop(EV_DEFAULT_UC_ &watcher_);
     Unref();
   }
index 4e40593..420c6de 100644 (file)
@@ -2,7 +2,7 @@
 #ifndef NODE_IO_H_
 #define NODE_IO_H_
 
-#include <node.h>
+#include <node_object_wrap.h>
 #include <ev.h>
 
 namespace node {
@@ -14,8 +14,8 @@ class IOWatcher : ObjectWrap {
  protected:
   static v8::Persistent<v8::FunctionTemplate> constructor_template;
 
-  IOWatcher(int fd, int events) : ObjectWrap() {
-    ev_io_init(&watcher_, IOWatcher::Callback, fd, events);
+  IOWatcher() : ObjectWrap() {
+    ev_init(&watcher_, IOWatcher::Callback);
     watcher_.data = this;
   }
 
@@ -26,6 +26,7 @@ class IOWatcher : ObjectWrap {
   static v8::Handle<v8::Value> New(const v8::Arguments& args);
   static v8::Handle<v8::Value> Start(const v8::Arguments& args);
   static v8::Handle<v8::Value> Stop(const v8::Arguments& args);
+  static v8::Handle<v8::Value> Set(const v8::Arguments& args);
 
  private:
   static void Callback(EV_P_ ev_io *watcher, int revents);
index 13a5aaf..cf2433e 100644 (file)
@@ -334,31 +334,31 @@ static Handle<Value> Accept(const Arguments& args) {
 
   FD_ARG(args[0])
 
-  struct sockaddr_storage addr;
-  socklen_t len;
+  struct sockaddr_storage address_storage;
+  socklen_t len = sizeof(struct sockaddr_storage);
 
-  int peer = accept(fd, (struct sockaddr*) &addr, &len);
+  int peer_fd = accept(fd, (struct sockaddr*) &address_storage, &len);
 
-  if (peer < 0) {
-    if (errno == EAGAIN) return Null(); 
+  if (peer_fd < 0) {
+    if (errno == EAGAIN) return scope.Close(Null());
     return ThrowException(ErrnoException(errno, "accept"));
   }
 
-  if (!SetNonBlock(peer)) {
+  if (!SetNonBlock(peer_fd)) {
     int fcntl_errno = errno;
-    close(peer);
-    return ThrowException(ErrnoException(fcntl_errno, "fcntl"));
+    close(peer_fd);
+    return ThrowException(ErrnoException(fcntl_errno, "fcntl", "Cannot make peer non-blocking"));
   }
 
   Local<Object> peer_info = Object::New();
 
-  peer_info->Set(fd_symbol, Integer::New(fd));
+  peer_info->Set(fd_symbol, Integer::New(peer_fd));
 
-  if (addr.ss_family == AF_INET6) {
-    struct sockaddr_in6 *a = reinterpret_cast<struct sockaddr_in6*>(&addr);
+  if (address_storage.ss_family == AF_INET6) {
+    struct sockaddr_in6 *a = (struct sockaddr_in6*)&address_storage;
 
     char ip[INET6_ADDRSTRLEN];
-    inet_ntop(AF_INET6, &a->sin6_addr, ip, INET6_ADDRSTRLEN);
+    inet_ntop(AF_INET6, &(a->sin6_addr), ip, INET6_ADDRSTRLEN);
 
     int port = ntohs(a->sin6_port);
 
@@ -499,11 +499,11 @@ void InitNet2(Handle<Object> target) {
   target->Set(String::NewSymbol("EADDRINUSE"), Integer::New(EADDRINUSE));
   target->Set(String::NewSymbol("ECONNREFUSED"), Integer::New(ECONNREFUSED));
  
-  errno_symbol = NODE_PSYMBOL("errno");
-  syscall_symbol = NODE_PSYMBOL("syscall");
-  fd_symbol = NODE_PSYMBOL("fd");
+  errno_symbol          = NODE_PSYMBOL("errno");
+  syscall_symbol        = NODE_PSYMBOL("syscall");
+  fd_symbol             = NODE_PSYMBOL("fd");
   remote_address_symbol = NODE_PSYMBOL("remoteAddress");
-  remote_port_symbol = NODE_PSYMBOL("remotePort");
+  remote_port_symbol    = NODE_PSYMBOL("remotePort");
 }
 
 }  // namespace node
diff --git a/tcp.js b/tcp.js
index af73349..eafa144 100644 (file)
--- a/tcp.js
+++ b/tcp.js
@@ -1,8 +1,55 @@
-var socket = process.socket;
-var bind = process.bind;
-var listen = process.listen;
-var accept = process.accept;
-var close = process.close;
+var debugLevel = 0;
+if ("NODE_DEBUG" in process.ENV) debugLevel = 1;
+function debug (x) {
+  if (debugLevel > 0) {
+    process.stdio.writeError(x + "\n");
+  }
+}
+
+var socket    = process.socket;
+var bind      = process.bind;
+var listen    = process.listen;
+var accept    = process.accept;
+var close     = process.close;
+var shutdown  = process.shutdown;
+
+var Peer = function (peerInfo) {
+  process.EventEmitter.call();
+
+  var self = this;
+
+  self.fd = peerInfo.fd;
+  self.remoteAddress = peerInfo.remoteAddress;
+  self.remotePort = peerInfo.remotePort;
+
+  self.readWatcher = new process.IOWatcher(function () {
+    debug(self.fd + " readable");
+  });
+  self.readWatcher.set(self.fd, true, false);
+  self.readWatcher.start();
+
+  self.writeWatcher = new process.IOWatcher(function () {
+    debug(self.fd + " writable");
+  });
+  self.writeWatcher.set(self.fd, false, true);
+
+  self.readable = true;
+  self.writable = true;
+};
+process.inherits(Peer, process.EventEmitter);
+
+Peer.prototype.close = function () {
+  this.readable = false;
+  this.writable = false;
+
+  this.writeWatcher.stop();
+  this.readWatcher.stop();
+  close(this.fd);
+  debug("close peer " + this.fd);
+  this.fd = null;
+};
+
+
 
 var Server = function (listener) {
   var self = this;
@@ -10,7 +57,19 @@ var Server = function (listener) {
   if (listener) {
     self.addListener("connection", listener);
   }
-  
+
+  self.watcher = new process.IOWatcher(function (readable, writeable) {
+    debug("readable " + readable);
+    debug("writable " + writeable);
+    while (self.fd) {
+      debug("accept from " + self.fd);
+      var peerInfo = accept(self.fd);
+      debug("accept: " + JSON.stringify(peerInfo));
+      if (!peerInfo) return;
+      var peer = new Peer(peerInfo);
+      self.emit("connection", peer);
+    }
+  });
 };
 process.inherits(Server, process.EventEmitter);
 
@@ -24,15 +83,7 @@ Server.prototype.listen = function (port, host) {
   bind(self.fd, port, host);
   listen(self.fd, 128); // TODO configurable backlog
 
-  self.watcher = new process.IOWatcher(self.fd, true, false, function () {
-    var peerInfo;
-    while (self.fd) {
-      peerInfo = accept(self.fd);
-      if (peerInfo === null) return;
-      self.emit("connection", peerInfo);      
-    }
-  });
-
+  self.watcher.set(self.fd, true, false); 
   self.watcher.start();
 };
 
@@ -41,15 +92,20 @@ Server.prototype.close = function () {
   if (!self.fd) throw new Error("Not running");
   self.watcher.stop();
   close(self.fd);
-  this.watcher = null;
-  this.fd = null;
+  self.fd = null;
 };
 
 ///////////////////////////////////////////////////////
 
 var sys = require("sys");
-var server = new Server(function () {
-  sys.puts("connection");
-  server.close();
+var server = new Server(function (peer) {
+  sys.puts("connection (" + peer.fd + "): " 
+          + peer.remoteAddress 
+          + " port " 
+          + peer.remotePort
+          );
+  sys.puts("server fd: " + server.fd);
+  peer.close();
 });
 server.listen(8000);
+sys.puts("server fd: " + server.fd);