From e576d4ec79ad983853d989819c78328aba0bb6f9 Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Thu, 20 Jan 2011 10:29:54 -0800 Subject: [PATCH] Add parser to agent --- lib/http.js | 100 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 96 insertions(+), 4 deletions(-) diff --git a/lib/http.js b/lib/http.js index 247e0dc..25fc814 100644 --- a/lib/http.js +++ b/lib/http.js @@ -319,6 +319,13 @@ OutgoingMessage.prototype.assignSocket = function(socket) { }; +OutgoingMessage.prototype.detachSocket = function(socket) { + assert(socket._httpMessage == this); + socket._httpMessage = null; + this.socket = this.connection = null; +}; + + OutgoingMessage.prototype.destroy = function(error) { this.socket.destroy(error); }; @@ -343,7 +350,9 @@ OutgoingMessage.prototype._send = function(data, encoding) { OutgoingMessage.prototype._writeRaw = function(data, encoding) { - if (this.connection._httpMessage === this && this.connection.writable) { + if (this.connection && + this.connection._httpMessage === this && + this.connection.writable) { // There might be pending data in the this.output buffer. while (this.output.length) { if (!this.connection.writable) { @@ -602,8 +611,6 @@ OutgoingMessage.prototype.end = function(data, encoding) { OutgoingMessage.prototype._finish = function() { - this.socket._httpMessage = null; - this.socket = this.connection = null; this.emit('finish'); }; @@ -868,6 +875,8 @@ function connectionListener(socket) { // When we're finished writing the response, check if this is the last // respose, if so destroy the socket. res.on('finish', function() { + res.detachSocket(socket); + if (res._last) { socket.destroySoon(); } else { @@ -915,37 +924,117 @@ Agent.prototype.appendMessage = function(options) { var req = new ClientRequest(options); this.queue.push(req); + /* req.on('finish', function () { self._cycle(); }); + */ this._cycle(); + + return req; }; -Agent.prototype._establishNewConnection = function(socket, message) { +Agent.prototype._establishNewConnection = function() { var self = this; assert(this.sockets.length < this.maxSockets); // Grab a new "socket". Depending on the implementation of _getConnection // this could either be a raw TCP socket or a TLS stream. var socket = this._getConnection(this.host, this.port, function () { + debug("Agent _getConnection callback"); self._cycle(); }); this.sockets.push(socket); + // Add a parser to the socket. + var parser = parsers.alloc(); + parser.reinitialize('response'); + parser.socket = socket; + + socket.ondata = function(d, start, end) { + var ret = parser.execute(d, start, end - start); + if (ret instanceof Error) { + debug('parse error'); + socket.destroy(ret); + } else if (parser.incoming && parser.incoming.upgrade) { + var bytesParsed = ret; + socket.ondata = null; + socket.onend = null; + + var res = parser.incoming; + + // This is start + byteParsed + 1 due to the error of getting \n + // in the upgradeHead from the closing lines of the headers + var upgradeHead = d.slice(start + bytesParsed + 1, end); + + if (self.listeners('upgrade').length) { + self.emit('upgrade', res, res.socket, upgradeHead); + } else { + // Got upgrade header, but have no handler. + socket.destroy(); + } + } + }; + + socket.onend = function() { + parser.finish(); + socket.destroy(); + }; + // When the socket closes remove it from the list of available sockets. socket.on('close', function() { var i = self.sockets.indexOf(socket); if (i >= 0) self.sockets.splice(i, 1); + // unref the parser for easy gc + parsers.free(parser); }); + + parser.onIncoming = function(res, shouldKeepAlive) { + debug('AGENT incoming response!'); + + var req = socket._httpMessage; + assert(req); + + // Responses to HEAD requests are AWFUL. Ask Ryan. + // A major oversight in HTTP. Hence this nastiness. + var isHeadResponse = req.method == 'HEAD'; + debug('AGENT isHeadResponse ' + isHeadResponse); + + if (res.statusCode == 100) { + // restart the parser, as this is a continue message. + req.emit('continue'); + return true; + } + + if (req.shouldKeepAlive && res.headers.connection === 'close') { + req.shouldKeepAlive = false; + } + + res.addListener('end', function() { + debug('AGENT request complete disconnecting.'); + // For the moment we reconnect for every request. FIXME! + // All that should be required for keep-alive is to not reconnect, + // but outgoingFlush instead. + if (!req.shouldKeepAlive) socket.end(); + + req.detachSocket(socket); + self._cycle(); + }); + + req.emit('response', res); + + return isHeadResponse; + }; }; // Sub-classes can overwrite this method with e.g. something that supplies // TLS streams. Agent.prototype._getConnection = function(host, port, cb) { + debug("Agent connected!"); var c = net.createConnection(port, host); c.on('connect', cb); return c; @@ -956,6 +1045,8 @@ Agent.prototype._getConnection = function(host, port, cb) { // waiting sockets. If a waiting socket cannot be found, it will // start the process of establishing one. Agent.prototype._cycle = function() { + debug("Agent _cycle"); + var first = this.queue[0]; if (!first) return; @@ -965,6 +1056,7 @@ Agent.prototype._cycle = function() { // If the socket doesn't already have a message it's sending out // and the socket is available for writing... if (!socket._httpMessage && (socket.writable && socket.readable)) { + debug("Agent found socket, shift"); // We found an available connection! this.queue.shift(); // remove first from queue. first.assignSocket(socket); -- 2.7.4