net: Refactor to use streams2
authorisaacs <i@izs.me>
Thu, 13 Dec 2012 05:18:57 +0000 (21:18 -0800)
committerisaacs <i@izs.me>
Fri, 14 Dec 2012 18:52:30 +0000 (10:52 -0800)
This is a combination of 6 commits.

* XXX net fixup lcase stream

* net: Refactor to use streams2

    Use 'socket.resume()' in many tests to trigger old-mode behavior.

* net: Call destroy() if shutdown() is not provided

    This is important for TTY wrap streams

* net: Call .end() in socket.destroySoon if necessary

    This makes the http 1.0 keepAlive test pass, also.

* net wtf-ish stuff kinda busted

* net fixup

lib/net.js

index 81d02a5..d0a2c5a 100644 (file)
@@ -20,7 +20,7 @@
 // USE OR OTHER DEALINGS IN THE SOFTWARE.
 
 var events = require('events');
-var Stream = require('stream');
+var stream = require('stream');
 var timers = require('timers');
 var util = require('util');
 var assert = require('assert');
@@ -42,16 +42,16 @@ function createTCP() {
 }
 
 
-/* Bit flags for socket._flags */
-var FLAG_GOT_EOF = 1 << 0;
-var FLAG_SHUTDOWN = 1 << 1;
-var FLAG_DESTROY_SOON = 1 << 2;
-var FLAG_SHUTDOWN_QUEUED = 1 << 3;
-
-
 var debug;
 if (process.env.NODE_DEBUG && /net/.test(process.env.NODE_DEBUG)) {
-  debug = function(x) { console.error('NET:', x); };
+  var pid = process.pid;
+  debug = function(x) {
+    // if console is not set up yet, then skip this.
+    if (!console.error)
+      return;
+    console.error('NET: %d', pid,
+                  util.format.apply(util, arguments).slice(0, 500));
+  };
 } else {
   debug = function() { };
 }
