[net2] Better EOF marking, rename events
authorRyan Dahl <ry@tinyclouds.org>
Mon, 28 Dec 2009 15:42:48 +0000 (16:42 +0100)
committerRyan Dahl <ry@tinyclouds.org>
Tue, 29 Dec 2009 20:12:32 +0000 (21:12 +0100)
lib/net.js
src/node_net2.cc
test-net-server.js

index 3108fee..1b1d285 100644 (file)
@@ -24,6 +24,7 @@ var getsockname = process.getsockname;
 var getaddrinfo = process.getaddrinfo;
 var needsLookup = process.needsLookup;
 var EINPROGRESS = process.EINPROGRESS;
+var END_OF_FILE = 42;
 
 
 function Socket (peerInfo) {
@@ -59,7 +60,7 @@ function Socket (peerInfo) {
       var slice = self.recvBuffer.slice(self.recvBuffer.used,
                                         self.recvBuffer.used + bytesRead);
       self.recvBuffer.used += bytesRead;
-      self.emit('receive', slice);
+      self.emit('data', slice);
     }
   };
   self.readable = false;
@@ -153,7 +154,7 @@ Socket.prototype._sendString = function (data, encoding) {
     }
     // if we didn't find one, take the last
     if (!buffer) {
-      buffer = self.sendQueue[self.sendQueue.length-1];
+      buffer = self._sendQueueLast();
       // if last buffer is used up
       if (buffer.length == buffer.used) buffer = self._allocateSendBuffer();
     }
@@ -191,12 +192,24 @@ Socket.prototype._sendString = function (data, encoding) {
 };
 
 
+Socket.prototype._sendQueueLast = function () {
+  return this.sendQueue.length > 0 ? this.sendQueue[this.sendQueue.length-1]
+                                   : null;
+};
+
+
 // Returns true if all the data was flushed to socket. Returns false if
 // something was queued. If data was queued, then the "drain" event will
 // signal when it has been finally flushed to socket.
 Socket.prototype.send = function (data, encoding) {
   var self = this;
+
   if (!self.writable) throw new Error('Socket is not writable');
+
+  if (self._sendQueueLast == END_OF_FILE) {
+    throw new Error('socket.close() called already; cannot write.');
+  }
+
   if (typeof(data) == 'string') {
     self._sendString(data, encoding);
   } else {
@@ -225,12 +238,18 @@ Socket.prototype.send = function (data, encoding) {
 // Flushes the write buffer out. Emits "drain" if the buffer is empty.
 Socket.prototype.flush = function () {
   var self = this;
-  if (!self.writable) throw new Error('Socket is not writable');
 
   var bytesWritten;
   while (self.sendQueue.length > 0) {
+    if (!self.writable) throw new Error('Socket is not writable');
+
     var b = self.sendQueue[0];
 
+    if (b == END_OF_FILE) {
+      self._shutdown();
+      break;
+    }
+
     if (b.sent == b.used) {
       // this can be improved - save the buffer for later?
       self.sendQueue.shift()
@@ -315,7 +334,7 @@ Socket.prototype.forceClose = function (exception) {
     this.writeWatcher.stop();
     this.readWatcher.stop();
     close(this.fd);
-    debug('close peer ' + this.fd);
+    debug('close socket ' + this.fd);
     this.fd = null;
     this.emit('close', exception);
   }
@@ -325,30 +344,16 @@ Socket.prototype.forceClose = function (exception) {
 Socket.prototype._shutdown = function () {
   if (this.writable) {
     this.writable = false;
-    shutdown(this.fd, "write");
+    shutdown(this.fd, 'write');
   }
 };
 
 
 Socket.prototype.close = function () {
-  var self = this;
-  var closeMethod;
-  if (self.readable && self.writable) {
-    closeMethod = self._shutdown;
-  } else if (!self.readable && self.writable) {
-    // already got EOF
-    closeMethod = self.forceClose;
-  }
-  // In the case we've already shutdown write side,
-  // but haven't got EOF: ignore. In the case we're
-  // fully closed already: ignore.
-
-  if (closeMethod) {
-    if (self.sendQueueSize == 0) {
-      // no queue. just shut down the socket.
-      closeMethod();
-    } else {
-      self.addListener("drain", closeMethod);
+  if (this.writable) {
+    if (this._sendQueueLast() != END_OF_FILE) {
+      this.sendQueue.push(END_OF_FILE);
+      this.flush();
     }
   }
 };
index 3457cdb..1cebd86 100644 (file)
@@ -264,7 +264,7 @@ static Handle<Value> Shutdown(const Arguments& args) {
   int how = SHUT_WR;
 
   if (args[1]->IsString()) {
-    String::Utf8Value t(args[0]->ToString());
+    String::Utf8Value t(args[1]->ToString());
     if (0 == strcasecmp(*t, "write")) {
       how = SHUT_WR;
     } else if (0 == strcasecmp(*t, "read")) {
index 6968eae..76f1057 100644 (file)
@@ -5,27 +5,27 @@ process.Buffer.prototype.toString = function () {
 var sys = require("sys");
 var net = require("./lib/net");
 
-var server = new net.Server(function (stream) {
-  sys.puts("connection (" + stream.fd + "): " 
-          + stream.remoteAddress 
+var server = new net.Server(function (socket) {
+  sys.puts("connection (" + socket.fd + "): " 
+          + socket.remoteAddress 
           + " port " 
-          + stream.remotePort
+          + socket.remotePort
           );
   sys.puts("server fd: " + server.fd);
 
-  stream.addListener("receive", function (b) {
-    stream.send("pong ascii\r\n", "ascii");
-    stream.send(b);
-    stream.send("pong utf8\r\n", "utf8");
+  socket.addListener("data", function (b) {
+    socket.send("pong ascii\r\n", "ascii");
+    socket.send(b);
+    socket.send("pong utf8\r\n", "utf8");
   });
 
-  stream.addListener('drain', function () {
-    sys.puts("server-side socket drain");
+  socket.addListener("eof", function () {
+    sys.puts("server peer eof");
+    socket.close();
   });
 
-  stream.addListener("eof", function () {
-    sys.puts("server peer eof");
-    stream.close();
+  socket.addListener('drain', function () {
+    sys.puts("server-side socket drain");
   });
 });
 server.listen(8000);