Add parser to agent
authorRyan Dahl <ry@tinyclouds.org>
Thu, 20 Jan 2011 18:29:54 +0000 (10:29 -0800)
committerRyan Dahl <ry@tinyclouds.org>
Fri, 21 Jan 2011 02:10:15 +0000 (18:10 -0800)
lib/http.js

index 247e0dc..25fc814 100644 (file)
@@ -319,6 +319,13 @@ OutgoingMessage.prototype.assignSocket = function(socket) {
 };
 
 
+OutgoingMessage.prototype.detachSocket = function(socket) {
+  assert(socket._httpMessage == this);
+  socket._httpMessage = null;
+  this.socket = this.connection = null;
+};
+
+
 OutgoingMessage.prototype.destroy = function(error) {
   this.socket.destroy(error);
 };
@@ -343,7 +350,9 @@ OutgoingMessage.prototype._send = function(data, encoding) {
 
 
 OutgoingMessage.prototype._writeRaw = function(data, encoding) {
-  if (this.connection._httpMessage === this && this.connection.writable) {
+  if (this.connection &&
+      this.connection._httpMessage === this &&
+      this.connection.writable) {
     // There might be pending data in the this.output buffer.
     while (this.output.length) {
       if (!this.connection.writable) {
@@ -602,8 +611,6 @@ OutgoingMessage.prototype.end = function(data, encoding) {
 
 
 OutgoingMessage.prototype._finish = function() {
-  this.socket._httpMessage = null;
-  this.socket = this.connection = null;
   this.emit('finish');
 };
 
@@ -868,6 +875,8 @@ function connectionListener(socket) {
     // When we're finished writing the response, check if this is the last
     // respose, if so destroy the socket.
     res.on('finish', function() {
+      res.detachSocket(socket);
+
       if (res._last) {
         socket.destroySoon();
       } else {
@@ -915,37 +924,117 @@ Agent.prototype.appendMessage = function(options) {
   var req = new ClientRequest(options);
   this.queue.push(req);
 
+  /*
   req.on('finish', function () {
     self._cycle();
   });
+  */
 
   this._cycle();
+
+  return req;
 };
 
 
-Agent.prototype._establishNewConnection = function(socket, message) {
+Agent.prototype._establishNewConnection = function() {
   var self = this;
   assert(this.sockets.length < this.maxSockets);
 
   // Grab a new "socket". Depending on the implementation of _getConnection
   // this could either be a raw TCP socket or a TLS stream.
   var socket = this._getConnection(this.host, this.port, function () {
+    debug("Agent _getConnection callback");
     self._cycle();
   });
 
   this.sockets.push(socket);
 
+  // Add a parser to the socket.
+  var parser = parsers.alloc();
+  parser.reinitialize('response');
+  parser.socket = socket;
+
+  socket.ondata = function(d, start, end) {
+    var ret = parser.execute(d, start, end - start);
+    if (ret instanceof Error) {
+      debug('parse error');
+      socket.destroy(ret);
+    } else if (parser.incoming && parser.incoming.upgrade) {
+      var bytesParsed = ret;
+      socket.ondata = null;
+      socket.onend = null;
+
+      var res = parser.incoming;
+
+      // This is start + byteParsed + 1 due to the error of getting \n
+      // in the upgradeHead from the closing lines of the headers
+      var upgradeHead = d.slice(start + bytesParsed + 1, end);
+
+      if (self.listeners('upgrade').length) {
+        self.emit('upgrade', res, res.socket, upgradeHead);
+      } else {
+        // Got upgrade header, but have no handler.
+        socket.destroy();
+      }
+    }
+  };
+
+  socket.onend = function() {
+    parser.finish();
+    socket.destroy();
+  };
+
   // When the socket closes remove it from the list of available sockets.
   socket.on('close', function() {
     var i = self.sockets.indexOf(socket);
     if (i >= 0) self.sockets.splice(i, 1);
+    // unref the parser for easy gc
+    parsers.free(parser);
   });
+
+  parser.onIncoming = function(res, shouldKeepAlive) {
+    debug('AGENT incoming response!');
+
+    var req = socket._httpMessage;
+    assert(req);
+
+    // Responses to HEAD requests are AWFUL. Ask Ryan.
+    // A major oversight in HTTP. Hence this nastiness.
+    var isHeadResponse = req.method == 'HEAD';
+    debug('AGENT isHeadResponse ' + isHeadResponse);
+
+    if (res.statusCode == 100) {
+      // restart the parser, as this is a continue message.
+      req.emit('continue');
+      return true;
+    }
+
+    if (req.shouldKeepAlive && res.headers.connection === 'close') {
+      req.shouldKeepAlive = false;
+    }
+
+    res.addListener('end', function() {
+      debug('AGENT request complete disconnecting.');
+      // For the moment we reconnect for every request. FIXME!
+      // All that should be required for keep-alive is to not reconnect,
+      // but outgoingFlush instead.
+      if (!req.shouldKeepAlive) socket.end();
+
+      req.detachSocket(socket);
+      self._cycle();
+    });
+
+    req.emit('response', res);
+
+    return isHeadResponse;
+  };
 };
 
 
 // Sub-classes can overwrite this method with e.g. something that supplies
 // TLS streams.
 Agent.prototype._getConnection = function(host, port, cb) {
+  debug("Agent connected!");
   var c =  net.createConnection(port, host);
   c.on('connect', cb);
   return c;
@@ -956,6 +1045,8 @@ Agent.prototype._getConnection = function(host, port, cb) {
 // waiting sockets. If a waiting socket cannot be found, it will
 // start the process of establishing one.
 Agent.prototype._cycle = function() {
+  debug("Agent _cycle");
+
   var first = this.queue[0];
   if (!first) return;
 
@@ -965,6 +1056,7 @@ Agent.prototype._cycle = function() {
     // If the socket doesn't already have a message it's sending out
     // and the socket is available for writing...
     if (!socket._httpMessage && (socket.writable && socket.readable)) {
+      debug("Agent found socket, shift");
       // We found an available connection!
       this.queue.shift(); // remove first from queue.
       first.assignSocket(socket);