From 967b5dbb453f811060645c9bd2bcc8ca8fef0d96 Mon Sep 17 00:00:00 2001 From: isaacs Date: Thu, 25 Jul 2013 19:33:15 -0700 Subject: [PATCH] http: Use streams3 directly, not .ondata/end --- lib/_http_client.js | 18 ++++++++++---- lib/_http_common.js | 6 ++--- lib/_http_server.js | 43 ++++++++++++++++++++++----------- lib/_tls_legacy.js | 12 --------- lib/net.js | 8 +----- test/simple/test-http-upgrade-server.js | 6 ++--- 6 files changed, 48 insertions(+), 45 deletions(-) diff --git a/lib/_http_client.js b/lib/_http_client.js index f6a8cbf..20e123c 100644 --- a/lib/_http_client.js +++ b/lib/_http_client.js @@ -253,6 +253,8 @@ function socketOnData(d) { var req = this._httpMessage; var parser = this.parser; + assert(parser); + var ret = parser.execute(d); if (ret instanceof Error) { debug('parse error'); @@ -266,8 +268,8 @@ function socketOnData(d) { var res = parser.incoming; req.res = res; - socket.ondata = null; - socket.onend = null; + socket.removeListener('data', socketOnData); + socket.removeListener('end', socketOnEnd); parser.finish(); var bodyHead = d.slice(bytesParsed, d.length); @@ -281,6 +283,10 @@ function socketOnData(d) { socket.removeListener('close', socketCloseListener); socket.removeListener('error', socketErrorListener); + // TODO(isaacs): Need a way to reset a stream to fresh state + // IE, not flowing, and not explicitly paused. + socket._readableState.flowing = null; + req.emit(eventName, res, socket, bodyHead); req.emit('close'); } else { @@ -293,6 +299,8 @@ function socketOnData(d) { // send a final response after this client sends a request // body. So, we must not free the parser. parser.incoming.statusCode !== 100) { + socket.removeListener('data', socketOnData); + socket.removeListener('end', socketOnEnd); freeParser(parser, req); } } @@ -422,11 +430,11 @@ ClientRequest.prototype.onSocket = function(socket) { parser.maxHeaderPairs = 2000; } + parser.onIncoming = parserOnIncomingClient; socket.on('error', socketErrorListener); - socket.ondata = socketOnData; - socket.onend = socketOnEnd; + socket.on('data', socketOnData); + socket.on('end', socketOnEnd); socket.on('close', socketCloseListener); - parser.onIncoming = parserOnIncomingClient; req.emit('socket', socket); }); diff --git a/lib/_http_common.js b/lib/_http_common.js index 1ed8abc..4cf30ee 100644 --- a/lib/_http_common.js +++ b/lib/_http_common.js @@ -55,6 +55,7 @@ function parserOnHeaders(headers, url) { // info.url is not set for response parsers but that's not // applicable here since all our parsers are request parsers. function parserOnHeadersComplete(info) { + debug('parserOnHeadersComplete', info); var parser = this; var headers = info.headers; var url = info.url; @@ -200,11 +201,8 @@ function freeParser(parser, req) { if (parser) { parser._headers = []; parser.onIncoming = null; - if (parser.socket) { - parser.socket.onend = null; - parser.socket.ondata = null; + if (parser.socket) parser.socket.parser = null; - } parser.socket = null; parser.incoming = null; parsers.free(parser); diff --git a/lib/_http_server.js b/lib/_http_server.js index b4fc2c3..844fd32 100644 --- a/lib/_http_server.js +++ b/lib/_http_server.js @@ -339,11 +339,19 @@ function connectionListener(socket) { parser.maxHeaderPairs = 2000; } - socket.addListener('error', function(e) { + socket.addListener('error', socketOnError); + socket.addListener('close', serverSocketCloseListener); + parser.onIncoming = parserOnIncoming; + socket.on('end', socketOnEnd); + socket.on('data', socketOnData); + + // TODO(isaacs): Move all these functions out of here + function socketOnError(e) { self.emit('clientError', e, this); - }); + } - socket.ondata = function(d) { + function socketOnData(d) { + debug('SERVER socketOnData %d', d.length); var ret = parser.execute(d); if (ret instanceof Error) { debug('parse error'); @@ -352,26 +360,32 @@ function connectionListener(socket) { // Upgrade or CONNECT var bytesParsed = ret; var req = parser.incoming; + debug('SERVER upgrade or connect', req.method); - socket.ondata = null; - socket.onend = null; + socket.removeListener('data', socketOnData); + socket.removeListener('end', socketOnEnd); socket.removeListener('close', serverSocketCloseListener); parser.finish(); freeParser(parser, req); var eventName = req.method === 'CONNECT' ? 'connect' : 'upgrade'; if (EventEmitter.listenerCount(self, eventName) > 0) { + debug('SERVER have listener for %s', eventName); var bodyHead = d.slice(bytesParsed, d.length); - self.emit(eventName, req, req.socket, bodyHead); + // TODO(isaacs): Need a way to reset a stream to fresh state + // IE, not flowing, and not explicitly paused. + socket._readableState.flowing = null; + self.emit(eventName, req, socket, bodyHead); } else { // Got upgrade header or CONNECT method, but have no handler. socket.destroy(); } } - }; + } - socket.onend = function() { + function socketOnEnd() { + var socket = this; var ret = parser.finish(); if (ret instanceof Error) { @@ -390,14 +404,14 @@ function connectionListener(socket) { } else { if (socket.writable) socket.end(); } - }; + } - socket.addListener('close', serverSocketCloseListener); // The following callback is issued after the headers have been read on a // new message. In this callback we setup the response object and pass it // to the user. - parser.onIncoming = function(req, shouldKeepAlive) { + + function parserOnIncoming(req, shouldKeepAlive) { incoming.push(req); var res = new ServerResponse(req); @@ -415,7 +429,8 @@ function connectionListener(socket) { // When we're finished writing the response, check if this is the last // respose, if so destroy the socket. - res.on('finish', function() { + res.on('finish', resOnFinish); + function resOnFinish() { // Usually the first incoming element should be our request. it may // be that in the case abortIncoming() was called that the incoming // array will be empty. @@ -440,7 +455,7 @@ function connectionListener(socket) { m.assignSocket(socket); } } - }); + } if (!util.isUndefined(req.headers.expect) && (req.httpVersionMajor == 1 && req.httpVersionMinor == 1) && @@ -456,6 +471,6 @@ function connectionListener(socket) { self.emit('request', req, res); } return false; // Not a HEAD response. (Not even a response!) - }; + } } exports._connectionListener = connectionListener; diff --git a/lib/_tls_legacy.js b/lib/_tls_legacy.js index b5860a1..cb71920 100644 --- a/lib/_tls_legacy.js +++ b/lib/_tls_legacy.js @@ -125,8 +125,6 @@ function onCryptoStreamEnd() { } else { debug('encrypted.onend'); } - - if (this.onend) this.onend(); } @@ -306,16 +304,6 @@ CryptoStream.prototype._read = function read(size) { } } else { // Give them requested data - if (this.ondata) { - var self = this; - this.ondata(pool, start, start + bytesRead); - - // Consume data automatically - // simple/test-https-drain fails without it - process.nextTick(function() { - self.read(bytesRead); - }); - } this.push(pool.slice(start, start + bytesRead)); } diff --git a/lib/net.js b/lib/net.js index e39a79b..6748f0d 100644 --- a/lib/net.js +++ b/lib/net.js @@ -154,8 +154,6 @@ function Socket(options) { this.readable = this.writable = false; } - this.onend = null; - // shut down the socket when we're finished with it. this.on('finish', onSocketFinish); this.on('_socketEnd', onSocketEnd); @@ -507,9 +505,7 @@ function onread(nread, buffer) { self.bytesRead += nread; // Optimization: emit the original buffer with end points - var ret = true; - if (self.ondata) self.ondata(buffer); - else ret = self.push(buffer); + var ret = self.push(buffer); if (handle.reading && !ret) { handle.reading = false; @@ -540,8 +536,6 @@ function onread(nread, buffer) { maybeDestroy(self); } - if (self.onend) self.once('end', self.onend); - // push a null to signal the end of data. self.push(null); diff --git a/test/simple/test-http-upgrade-server.js b/test/simple/test-http-upgrade-server.js index 84525a8..b514eff 100644 --- a/test/simple/test-http-upgrade-server.js +++ b/test/simple/test-http-upgrade-server.js @@ -57,14 +57,14 @@ function testServer() { request_upgradeHead = upgradeHead; - socket.ondata = function(d, start, end) { - var data = d.toString('utf8', start, end); + socket.on('data', function(d) { + var data = d.toString('utf8'); if (data == 'kill') { socket.end(); } else { socket.write(data, 'utf8'); } - }; + }); }); } -- 2.7.4