net_uv: Implement end(), destroySoon()
authorRyan Dahl <ry@tinyclouds.org>
Fri, 17 Jun 2011 15:10:12 +0000 (17:10 +0200)
committerRyan Dahl <ry@tinyclouds.org>
Fri, 17 Jun 2011 15:10:12 +0000 (17:10 +0200)
lib/net_uv.js

index d637f0c..9f9441a 100644 (file)
@@ -5,6 +5,11 @@ var util = require('util');
 var assert = require('assert');
 var TCP = process.binding('tcp_wrap').TCP;
 
+/* Bit flags for socket._flags */
+var FLAG_GOT_EOF      = 1 << 0;
+var FLAG_SHUTDOWN     = 1 << 1;
+var FLAG_DESTROY_SOON = 1 << 2;
+
 
 var debug;
 if (process.env.NODE_DEBUG && /net/.test(process.env.NODE_DEBUG)) {
@@ -40,7 +45,11 @@ function Socket(options) {
   this._handle.socket = this;
   this._handle.onread = onread;
 
+  this.allowHalfOpen = options ? (options.allowHalfOpen || false) : false;
+
   this._writeRequests = [];
+
+  this._flags = 0;
 }
 util.inherits(Socket, stream.Stream);
 
@@ -68,16 +77,12 @@ Object.defineProperty(Socket.prototype, 'readyState', {
     if (this._connecting) {
       return 'opening';
     } else if (this.readable && this.writable) {
-      assert(typeof this.fd === 'number');
       return 'open';
     } else if (this.readable && !this.writable) {
-      assert(typeof this.fd === 'number');
       return 'readOnly';
     } else if (!this.readable && this.writable) {
-      assert(typeof this.fd === 'number');
       return 'writeOnly';
     } else {
-      assert(typeof this.fd !== 'number');
       return 'closed';
     }
   }
@@ -101,13 +106,42 @@ Socket.prototype.resume = function() {
 };
 
 
-Socket.prototype.end = function() {
-  throw new Error("implement me");
+Socket.prototype.end = function(data, encoding) {
+  if (!this.writable) return;
+  this.writable = false;
+
+  if (data) this.write(data, encoding);
+  DTRACE_NET_STREAM_END(this);
+
+  if (this._flags & FLAG_GOT_EOF) {
+    this.destroySoon();
+  } else {
+    this._flags |= FLAG_SHUTDOWN;
+    var shutdownReq = this._handle.shutdown();
+    shutdownReq.oncomplete = afterShutdown;
+  }
 };
 
 
+function afterShutdown(status, handle, req) {
+  var self = handle.socket;
+
+  assert.ok(self._flags & FLAG_SHUTDOWN);
+
+  if (self._flags & FLAG_GOT_EOF) {
+    self.destroy();
+  } else {
+  }
+}
+
+
 Socket.prototype.destroySoon = function() {
-  throw new Error("implement me");
+  this.writable = false;
+  this._flags |= FLAG_DESTROY_SOON;
+
+  if (this._writeRequests.length == 0) {
+    this.destroy();
+  }
 };
 
 
@@ -167,8 +201,12 @@ function onread(buffer, offset, length) {
     // EOF
     self.readable = false;
 
+    assert.ok(!(self._flags & FLAG_GOT_EOF));
+    self._flags |= FLAG_GOT_EOF;
+
+    // We call destroy() before end(). 'close' not emitted until nextTick so
+    // the 'end' event will come first as required.
     if (!self.writable) self.destroy();
-    // Note: 'close' not emitted until nextTick.
 
     if (!self.allowHalfOpen) self.end();
     if (self._events && self._events['end']) self.emit('end');
@@ -230,7 +268,15 @@ function afterWrite(status, handle, req, buffer) {
   var req_ = self._writeRequests.shift();
   assert.equal(req, req_);
 
+  if (self._writeRequests.length == 0) {
+    self.emit('drain');
+  }
+
   if (req.cb) req.cb();
+
+  if (self._writeRequests.length == 0  && self._flags & FLAG_DESTROY_SOON) {
+    self.destroy();
+  }
 }
 
 
@@ -257,12 +303,13 @@ Socket.prototype.connect = function(port, host) {
       // TODO retrun promise from Socket.prototype.connect which
       // wraps _connectReq.
 
-      assert.ok(!self._connectReq);
+      assert.ok(!self._connecting);
  
-      self._connectReq = self._handle.connect(ip, port);
+      var connectReq = self._handle.connect(ip, port);
 
-      if (self._connectReq) {
-        self._connectReq.oncomplete = afterConnect;
+      if (connectReq) {
+        self._connecting = true;
+        connectReq.oncomplete = afterConnect;
       } else {
         self.destroy(errnoException(errno, 'connect'));
       }
@@ -275,6 +322,9 @@ function afterConnect(status, handle, req) {
   var self = handle.socket;
   assert.equal(handle, self._handle);
 
+  assert.ok(self._connecting);
+  self._connecting = false;
+
   if (status == 0) {
     self.readable = self.writable = true;
     timers.active(self);
@@ -320,7 +370,7 @@ function Server(/* [ options, ] listener */) {
 
   this.on('connection', listenerCallback);
   this.connections = 0;
-  self.allowHalfOpen = options.allowHalfOpen || false;
+  this.allowHalfOpen = options.allowHalfOpen || false;
 
 
   this._handle = new TCP();
@@ -376,7 +426,10 @@ function onconnection(clientHandle) {
   var handle = this;
   var self = handle.socket;
 
-  var socket = new Socket({ handle: clientHandle });
+  var socket = new Socket({
+    handle: clientHandle,
+    allowHalfOpen: self.allowHalfOpen
+  });
   socket.readable = socket.writable = true;
   socket.resume();
 
@@ -387,3 +440,7 @@ function onconnection(clientHandle) {
   self.emit('connection', socket);
 }
 
+
+Server.prototype.close = function() {
+  this._handle.close();
+};