@@ -110,12 +110,8 @@ function normalizeConnectArgs(args) {
 exports._normalizeConnectArgs = normalizeConnectArgs;
 
 
-/* called when creating new Socket, or when re-using a closed Socket */
+// called when creating new Socket, or when re-using a closed Socket
 function initSocketHandle(self) {
-  self._pendingWriteReqs = 0;
-
-  self._flags = 0;
-  self._connectQueueSize = 0;
   self.destroyed = false;
   self.errorEmitted = false;
   self.bytesRead = 0;
@@ -131,8 +127,6 @@ function initSocketHandle(self) {
 function Socket(options) {
   if (!(this instanceof Socket)) return new Socket(options);
 
-  Stream.call(this);
-
   switch (typeof options) {
     case 'number':
       options = { fd: options }; // Legacy interface.
@@ -142,7 +136,10 @@ function Socket(options) {
       break;
   }
 
-  if (typeof options.fd === 'undefined') {
+  this.readable = this.writable = false;
+  if (options.handle) {
+    this._handle = options.handle; // private
+  } else if (typeof options.fd === 'undefined') {
     this._handle = options && options.handle; // private
   } else {
     this._handle = createPipe();
@@ -150,17 +147,105 @@ function Socket(options) {
     this.readable = this.writable = true;
   }
 
+  this.onend = null;
+
+  // shut down the socket when we're finished with it.
+  this.on('finish', onSocketFinish);
+  this.on('_socketEnd', onSocketEnd);
+
   initSocketHandle(this);
-  this.allowHalfOpen = options && options.allowHalfOpen;
+
+  this._pendingWrite = null;
+
+  stream.Duplex.call(this, options);
+
+  // handle strings directly
+  this._writableState.decodeStrings = false;
+
+  // default to *not* allowing half open sockets
+  this.allowHalfOpen = options && options.allowHalfOpen || false;
+
+  // if we have a handle, then start the flow of data into the
+  // buffer.  if not, then this will happen when we connect
+  if (this._handle && (!options || options.readable !== false))
+    this.read(0);
+}
+util.inherits(Socket, stream.Duplex);
+
+// the user has called .end(), and all the bytes have been
+// sent out to the other side.
+// If allowHalfOpen is false, or if the readable side has
+// ended already, then destroy.
+// If allowHalfOpen is true, then we need to do a shutdown,
+// so that only the writable side will be cleaned up.
+function onSocketFinish() {
+  debug('onSocketFinish');
+  if (this._readableState.ended) {
+    debug('oSF: ended, destroy', this._readableState);
+    return this.destroy();
+  }
+
+  debug('oSF: not ended, call shutdown()');
+
+  // otherwise, just shutdown, or destroy() if not possible
+  if (!this._handle.shutdown)
+    return this.destroy();
+
+  var shutdownReq = this._handle.shutdown();
+
+  if (!shutdownReq)
+    return this._destroy(errnoException(errno, 'shutdown'));
+
+  shutdownReq.oncomplete = afterShutdown;
+}
+
+
+function afterShutdown(status, handle, req) {
+  var self = handle.owner;
+
+  debug('afterShutdown destroyed=%j', self.destroyed,
+        self._readableState);
+
+  // callback may come after call to destroy.
+  if (self.destroyed)
+    return;
+
+  if (self._readableState.ended) {
+    debug('readableState ended, destroying');
+    self.destroy();
+  } else {
+    self.once('_socketEnd', self.destroy);
+  }
 }
-util.inherits(Socket, Stream);
 
+// the EOF has been received, and no more bytes are coming.
+// if the writable side has ended already, then clean everything
+// up.
+function onSocketEnd() {
+  // XXX Should not have to do as much crap in this function.
+  // ended should already be true, since this is called *after*
+  // the EOF errno and onread has returned null to the _read cb.
+  debug('onSocketEnd', this._readableState);
+  this._readableState.ended = true;
+  if (this._readableState.endEmitted) {
+    this.readable = false;
+  } else {
+    this.once('end', function() {
+      this.readable = false;
+    });
+    this.read(0);
+  }
+
+  if (!this.allowHalfOpen)
+    this.destroySoon();
+}
 
 exports.Socket = Socket;
 exports.Stream = Socket; // Legacy naming.
 
 
 Socket.prototype.listen = function() {
+  debug('socket.listen');
   var self = this;
   self.on('connection', arguments[0]);
   listen(self, null, null, null);
@@ -230,96 +315,62 @@ Object.defineProperty(Socket.prototype, 'readyState', {
 Object.defineProperty(Socket.prototype, 'bufferSize', {
   get: function() {
     if (this._handle) {
-      return this._handle.writeQueueSize + this._connectQueueSize;
+      return this._handle.writeQueueSize;
     }
   }
 });
 
 
-Socket.prototype.pause = function() {
-  this._paused = true;
-  if (this._handle && !this._connecting) {
-    this._handle.readStop();
+// Just call handle.readStart until we have enough in the buffer
+Socket.prototype._read = function(n, callback) {
+  debug('_read');
+  if (this._connecting || !this._handle) {
+    debug('_read wait for connection');
+    this.once('connect', this._read.bind(this, n, callback));
+    return;
   }
-};
 
+  assert(callback === this._readableState.onread);
+  assert(this._readableState.reading = true);
 
-Socket.prototype.resume = function() {
-  this._paused = false;
-  if (this._handle && !this._connecting) {
-    this._handle.readStart();
+  if (!this._handle.reading) {
+    debug('Socket._read readStart');
+    this._handle.reading = true;
+    var r = this._handle.readStart();
+    if (r)
+      this._destroy(errnoException(errno, 'read'));
+  } else {
+    debug('readStart already has been called.');
   }
 };
 
 
 Socket.prototype.end = function(data, encoding) {
-  if (this._connecting && ((this._flags & FLAG_SHUTDOWN_QUEUED) == 0)) {
-    // still connecting, add data to buffer
-    if (data) this.write(data, encoding);
-    this.writable = false;
-    this._flags |= FLAG_SHUTDOWN_QUEUED;
-  }
-
-  if (!this.writable) return;
+  stream.Duplex.prototype.end.call(this, data, encoding);
   this.writable = false;
-
-  if (data) this.write(data, encoding);
   DTRACE_NET_STREAM_END(this);
 
-  if (!this.readable) {
-    this.destroySoon();
-  } else {
-    this._flags |= FLAG_SHUTDOWN;
-    var shutdownReq = this._handle.shutdown();
-
-    if (!shutdownReq) {
-      this._destroy(errnoException(errno, 'shutdown'));
-      return false;
-    }
-
-    shutdownReq.oncomplete = afterShutdown;
-  }
-
-  return true;
+  // just in case we're waiting for an EOF.
+  if (!this._readableState.endEmitted)
+    this.read(0);
+  return;
 };
 
 
-function afterShutdown(status, handle, req) {
-  var self = handle.owner;
-
-  assert.ok(self._flags & FLAG_SHUTDOWN);
-  assert.ok(!self.writable);
-
-  // callback may come after call to destroy.
-  if (self.destroyed) {
-    return;
-  }
-
-  if (self._flags & FLAG_GOT_EOF || !self.readable) {
-    self._destroy();
-  } else {
-  }
-}
-
-
 Socket.prototype.destroySoon = function() {
-  this.writable = false;
-  this._flags |= FLAG_DESTROY_SOON;
-
-  if (this._pendingWriteReqs == 0) {
-    this._destroy();
-  }
-};
-
+  if (this.writable)
+    this.end();
 
-Socket.prototype._connectQueueCleanUp = function(exception) {
-  this._connecting = false;
-  this._connectQueueSize = 0;
-  this._connectQueue = null;
+  if (this._writableState.finishing || this._writableState.finished)
+    this.destroy();
+  else
+    this.once('finish', this.destroy);
 };
 
 
 Socket.prototype._destroy = function(exception, cb) {
+  debug('destroy');
+
   var self = this;
 
   function fireErrorCallbacks() {
@@ -333,13 +384,12 @@ Socket.prototype._destroy = function(exception, cb) {
   };
 
   if (this.destroyed) {
+    debug('already destroyed, fire error callbacks');
     fireErrorCallbacks();
     return;
   }
 
-  self._connectQueueCleanUp();
-
-  debug('destroy');
+  self._connecting = false;
 
   this.readable = this.writable = false;
 
@@ -347,6 +397,8 @@ Socket.prototype._destroy = function(exception, cb) {
 
   debug('close');
   if (this._handle) {
+    if (this !== process.stderr)
+      debug('close handle');
     this._handle.close();
     this._handle.onread = noop;
     this._handle = null;
@@ -355,6 +407,7 @@ Socket.prototype._destroy = function(exception, cb) {
   fireErrorCallbacks();
 
   process.nextTick(function() {
+    debug('emit close');
     self.emit('close', exception ? true : false);
   });
 
@@ -362,6 +415,7 @@ Socket.prototype._destroy = function(exception, cb) {
 
   if (this.server) {
     COUNTER_NET_SERVER_CONNECTION_CLOSE(this);
+    debug('has server');
     this.server._connections--;
     if (this.server._emitCloseIfDrained) {
       this.server._emitCloseIfDrained();
@@ -371,10 +425,13 @@ Socket.prototype._destroy = function(exception, cb) {
 
 
 Socket.prototype.destroy = function(exception) {
+  debug('destroy', exception);
   this._destroy(exception);
 };
 
 
+// This function is called whenever the handle gets a
+// buffer, or when there's an error reading.
 function onread(buffer, offset, length) {
   var handle = this;
   var self = handle.owner;
@@ -383,47 +440,56 @@ function onread(buffer, offset, length) {
   timers.active(self);
 
   var end = offset + length;
+  debug('onread', global.errno, offset, length, end);
 
   if (buffer) {
-    // Emit 'data' event.
+    debug('got data');
 
-    if (self._decoder) {
-      // Emit a string.
-      var string = self._decoder.write(buffer.slice(offset, end));
-      if (string.length) self.emit('data', string);
-    } else {
-      // Emit a slice. Attempt to avoid slicing the buffer if no one is
-      // listening for 'data'.
-      if (self._events && self._events['data']) {
-        self.emit('data', buffer.slice(offset, end));
-      }
+    // read success.
+    // In theory (and in practice) calling readStop right now
+    // will prevent this from being called again until _read() gets
+    // called again.
+
+    // if we didn't get any bytes, that doesn't necessarily mean EOF.
+    // wait for the next one.
+    if (offset === end) {
+      debug('not any data, keep waiting');
+      return;
     }
 
+    // if it's not enough data, we'll just call handle.readStart()
+    // again right away.
     self.bytesRead += length;
+    self._readableState.onread(null, buffer.slice(offset, end));
+
+    if (handle.reading && !self._readableState.reading) {
+      handle.reading = false;
+      debug('readStop');
+      var r = handle.readStop();
+      if (r)
+        self._destroy(errnoException(errno, 'read'));
+    }
 
     // Optimization: emit the original buffer with end points
     if (self.ondata) self.ondata(buffer, offset, end);
 
   } else if (errno == 'EOF') {
-    // EOF
-    self.readable = false;
+    debug('EOF');
 
-    assert.ok(!(self._flags & FLAG_GOT_EOF));
-    self._flags |= FLAG_GOT_EOF;
+    if (self._readableState.length === 0)
+      self.readable = false;
 
-    // We call destroy() before end(). 'close' not emitted until nextTick so
-    // the 'end' event will come first as required.
-    if (!self.writable) self._destroy();
+    if (self.onend) self.once('end', self.onend);
 
-    if (!self.allowHalfOpen) self.end();
-    if (self._decoder) {
-      var ret = self._decoder.end();
-      if (ret)
-        self.emit('data', ret);
-    }
-    if (self._events && self._events['end']) self.emit('end');
-    if (self.onend) self.onend();
+    // send a null to the _read cb to signal the end of data.
+    self._readableState.onread(null, null);
+
+    // internal end event so that we know that the actual socket
+    // is no longer readable, and we can start the shutdown
+    // procedure. No need to wait for all the data to be consumed.
+    self.emit('_socketEnd');
   } else {
+    debug('error', errno);
     // Error
     if (errno == 'ECONNRESET') {
       self._destroy();
@@ -434,12 +500,6 @@ function onread(buffer, offset, length) {
 }
 
 
-Socket.prototype.setEncoding = function(encoding) {
-  var StringDecoder = require('string_decoder').StringDecoder; // lazy load
-  this._decoder = new StringDecoder(encoding);
-};
-
-
 Socket.prototype._getpeername = function() {
   if (!this._handle || !this._handle.getpeername) {
     return {};
@@ -465,63 +525,39 @@ Socket.prototype.__defineGetter__('remotePort', function() {
 });
 
 
-/*
- * Arguments data, [encoding], [cb]
- */
-Socket.prototype.write = function(data, arg1, arg2) {
-  var encoding, cb;
+Socket.prototype.write = function(chunk, encoding, cb) {
+  if (typeof chunk !== 'string' && !Buffer.isBuffer(chunk))
+    throw new TypeError('invalid data');
+  return stream.Duplex.prototype.write.apply(this, arguments);
+};
 
-  // parse arguments
-  if (arg1) {
-    if (typeof arg1 === 'string') {
-      encoding = arg1;
-      cb = arg2;
-    } else if (typeof arg1 === 'function') {
-      cb = arg1;
-    } else {
-      throw new Error('bad arg');
-    }
-  }
 
-  if (typeof data === 'string') {
-    encoding = (encoding || 'utf8').toLowerCase();
-    switch (encoding) {
-      case 'utf8':
-      case 'utf-8':
-      case 'ascii':
-      case 'ucs2':
-      case 'ucs-2':
-      case 'utf16le':
-      case 'utf-16le':
-        // This encoding can be handled in the binding layer.
-        break;
+Socket.prototype._write = function(dataEncoding, cb) {
+  assert(Array.isArray(dataEncoding));
+  var data = dataEncoding[0];
+  var encoding = dataEncoding[1] || 'utf8';
 
-      default:
-        data = new Buffer(data, encoding);
-    }
-  } else if (!Buffer.isBuffer(data)) {
-    throw new TypeError('First argument must be a buffer or a string.');
-  }
+  if (this !== process.stderr && this !== process.stdout)
+    debug('Socket._write');
 
   // If we are still connecting, then buffer this for later.
+  // The Writable logic will buffer up any more writes while
+  // waiting for this one to be done.
   if (this._connecting) {
-    this._connectQueueSize += data.length;
-    if (this._connectQueue) {
-      this._connectQueue.push([data, encoding, cb]);
-    } else {
-      this._connectQueue = [[data, encoding, cb]];
-    }
-    return false;
+    debug('_write: waiting for connection');
+    this._pendingWrite = dataEncoding;
+    this.once('connect', function() {
+      debug('_write: connected now, try again');
+      this._write(dataEncoding, cb);
+    });
+    return;
   }
+  this._pendingWrite = null;
 
-  return this._write(data, encoding, cb);
-};
-
-
-Socket.prototype._write = function(data, encoding, cb) {
   timers.active(this);
 
   if (!this._handle) {
+    debug('already destroyed');
     this._destroy(new Error('This socket is closed.'), cb);
     return false;
   }
@@ -550,39 +586,32 @@ Socket.prototype._write = function(data, encoding, cb) {
         break;
 
       default:
-        assert(0);
+        writeReq = this._handle.writeBuffer(new Buffer(data, encoding));
+        break;
     }
   }
 
-  if (!writeReq || typeof writeReq !== 'object') {
-    this._destroy(errnoException(errno, 'write'), cb);
-    return false;
-  }
+  if (!writeReq || typeof writeReq !== 'object')
+    return this._destroy(errnoException(errno, 'write'), cb);
 
   writeReq.oncomplete = afterWrite;
   writeReq.cb = cb;
 
-  this._pendingWriteReqs++;
   this._bytesDispatched += writeReq.bytes;
-
-  return this._handle.writeQueueSize == 0;
 };
 
 
 Socket.prototype.__defineGetter__('bytesWritten', function() {
   var bytes = this._bytesDispatched,
-      connectQueue = this._connectQueue;
+      state = this._writableState,
+      pending = this._pendingWrite;
 
-  if (connectQueue) {
-    connectQueue.forEach(function(el) {
-      var data = el[0];
-      if (Buffer.isBuffer(data)) {
-        bytes += data.length;
-      } else {
-        bytes += Buffer.byteLength(data, el[1]);
-      }
-    }, this);
-  }
+  state.buffer.forEach(function(el) {
+    bytes += Buffer.byteLength(el[0], el[1]);
+  });
+
+  if (pending)
+    bytes += Buffer.byteLength(pending[0], pending[1]);
 
   return bytes;
 });
@@ -590,30 +619,28 @@ Socket.prototype.__defineGetter__('bytesWritten', function() {
 
 function afterWrite(status, handle, req) {
   var self = handle.owner;
+  var state = self._writableState;
+  if (self !== process.stderr && self !== process.stdout)
+    debug('afterWrite', status, req);
 
   // callback may come after call to destroy.
   if (self.destroyed) {
+    debug('afterWrite destroyed');
     return;
   }
 
   if (status) {
+    debug('write failure', errnoException(errno, 'write'));
     self._destroy(errnoException(errno, 'write'), req.cb);
     return;
   }
 
   timers.active(self);
 
-  self._pendingWriteReqs--;
-
-  if (self._pendingWriteReqs == 0) {
-    self.emit('drain');
-  }
+  if (self !== process.stderr && self !== process.stdout)
+    debug('afterWrite call cb');
 
-  if (req.cb) req.cb();
-
-  if (self._pendingWriteReqs == 0 && self._flags & FLAG_DESTROY_SOON) {
-    self._destroy();
-  }
+  req.cb.call(self);
 }
 
 
@@ -663,10 +690,21 @@ Socket.prototype.connect = function(options, cb) {
     return Socket.prototype.connect.apply(this, args);
   }
 
+  if (this.destroyed) {
+    this._readableState.reading = false;
+    this._readableState.ended = false;
+    this._writableState.ended = false;
+    this._writableState.ending = false;
+    this._writableState.finished = false;
+    this._writableState.finishing = false;
+    this.destroyed = false;
+    this._handle = null;
+  }
+
   var self = this;
   var pipe = !!options.path;
 
-  if (this.destroyed || !this._handle) {
+  if (!this._handle) {
     this._handle = pipe ? createPipe() : createTCP();
     initSocketHandle(this);
   }
@@ -755,28 +793,15 @@ function afterConnect(status, handle, req, readable, writable) {
     self.writable = writable;
     timers.active(self);
 
-    if (self.readable && !self._paused) {
-      handle.readStart();
-    }
-
-    if (self._connectQueue) {
-      debug('Drain the connect queue');
-      var connectQueue = self._connectQueue;
-      for (var i = 0; i < connectQueue.length; i++) {
-        self._write.apply(self, connectQueue[i]);
-      }
-      self._connectQueueCleanUp();
-    }
-
     self.emit('connect');
 
-    if (self._flags & FLAG_SHUTDOWN_QUEUED) {
-      // end called before connected - call end now with no data
-      self._flags &= ~FLAG_SHUTDOWN_QUEUED;
-      self.end();
-    }
+    // start the first read, or get an immediate EOF.
+    // this doesn't actually consume any bytes, because len=0.
+    if (readable)
+      self.read(0);
+
   } else {
-    self._connectQueueCleanUp();
+    self._connecting = false;
     self._destroy(errnoException(errno, 'connect'));
   }
 }
@@ -831,9 +856,9 @@ function Server(/* [ options, ] listener */) {
     configurable: true, enumerable: true
   });
 
-  this.allowHalfOpen = options.allowHalfOpen || false;
-
   this._handle = null;
+
+  this.allowHalfOpen = options.allowHalfOpen || false;
 }
 util.inherits(Server, events.EventEmitter);
 exports.Server = Server;
@@ -901,12 +926,14 @@ var createServerHandle = exports._createServerHandle =
 
 
 Server.prototype._listen2 = function(address, port, addressType, backlog, fd) {
+  debug('listen2', address, port, addressType, backlog);
   var self = this;
   var r = 0;
 
   // If there is not yet a handle, we need to create one and bind.
   // In the case of a server sent via IPC, we don't need to do this.
   if (!self._handle) {
+    debug('_listen2: create a handle');
     self._handle = createServerHandle(address, port, addressType, fd);
     if (!self._handle) {
       var error = errnoException(errno, 'listen');
@@ -915,6 +942,8 @@ Server.prototype._listen2 = function(address, port, addressType, backlog, fd) {
       });
       return;
     }
+  } else {
+    debug('_listen2: have a handle already');
   }
 
   self._handle.onconnection = onconnection;
@@ -1049,7 +1078,6 @@ function onconnection(clientHandle) {
   });
   socket.readable = socket.writable = true;
 
-  clientHandle.readStart();
 
   self._connections++;
   socket.server = self;
@@ -1086,11 +1114,17 @@ Server.prototype.close = function(cb) {
 };
 
 Server.prototype._emitCloseIfDrained = function() {
+  debug('SERVER _emitCloseIfDrained');
   var self = this;
 
-  if (self._handle || self._connections) return;
+  if (self._handle || self._connections) {
+    debug('SERVER handle? %j   connections? %d',
+          !!self._handle, self._connections);
+    return;
+  }
 
   process.nextTick(function() {
+    debug('SERVER: emit close');
     self.emit('close');
   });
 };