http: Refactor for streams2
authorisaacs <i@izs.me>
Thu, 13 Dec 2012 06:24:17 +0000 (22:24 -0800)
committerisaacs <i@izs.me>
Sat, 15 Dec 2012 01:46:23 +0000 (17:46 -0800)
Because of some of the peculiarities of http, this has a bit of special
magic to handle cases where the IncomingMessage would wait forever in a
paused state.

In the server, if you do not begin consuming the request body by the
time the response emits 'finish', then it will be flushed out.

In the client, if you do not add a 'response' handler onto the request,
then the response stream will be flushed out.

lib/http.js

index 62b2ffe..0555e69 100644 (file)
@@ -114,19 +114,30 @@ function parserOnHeadersComplete(info) {
   return skipBody;
 }
 
+// XXX This is a mess.
+// TODO: http.Parser should be a Writable emits request/response events.
 function parserOnBody(b, start, len) {
   var parser = this;
-  var slice = b.slice(start, start + len);
-  if (parser.incoming._paused || parser.incoming._pendings.length) {
-    parser.incoming._pendings.push(slice);
-  } else {
-    parser.incoming._emitData(slice);
+  var stream = parser.incoming;
+  var rs = stream._readableState;
+  var socket = stream.socket;
+
+  // pretend this was the result of a stream._read call.
+  if (len > 0) {
+    var slice = b.slice(start, start + len);
+    rs.onread(null, slice);
   }
+
+  if (rs.length >= rs.highWaterMark)
+    socket.pause();
 }
 
 function parserOnMessageComplete() {
   var parser = this;
-  parser.incoming.complete = true;
+  var stream = parser.incoming;
+  var socket = stream.socket;
+
+  stream.complete = true;
 
   // Emit any trailing headers.
   var headers = parser._headers;
@@ -140,19 +151,13 @@ function parserOnMessageComplete() {
     parser._url = '';
   }
 
-  if (!parser.incoming.upgrade) {
+  if (!stream.upgrade)
     // For upgraded connections, also emit this after parser.execute
-    if (parser.incoming._paused || parser.incoming._pendings.length) {
-      parser.incoming._pendings.push(END_OF_FILE);
-    } else {
-      parser.incoming.readable = false;
-      parser.incoming._emitEnd();
-    }
-  }
+    stream._readableState.onread(null, null);
 
   if (parser.socket.readable) {
     // force to read the next incoming message
-    parser.socket.resume();
+    socket.resume();
   }
 }
 
@@ -263,9 +268,13 @@ function utcDate() {
 
 /* Abstract base class for ServerRequest and ClientResponse. */
 function IncomingMessage(socket) {
-  Stream.call(this);
+  Stream.Readable.call(this);
+
+  // XXX This implementation is kind of all over the place
+  // When the parser emits body chunks, they go in this list.
+  // _read() pulls them out, and when it finds EOF, it ends.
+  this._pendings = [];
 
-  // TODO Remove one of these eventually.
   this.socket = socket;
   this.connection = socket;
 
@@ -276,77 +285,49 @@ function IncomingMessage(socket) {
 
   this.readable = true;
 
-  this._paused = false;
   this._pendings = [];
-
-  this._endEmitted = false;
+  this._pendingIndex = 0;
 
   // request (server) only
   this.url = '';
-
   this.method = null;
 
   // response (client) only
   this.statusCode = null;
   this.client = this.socket;
+
+  // flag for backwards compatibility grossness.
+  this._consuming = false;
 }
-util.inherits(IncomingMessage, Stream);
+util.inherits(IncomingMessage, Stream.Readable);
 
 
 exports.IncomingMessage = IncomingMessage;
 
 
-IncomingMessage.prototype.destroy = function(error) {
-  this.socket.destroy(error);
+IncomingMessage.prototype.read = function(n) {
+  this._consuming = true;
+  return Stream.Readable.prototype.read.call(this, n);
 };
 
 
-IncomingMessage.prototype.setEncoding = function(encoding) {
-  var StringDecoder = require('string_decoder').StringDecoder; // lazy load
-  this._decoder = new StringDecoder(encoding);
-};
-
-
-IncomingMessage.prototype.pause = function() {
-  this._paused = true;
-  this.socket.pause();
+IncomingMessage.prototype._read = function(n, callback) {
+  // We actually do almost nothing here, because the parserOnBody
+  // function fills up our internal buffer directly.  However, we
+  // do need to unpause the underlying socket so that it flows.
+  if (!this.socket.readable)
+    return callback(null, null);
+  else
+    this.socket.resume();
 };
 
 
-IncomingMessage.prototype.resume = function() {
-  this._paused = false;
-  if (this.socket) {
-    this.socket.resume();
-  }
-
-  this._emitPending();
+IncomingMessage.prototype.destroy = function(error) {
+  this.socket.destroy(error);
 };
 
 
-IncomingMessage.prototype._emitPending = function(callback) {
-  if (this._pendings.length) {
-    var self = this;
-    process.nextTick(function() {
-      while (!self._paused && self._pendings.length) {
-        var chunk = self._pendings.shift();
-        if (chunk !== END_OF_FILE) {
-          assert(Buffer.isBuffer(chunk));
-          self._emitData(chunk);
-        } else {
-          assert(self._pendings.length === 0);
-          self.readable = false;
-          self._emitEnd();
-        }
-      }
 
-      if (callback) {
-        callback();
-      }
-    });
-  } else if (callback) {
-    callback();
-  }
-};
 
 
 IncomingMessage.prototype._emitData = function(d) {
@@ -1016,7 +997,7 @@ ServerResponse.prototype.writeHead = function(statusCode) {
 
   // don't keep alive connections where the client expects 100 Continue
   // but we sent a final status; they may put extra bytes on the wire.
-  if (this._expect_continue && ! this._sent100) {
+  if (this._expect_continue && !this._sent100) {
     this.shouldKeepAlive = false;
   }
 
@@ -1321,11 +1302,10 @@ function socketCloseListener() {
     // Socket closed before we emitted 'end' below.
     req.res.emit('aborted');
     var res = req.res;
-    req.res._emitPending(function() {
-      res._emitEnd();
+    res.on('end', function() {
       res.emit('close');
-      res = null;
     });
+    res._readableState.onread(null, null);
   } else if (!req.res && !req._hadError) {
     // This socket error fired before we started to
     // receive a response. The error needs to
@@ -1428,11 +1408,13 @@ function socketOnData(d, start, end) {
 }
 
 
+// client
 function parserOnIncomingClient(res, shouldKeepAlive) {
   var parser = this;
   var socket = this.socket;
   var req = socket._httpMessage;
 
+
   // propogate "domain" setting...
   if (req.domain && !res.domain) {
     debug('setting "res.domain"');
@@ -1480,15 +1462,21 @@ function parserOnIncomingClient(res, shouldKeepAlive) {
 
   DTRACE_HTTP_CLIENT_RESPONSE(socket, req);
   COUNTER_HTTP_CLIENT_RESPONSE();
-  req.emit('response', res);
   req.res = res;
   res.req = req;
-
+  var handled = req.emit('response', res);
   res.on('end', responseOnEnd);
 
+  // If the user did not listen for the 'response' event, then they
+  // can't possibly read the data, so we .resume() it into the void
+  // so that the socket doesn't hang there in a paused state.
+  if (!handled)
+    res.resume();
+
   return isHeadResponse;
 }
 
+// client
 function responseOnEnd() {
   var res = this;
   var req = res.req;
@@ -1784,7 +1772,7 @@ function connectionListener(socket) {
     incoming.push(req);
 
     var res = new ServerResponse(req);
-    debug('server response shouldKeepAlive: ' + shouldKeepAlive);
+
     res.shouldKeepAlive = shouldKeepAlive;
     DTRACE_HTTP_SERVER_REQUEST(req, socket);
     COUNTER_HTTP_SERVER_REQUEST();
@@ -1806,6 +1794,12 @@ function connectionListener(socket) {
 
       incoming.shift();
 
+      // if the user never called req.read(), and didn't pipe() or
+      // .resume() or .on('data'), then we call req.resume() so that the
+      // bytes will be pulled off the wire.
+      if (!req._consuming)
+        req.resume();
+
       res.detachSocket(socket);
 
       if (res._last) {