http: Use streams3 directly, not .ondata/end
authorisaacs <i@izs.me>
Fri, 26 Jul 2013 02:33:15 +0000 (19:33 -0700)
committerisaacs <i@izs.me>
Thu, 8 Aug 2013 20:01:09 +0000 (13:01 -0700)
lib/_http_client.js
lib/_http_common.js
lib/_http_server.js
lib/_tls_legacy.js
lib/net.js
test/simple/test-http-upgrade-server.js

index f6a8cbf..20e123c 100644 (file)
@@ -253,6 +253,8 @@ function socketOnData(d) {
   var req = this._httpMessage;
   var parser = this.parser;
 
+  assert(parser);
+
   var ret = parser.execute(d);
   if (ret instanceof Error) {
     debug('parse error');
@@ -266,8 +268,8 @@ function socketOnData(d) {
     var res = parser.incoming;
     req.res = res;
 
-    socket.ondata = null;
-    socket.onend = null;
+    socket.removeListener('data', socketOnData);
+    socket.removeListener('end', socketOnEnd);
     parser.finish();
 
     var bodyHead = d.slice(bytesParsed, d.length);
@@ -281,6 +283,10 @@ function socketOnData(d) {
       socket.removeListener('close', socketCloseListener);
       socket.removeListener('error', socketErrorListener);
 
+      // TODO(isaacs): Need a way to reset a stream to fresh state
+      // IE, not flowing, and not explicitly paused.
+      socket._readableState.flowing = null;
+
       req.emit(eventName, res, socket, bodyHead);
       req.emit('close');
     } else {
@@ -293,6 +299,8 @@ function socketOnData(d) {
              // send a final response after this client sends a request
              // body. So, we must not free the parser.
              parser.incoming.statusCode !== 100) {
+    socket.removeListener('data', socketOnData);
+    socket.removeListener('end', socketOnEnd);
     freeParser(parser, req);
   }
 }
@@ -422,11 +430,11 @@ ClientRequest.prototype.onSocket = function(socket) {
       parser.maxHeaderPairs = 2000;
     }
 
+    parser.onIncoming = parserOnIncomingClient;
     socket.on('error', socketErrorListener);
-    socket.ondata = socketOnData;
-    socket.onend = socketOnEnd;
+    socket.on('data', socketOnData);
+    socket.on('end', socketOnEnd);
     socket.on('close', socketCloseListener);
-    parser.onIncoming = parserOnIncomingClient;
     req.emit('socket', socket);
   });
 
index 1ed8abc..4cf30ee 100644 (file)
@@ -55,6 +55,7 @@ function parserOnHeaders(headers, url) {
 // info.url is not set for response parsers but that's not
 // applicable here since all our parsers are request parsers.
 function parserOnHeadersComplete(info) {
+  debug('parserOnHeadersComplete', info);
   var parser = this;
   var headers = info.headers;
   var url = info.url;
@@ -200,11 +201,8 @@ function freeParser(parser, req) {
   if (parser) {
     parser._headers = [];
     parser.onIncoming = null;
-    if (parser.socket) {
-      parser.socket.onend = null;
-      parser.socket.ondata = null;
+    if (parser.socket)
       parser.socket.parser = null;
-    }
     parser.socket = null;
     parser.incoming = null;
     parsers.free(parser);
index b4fc2c3..844fd32 100644 (file)
@@ -339,11 +339,19 @@ function connectionListener(socket) {
     parser.maxHeaderPairs = 2000;
   }
 
-  socket.addListener('error', function(e) {
+  socket.addListener('error', socketOnError);
+  socket.addListener('close', serverSocketCloseListener);
+  parser.onIncoming = parserOnIncoming;
+  socket.on('end', socketOnEnd);
+  socket.on('data', socketOnData);
+
+  // TODO(isaacs): Move all these functions out of here
+  function socketOnError(e) {
     self.emit('clientError', e, this);
-  });
+  }
 
-  socket.ondata = function(d) {
+  function socketOnData(d) {
+    debug('SERVER socketOnData %d', d.length);
     var ret = parser.execute(d);
     if (ret instanceof Error) {
       debug('parse error');
@@ -352,26 +360,32 @@ function connectionListener(socket) {
       // Upgrade or CONNECT
       var bytesParsed = ret;
       var req = parser.incoming;
+      debug('SERVER upgrade or connect', req.method);
 
-      socket.ondata = null;
-      socket.onend = null;
+      socket.removeListener('data', socketOnData);
+      socket.removeListener('end', socketOnEnd);
       socket.removeListener('close', serverSocketCloseListener);
       parser.finish();
       freeParser(parser, req);
 
       var eventName = req.method === 'CONNECT' ? 'connect' : 'upgrade';
       if (EventEmitter.listenerCount(self, eventName) > 0) {
+        debug('SERVER have listener for %s', eventName);
         var bodyHead = d.slice(bytesParsed, d.length);
 
-        self.emit(eventName, req, req.socket, bodyHead);
+        // TODO(isaacs): Need a way to reset a stream to fresh state
+        // IE, not flowing, and not explicitly paused.
+        socket._readableState.flowing = null;
+        self.emit(eventName, req, socket, bodyHead);
       } else {
         // Got upgrade header or CONNECT method, but have no handler.
         socket.destroy();
       }
     }
-  };
+  }
 
-  socket.onend = function() {
+  function socketOnEnd() {
+    var socket = this;
     var ret = parser.finish();
 
     if (ret instanceof Error) {
@@ -390,14 +404,14 @@ function connectionListener(socket) {
     } else {
       if (socket.writable) socket.end();
     }
-  };
+  }
 
-  socket.addListener('close', serverSocketCloseListener);
 
   // The following callback is issued after the headers have been read on a
   // new message. In this callback we setup the response object and pass it
   // to the user.
-  parser.onIncoming = function(req, shouldKeepAlive) {
+
+  function parserOnIncoming(req, shouldKeepAlive) {
     incoming.push(req);
 
     var res = new ServerResponse(req);
@@ -415,7 +429,8 @@ function connectionListener(socket) {
 
     // When we're finished writing the response, check if this is the last
     // respose, if so destroy the socket.
-    res.on('finish', function() {
+    res.on('finish', resOnFinish);
+    function resOnFinish() {
       // Usually the first incoming element should be our request.  it may
       // be that in the case abortIncoming() was called that the incoming
       // array will be empty.
@@ -440,7 +455,7 @@ function connectionListener(socket) {
           m.assignSocket(socket);
         }
       }
-    });
+    }
 
     if (!util.isUndefined(req.headers.expect) &&
         (req.httpVersionMajor == 1 && req.httpVersionMinor == 1) &&
@@ -456,6 +471,6 @@ function connectionListener(socket) {
       self.emit('request', req, res);
     }
     return false; // Not a HEAD response. (Not even a response!)
-  };
+  }
 }
 exports._connectionListener = connectionListener;
index b5860a1..cb71920 100644 (file)
@@ -125,8 +125,6 @@ function onCryptoStreamEnd() {
   } else {
     debug('encrypted.onend');
   }
-
-  if (this.onend) this.onend();
 }
 
 
@@ -306,16 +304,6 @@ CryptoStream.prototype._read = function read(size) {
     }
   } else {
     // Give them requested data
-    if (this.ondata) {
-      var self = this;
-      this.ondata(pool, start, start + bytesRead);
-
-      // Consume data automatically
-      // simple/test-https-drain fails without it
-      process.nextTick(function() {
-        self.read(bytesRead);
-      });
-    }
     this.push(pool.slice(start, start + bytesRead));
   }
 
index e39a79b..6748f0d 100644 (file)
@@ -154,8 +154,6 @@ function Socket(options) {
     this.readable = this.writable = false;
   }
 
-  this.onend = null;
-
   // shut down the socket when we're finished with it.
   this.on('finish', onSocketFinish);
   this.on('_socketEnd', onSocketEnd);
@@ -507,9 +505,7 @@ function onread(nread, buffer) {
     self.bytesRead += nread;
 
     // Optimization: emit the original buffer with end points
-    var ret = true;
-    if (self.ondata) self.ondata(buffer);
-    else ret = self.push(buffer);
+    var ret = self.push(buffer);
 
     if (handle.reading && !ret) {
       handle.reading = false;
@@ -540,8 +536,6 @@ function onread(nread, buffer) {
     maybeDestroy(self);
   }
 
-  if (self.onend) self.once('end', self.onend);
-
   // push a null to signal the end of data.
   self.push(null);
 
index 84525a8..b514eff 100644 (file)
@@ -57,14 +57,14 @@ function testServer() {
 
     request_upgradeHead = upgradeHead;
 
-    socket.ondata = function(d, start, end) {
-      var data = d.toString('utf8', start, end);
+    socket.on('data', function(d) {
+      var data = d.toString('utf8');
       if (data == 'kill') {
         socket.end();
       } else {
         socket.write(data, 'utf8');
       }
-    };
+    });
   });
 }