Change IOWatcher constructor to have no arguments
authorRyan Dahl <ry@tinyclouds.org>
Mon, 28 Dec 2009 15:18:03 +0000 (16:18 +0100)
committerRyan Dahl <ry@tinyclouds.org>
Tue, 29 Dec 2009 20:12:32 +0000 (21:12 +0100)
lib/net.js
src/node_io_watcher.cc
src/node_io_watcher.h

index cf2a8bf..3108fee 100644 (file)
@@ -7,6 +7,7 @@ function debug (x) {
 }
 
 
+var IOWatcher   = process.IOWatcher;
 var assert      = process.assert;
 var socket      = process.socket;
 var bind        = process.bind;
@@ -25,7 +26,7 @@ var needsLookup = process.needsLookup;
 var EINPROGRESS = process.EINPROGRESS;
 
 
-function Stream (peerInfo) {
+function Socket (peerInfo) {
   process.EventEmitter.call();
 
   var self = this;
@@ -33,10 +34,11 @@ function Stream (peerInfo) {
   // Allocated on demand.
   self.recvBuffer = null;
 
-  self.readWatcher = new process.IOWatcher(function () {
+  self.readWatcher = new IOWatcher()
+  self.readWatcher.callback = function () {
     // If this is the first recv (recvBuffer doesn't exist) or we've used up
     // most of the recvBuffer, allocate a new one.
-    if (!self.recvBuffer || 
+    if (!self.recvBuffer ||
         self.recvBuffer.length - self.recvBuffer.used < 128) {
       self._allocateNewRecvBuf();
     }
@@ -52,18 +54,18 @@ function Stream (peerInfo) {
       self.readable = false;
       self.readWatcher.stop();
       self.emit('eof');
-      if (!self.writable) self.forceClose();  
+      if (!self.writable) self.forceClose();
     } else {
       var slice = self.recvBuffer.slice(self.recvBuffer.used,
                                         self.recvBuffer.used + bytesRead);
       self.recvBuffer.used += bytesRead;
       self.emit('receive', slice);
     }
-  });
+  };
   self.readable = false;
 
   self.sendQueue = [];    // queue of buffers that need to be written to socket
-                          // XXX use link list? 
+                          // XXX use link list?
   self.sendQueueSize = 0; // in bytes, not to be confused with sendQueue.length!
   self._doFlush = function () {
     assert(self.sendQueueSize > 0);
@@ -72,7 +74,8 @@ function Stream (peerInfo) {
       self.emit("drain");
     }
   };
-  self.writeWatcher = new process.IOWatcher(self._doFlush);
+  self.writeWatcher = new IOWatcher();
+  self.writeWatcher.callback = self._doFlush;
   self.writable = false;
 
   if (peerInfo) {
@@ -88,18 +91,18 @@ function Stream (peerInfo) {
     self.writable = true;
   }
 };
-process.inherits(Stream, process.EventEmitter);
-exports.Stream = Stream;
+process.inherits(Socket, process.EventEmitter);
+exports.Socket = Socket;
 
 
 exports.createConnection = function (port, host) {
-  var s = new Stream();
+  var s = new Socket();
   s.connect(port, host);
   return s;
 };
 
 
-Stream.prototype._allocateNewRecvBuf = function () {
+Socket.prototype._allocateNewRecvBuf = function () {
   var self = this;
 
   var newBufferSize = 1024; // TODO make this adjustable from user API
@@ -125,7 +128,7 @@ Stream.prototype._allocateNewRecvBuf = function () {
 };
 
 
-Stream.prototype._allocateSendBuffer = function () {
+Socket.prototype._allocateSendBuffer = function () {
   var b = new process.Buffer(1024);
   b.used = 0;
   b.sent = 0;
@@ -134,9 +137,9 @@ Stream.prototype._allocateSendBuffer = function () {
 };
 
 
-Stream.prototype._sendString = function (data, encoding) {
+Socket.prototype._sendString = function (data, encoding) {
   var self = this;
-  if (!self.writable) throw new Error('Stream is not writable');
+  if (!self.writable) throw new Error('Socket is not writable');
   var buffer;
   if (self.sendQueue.length == 0) {
     buffer = self._allocateSendBuffer();
@@ -191,9 +194,9 @@ Stream.prototype._sendString = function (data, encoding) {
 // Returns true if all the data was flushed to socket. Returns false if
 // something was queued. If data was queued, then the "drain" event will
 // signal when it has been finally flushed to socket.
-Stream.prototype.send = function (data, encoding) {
+Socket.prototype.send = function (data, encoding) {
   var self = this;
-  if (!self.writable) throw new Error('Stream is not writable');
+  if (!self.writable) throw new Error('Socket is not writable');
   if (typeof(data) == 'string') {
     self._sendString(data, encoding);
   } else {
@@ -220,9 +223,9 @@ Stream.prototype.send = function (data, encoding) {
 
 
 // Flushes the write buffer out. Emits "drain" if the buffer is empty.
-Stream.prototype.flush = function () {
+Socket.prototype.flush = function () {
   var self = this;
-  if (!self.writable) throw new Error('Stream is not writable');
+  if (!self.writable) throw new Error('Socket is not writable');
 
   var bytesWritten;
   while (self.sendQueue.length > 0) {
@@ -253,13 +256,13 @@ Stream.prototype.flush = function () {
 };
 
 
-// var stream = new Stream();
+// var stream = new Socket();
 // stream.connect(80)               - TCP connect to port 80 on the localhost
 // stream.connect(80, 'nodejs.org') - TCP connect to port 80 on nodejs.org
 // stream.connect('/tmp/socket')    - UNIX connect to socket specified by path
-Stream.prototype.connect = function () {
+Socket.prototype.connect = function () {
   var self = this;
-  if (self.fd) throw new Error('Stream already opened');
+  if (self.fd) throw new Error('Socket already opened');
 
   if (typeof(arguments[0]) == 'string' && arguments.length == 1) {
     self.fd = process.socket('UNIX');
@@ -304,7 +307,7 @@ Stream.prototype.connect = function () {
 };
 
 
-Stream.prototype.forceClose = function (exception) {
+Socket.prototype.forceClose = function (exception) {
   if (this.fd) {
     this.readable = false;
     this.writable = false;
@@ -314,12 +317,12 @@ Stream.prototype.forceClose = function (exception) {
     close(this.fd);
     debug('close peer ' + this.fd);
     this.fd = null;
-    this.emit('close', exception); 
+    this.emit('close', exception);
   }
 };
 
 
-Stream.prototype._shutdown = function () {
+Socket.prototype._shutdown = function () {
   if (this.writable) {
     this.writable = false;
     shutdown(this.fd, "write");
@@ -327,7 +330,7 @@ Stream.prototype._shutdown = function () {
 };
 
 
-Stream.prototype.close = function () {
+Socket.prototype.close = function () {
   var self = this;
   var closeMethod;
   if (self.readable && self.writable) {
@@ -336,7 +339,7 @@ Stream.prototype.close = function () {
     // already got EOF
     closeMethod = self.forceClose;
   }
-  // In the case we've already shutdown write side, 
+  // In the case we've already shutdown write side,
   // but haven't got EOF: ignore. In the case we're
   // fully closed already: ignore.
 
@@ -358,16 +361,17 @@ function Server (listener) {
     self.addListener('connection', listener);
   }
 
-  self.watcher = new process.IOWatcher(function (readable, writeable) {
+  self.watcher = new IOWatcher();
+  self.watcher.callback = function (readable, writeable) {
     while (self.fd) {
       var peerInfo = accept(self.fd);
       debug('accept: ' + JSON.stringify(peerInfo));
       if (!peerInfo) return;
-      var peer = new Stream(peerInfo);
+      var peer = new Socket(peerInfo);
       self.emit('connection', peer);
     }
-  });
-};
+  };
+}
 process.inherits(Server, process.EventEmitter);
 exports.Server = Server;
 
@@ -411,7 +415,7 @@ Server.prototype.listen = function () {
   listen(self.fd, 128);
   self.emit("listening");
 
-  self.watcher.set(self.fd, true, false); 
+  self.watcher.set(self.fd, true, false);
   self.watcher.start();
 };
 
index 3c60a7b..4a2d937 100644 (file)
@@ -37,7 +37,11 @@ void IOWatcher::Callback(EV_P_ ev_io *w, int revents) {
   HandleScope scope;
 
   Local<Value> callback_v = io->handle_->Get(callback_symbol);
-  assert(callback_v->IsFunction());
+  if (!callback_v->IsFunction()) {
+    io->Stop();
+    return;
+  }
+
   Local<Function> callback = Local<Function>::Cast(callback_v);
 
   TryCatch try_catch;
@@ -64,19 +68,9 @@ void IOWatcher::Callback(EV_P_ ev_io *w, int revents) {
 Handle<Value> IOWatcher::New(const Arguments& args) {
   HandleScope scope;
 
-  if (!args[0]->IsFunction()) {
-    return ThrowException(Exception::TypeError(
-          String::New("First arg should a callback.")));
-  }
-
-  Local<Function> callback = Local<Function>::Cast(args[0]);
-
   IOWatcher *s = new IOWatcher();
-
   s->Wrap(args.This());
 
-  s->handle_->Set(callback_symbol, callback);
-
   return args.This();
 }
 
@@ -136,7 +130,6 @@ Handle<Value> IOWatcher::Stop(const Arguments& args) {
 
 void IOWatcher::Stop () {
   if (watcher_.active) {
-    HandleScope scope;
     ev_io_stop(EV_DEFAULT_UC_ &watcher_);
     Unref();
   }
index 420c6de..5e73177 100644 (file)
@@ -20,7 +20,7 @@ class IOWatcher : ObjectWrap {
   }
 
   ~IOWatcher() {
-    Stop();
+    ev_io_stop(EV_DEFAULT_UC_ &watcher_);
   }
 
   static v8::Handle<v8::Value> New(const v8::Arguments& args);