first pass at http refactor for TLS
authorRyan Dahl <ry@tinyclouds.org>
Thu, 20 Jan 2011 10:41:16 +0000 (02:41 -0800)
committerRyan Dahl <ry@tinyclouds.org>
Fri, 21 Jan 2011 02:10:15 +0000 (18:10 -0800)
lib/http.js

index 960d28a..247e0dc 100644 (file)
@@ -1,8 +1,10 @@
 var util = require('util');
 var net = require('net');
 var stream = require('stream');
+var EventEmitter = require('events').EventEmitter;
 var FreeList = require('freelist').FreeList;
 var HTTPParser = process.binding('http_parser').HTTPParser;
+var assert = process.assert;
 
 
 var debug;
@@ -284,13 +286,9 @@ IncomingMessage.prototype._addHeaderLine = function(field, value) {
 };
 
 
-function OutgoingMessage(socket) {
+function OutgoingMessage() {
   stream.Stream.call(this);
 
-  // TODO Remove one of these eventually.
-  this.socket = socket;
-  this.connection = socket;
-
   this.output = [];
   this.outputEncodings = [];
 
@@ -312,6 +310,15 @@ util.inherits(OutgoingMessage, stream.Stream);
 exports.OutgoingMessage = OutgoingMessage;
 
 
+OutgoingMessage.prototype.assignSocket = function(socket) {
+  assert(!socket._httpMessage);
+  socket._httpMessage = this;
+  this.socket = socket;
+  this.connection = socket;
+  this._flush();
+};
+
+
 OutgoingMessage.prototype.destroy = function(error) {
   this.socket.destroy(error);
 };
@@ -336,7 +343,7 @@ OutgoingMessage.prototype._send = function(data, encoding) {
 
 
 OutgoingMessage.prototype._writeRaw = function(data, encoding) {
-  if (this.connection._outgoing[0] === this && this.connection.writable) {
+  if (this.connection._httpMessage === this && this.connection.writable) {
     // There might be pending data in the this.output buffer.
     while (this.output.length) {
       if (!this.connection.writable) {
@@ -550,7 +557,7 @@ OutgoingMessage.prototype.end = function(data, encoding) {
             data.length > 0 &&
             this.output.length === 0 &&
             this.connection.writable &&
-            this.connection._outgoing[0] === this;
+            this.connection._httpMessage === this;
 
   if (hot) {
     // Hot path. They're doing
@@ -585,17 +592,69 @@ OutgoingMessage.prototype.end = function(data, encoding) {
 
   // There is the first message on the outgoing queue, and we've sent
   // everything to the socket.
-  if (this.output.length === 0 && this.connection._outgoing[0] === this) {
-    debug('outgoing message end. shifting because was flushed');
-    this.connection._onOutgoingSent();
+  if (this.output.length === 0 && this.connection._httpMessage === this) {
+    debug('outgoing message end.');
+    this._finish();
   }
 
   return ret;
 };
 
 
+OutgoingMessage.prototype._finish = function() {
+  this.socket._httpMessage = null;
+  this.socket = this.connection = null;
+  this.emit('finish');
+};
+
+
+OutgoingMessage.prototype._flush = function() {
+  // This logic is probably a bit confusing. Let me explain a bit:
+  //
+  // In both HTTP servers and clients it is possible to queue up several
+  // outgoing messages. This is easiest to imagine in the case of a client.
+  // Take the following situation:
+  //
+  //    req1 = client.request('GET', '/');
+  //    req2 = client.request('POST', '/');
+  //
+  // When the user does
+  //
+  //   req2.write('hello world\n');
+  //
+  // it's possible that the first request has not been completely flushed to
+  // the socket yet. Thus the outgoing messages need to be prepared to queue
+  // up data internally before sending it on further to the socket's queue.
+  //
+  // This function, outgoingFlush(), is called by both the Server and Client
+  // to attempt to flush any pending messages out to the socket.
+
+  if (!this.socket) return;
+
+  var ret;
+
+  while (this.output.length) {
+    if (!this.socket.writable) return; // XXX Necessary?
+
+    var data = this.output.shift();
+    var encoding = this.outputEncodings.shift();
+
+    ret = this.socket.write(data, encoding);
+  }
+
+  if (this.finished) {
+    // This is a queue to the server or client to bring in the next this.
+    this._finish();
+  } else if (ret) {
+    this.emit('drain');
+  }
+};
+
+
+
+
 function ServerResponse(req) {
-  OutgoingMessage.call(this, req.socket);
+  OutgoingMessage.call(this);
 
   if (req.method === 'HEAD') this._hasBody = false;
 
@@ -666,19 +725,30 @@ ServerResponse.prototype.writeHeader = function() {
 };
 
 
-function ClientRequest(socket, method, url, headers) {
-  OutgoingMessage.call(this, socket);
+function ClientRequest(options) {
+  OutgoingMessage.call(this);
+
+  var method = this.method = (options.method || 'GET').toUpperCase();
+  var path = options.path || '/';
+  var headers = options.headers;
+
+  // Host header set by default.
+  if (options.host && !(headers.host || headers.Host || headers.HOST)) {
+    headers.Host = options.host;
+  }
 
-  this.method = method = method.toUpperCase();
   this.shouldKeepAlive = false;
   if (method === 'GET' || method === 'HEAD') {
     this.useChunkedEncodingByDefault = false;
   } else {
     this.useChunkedEncodingByDefault = true;
   }
+
+  // By default keep-alive is off. This is the last message unless otherwise
+  // specified.
   this._last = true;
 
-  this._storeHeader(method + ' ' + url + ' HTTP/1.1\r\n', headers);
+  this._storeHeader(method + ' ' + path + ' HTTP/1.1\r\n', headers);
 }
 util.inherits(ClientRequest, OutgoingMessage);
 
@@ -686,58 +756,12 @@ util.inherits(ClientRequest, OutgoingMessage);
 exports.ClientRequest = ClientRequest;
 
 
-function outgoingFlush(socket) {
-  // This logic is probably a bit confusing. Let me explain a bit:
-  //
-  // In both HTTP servers and clients it is possible to queue up several
-  // outgoing messages. This is easiest to imagine in the case of a client.
-  // Take the following situation:
-  //
-  //    req1 = client.request('GET', '/');
-  //    req2 = client.request('POST', '/');
-  //
-  // When the user does
-  //
-  //   req2.write('hello world\n');
-  //
-  // it's possible that the first request has not been completely flushed to
-  // the socket yet. Thus the outgoing messages need to be prepared to queue
-  // up data internally before sending it on further to the socket's queue.
-  //
-  // This function, outgoingFlush(), is called by both the Server and Client
-  // to attempt to flush any pending messages out to the socket.
-  var message = socket._outgoing[0];
-
-  if (!message) return;
-
-  var ret;
-
-  while (message.output.length) {
-    if (!socket.writable) return; // XXX Necessary?
-
-    var data = message.output.shift();
-    var encoding = message.outputEncodings.shift();
-
-    ret = socket.write(data, encoding);
-  }
-
-  if (message.finished) {
-    socket._onOutgoingSent();
-  } else if (ret) {
-    message.emit('drain');
-  }
-}
-
-
 function httpSocketSetup(socket) {
-  // An array of outgoing messages for the socket. In pipelined connections
-  // we need to keep track of the order they were sent.
-  socket._outgoing = [];
-
   // NOTE: be sure not to use ondrain elsewhere in this file!
   socket.ondrain = function() {
-    var message = socket._outgoing[0];
-    if (message) message.emit('drain');
+    if (socket._httpMessage) {
+      socket._httpMessage.emit('drain');
+    }
   };
 }
 
@@ -765,6 +789,7 @@ exports.createServer = function(requestListener) {
 
 function connectionListener(socket) {
   var self = this;
+  var outgoing = [];
 
   debug('SERVER new http connection');
 
@@ -811,9 +836,10 @@ function connectionListener(socket) {
   socket.onend = function() {
     parser.finish();
 
-    if (socket._outgoing.length) {
-      socket._outgoing[socket._outgoing.length - 1]._last = true;
-      outgoingFlush(socket);
+    if (outgoing.length) {
+      outgoing[outgoing.length - 1]._last = true;
+    } else if (socket._httpMessage) {
+      socket._httpMessage._last = true;
     } else {
       socket.end();
     }
@@ -824,21 +850,6 @@ function connectionListener(socket) {
     parsers.free(parser);
   });
 
-  // At the end of each response message, after it has been flushed to the
-  // socket.  Here we insert logic about what to do next.
-  socket._onOutgoingSent = function(message) {
-    var message = socket._outgoing.shift();
-    if (message._last) {
-      // No more messages to be pushed out.
-
-      socket.destroySoon();
-
-    } else if (socket._outgoing.length) {
-      // Push out the next message.
-      outgoingFlush(socket);
-    }
-  };
-
   // The following callback is issued after the headers have been read on a
   // new message. In this callback we setup the response object and pass it
   // to the user.
@@ -846,7 +857,27 @@ function connectionListener(socket) {
     var res = new ServerResponse(req);
     debug('server response shouldKeepAlive: ' + shouldKeepAlive);
     res.shouldKeepAlive = shouldKeepAlive;
-    socket._outgoing.push(res);
+
+    if (socket._httpMessage) {
+      // There are already pending outgoing res, append.
+      outgoing.push(res);
+    } else {
+      res.assignSocket(socket);
+    }
+
+    // When we're finished writing the response, check if this is the last
+    // respose, if so destroy the socket.
+    res.on('finish', function() {
+      if (res._last) {
+        socket.destroySoon();
+      } else {
+        // start sending the next message
+        var m = outgoing.shift();
+        if (m) {
+          m.assignSocket(socket);
+        }
+      }
+    });
 
     if ('expect' in req.headers &&
         (req.httpVersionMajor == 1 && req.httpVersionMinor == 1) &&
@@ -867,280 +898,123 @@ function connectionListener(socket) {
 exports._connectionListener = connectionListener;
 
 
-function Client() {
-  if (!(this instanceof Client)) return new Client();
-  net.Stream.call(this, { allowHalfOpen: true });
-  var self = this;
-
-  // Possible states:
-  // - disconnected
-  // - connecting
-  // - connected
-  this._state = 'disconnected';
-
-  httpSocketSetup(self);
-
-  function onData(d, start, end) {
-    if (!self.parser) {
-      throw new Error('parser not initialized prior to Client.ondata call');
-    }
-    var ret = self.parser.execute(d, start, end - start);
-    if (ret instanceof Error) {
-      self.destroy(ret);
-    } else if (self.parser.incoming && self.parser.incoming.upgrade) {
-      var bytesParsed = ret;
-      self.ondata = null;
-      self.onend = null;
-
-      var req = self.parser.incoming;
+function Agent(host, port) {
+  this.host = host;
+  this.port = port;
 
-      var upgradeHead = d.slice(start + bytesParsed + 1, end);
-
-      if (self.listeners('upgrade').length) {
-        self.emit('upgrade', req, self, upgradeHead);
-      } else {
-        self.destroy();
-      }
-    }
-  };
+  this.queue = [];
+  this.sockets = [];
+  this.maxSockets = 5;
+}
+util.inherits(Agent, EventEmitter);
 
-  self.addListener('connect', function() {
-    debug('CLIENT connected');
 
-    self.ondata = onData;
-    self.onend = onEnd;
+Agent.prototype.appendMessage = function(options) {
+  var self = this;
 
-    self._state = 'connected';
+  var req = new ClientRequest(options);
+  this.queue.push(req);
 
-    self._initParser();
-    outgoingFlush(self);
+  req.on('finish', function () {
+    self._cycle();
   });
 
-  function onEnd() {
-    if (self.parser) self.parser.finish();
-    debug('CLIENT got end closing. state = ' + self._state);
-    self.end();
-  };
-
-  self.addListener('close', function(e) {
-    self._state = 'disconnected';
-    if (e) return;
+  this._cycle();
+};
 
-    debug('CLIENT onClose. state = ' + self._state);
 
-    // finally done with the request
-    self._outgoing.shift();
+Agent.prototype._establishNewConnection = function(socket, message) {
+  var self = this;
+  assert(this.sockets.length < this.maxSockets);
 
-    // If there are more requests to handle, reconnect.
-    if (self._outgoing.length) {
-      self._ensureConnection();
-    } else if (self.parser) {
-      parsers.free(self.parser);
-      self.parser = null;
-    }
+  // Grab a new "socket". Depending on the implementation of _getConnection
+  // this could either be a raw TCP socket or a TLS stream.
+  var socket = this._getConnection(this.host, this.port, function () {
+    self._cycle();
   });
-}
-util.inherits(Client, net.Stream);
 
+  this.sockets.push(socket);
 
-exports.Client = Client;
+  // When the socket closes remove it from the list of available sockets.
+  socket.on('close', function() {
+    var i = self.sockets.indexOf(socket);
+    if (i >= 0) self.sockets.splice(i, 1);
+  });
+};
 
 
-exports.createClient = function(port, host, https, credentials) {
-  var c = new Client();
-  c.port = port;
-  c.host = host;
-  c.https = https;
-  c.credentials = credentials;
+// Sub-classes can overwrite this method with e.g. something that supplies
+// TLS streams.
+Agent.prototype._getConnection = function(host, port, cb) {
+  var c =  net.createConnection(port, host);
+  c.on('connect', cb);
   return c;
 };
 
 
-Client.prototype._initParser = function() {
-  var self = this;
-  if (!self.parser) self.parser = parsers.alloc();
-  self.parser.reinitialize('response');
-  self.parser.socket = self;
-  self.parser.onIncoming = function(res) {
-    debug('CLIENT incoming response!');
-
-    var req = self._outgoing[0];
-
-    // Responses to HEAD requests are AWFUL. Ask Ryan.
-    // A major oversight in HTTP. Hence this nastiness.
-    var isHeadResponse = req.method == 'HEAD';
-    debug('CLIENT isHeadResponse ' + isHeadResponse);
-
-    if (res.statusCode == 100) {
-      // restart the parser, as this is a continue message.
-      req.emit('continue');
-      return true;
-    }
-
-    if (req.shouldKeepAlive && res.headers.connection === 'close') {
-      req.shouldKeepAlive = false;
+// This method attempts to shuffle items along the queue into one of the
+// waiting sockets. If a waiting socket cannot be found, it will
+// start the process of establishing one.
+Agent.prototype._cycle = function() {
+  var first = this.queue[0];
+  if (!first) return;
+
+  // First try to find an available socket.
+  for (var i = 0; i < this.sockets.length; i++) {
+    var socket = this.sockets[i];
+    // If the socket doesn't already have a message it's sending out
+    // and the socket is available for writing...
+    if (!socket._httpMessage && (socket.writable && socket.readable)) {
+      // We found an available connection!
+      this.queue.shift(); // remove first from queue.
+      first.assignSocket(socket);
+      return;
     }
-
-    res.addListener('end', function() {
-      debug('CLIENT request complete disconnecting. state = ' + self._state);
-      // For the moment we reconnect for every request. FIXME!
-      // All that should be required for keep-alive is to not reconnect,
-      // but outgoingFlush instead.
-      if (req.shouldKeepAlive) {
-        outgoingFlush(self);
-        self._outgoing.shift();
-        outgoingFlush(self);
-      } else {
-        self.end();
-      }
-    });
-
-    req.emit('response', res);
-
-    return isHeadResponse;
-  };
-};
-
-
-// This is called each time a request has been pushed completely to the
-// socket. The message that was sent is still sitting at client._outgoing[0]
-// it is our responsibility to shift it off.
-//
-// We have to be careful when it we shift it because once we do any writes
-// to other requests will be flushed directly to the socket.
-//
-// At the moment we're implement a client which connects and disconnects on
-// each request/response cycle so we cannot shift off the request from
-// client._outgoing until we're completely disconnected after the response
-// comes back.
-Client.prototype._onOutgoingSent = function(message) {
-  // We've just finished a message. We don't end/shutdown the connection here
-  // because HTTP servers typically cannot handle half-closed connections
-  // (Node servers can).
-  //
-  // Instead, we just check if the connection is closed, and if so
-  // reconnect if we have pending messages.
-  if (this._outgoing.length) {
-    debug('CLIENT request flush. ensure connection.  state = ' + this._state);
-    this._ensureConnection();
   }
-};
 
-
-Client.prototype._ensureConnection = function() {
-  if (this._state == 'disconnected') {
-    debug('CLIENT reconnecting state = ' + this._state);
-    this.connect(this.port, this.host);
-    this._state = 'connecting';
+  // Otherwise see if we should be starting a new connection to handle
+  // this.
+  if (this.sockets.length < this.maxSockets) {
+    this._establishNewConnection();
   }
-};
 
-
-Client.prototype.request = function(method, url, headers) {
-  if (typeof(url) != 'string') {
-    // assume method was omitted, shift arguments
-    headers = url;
-    url = method;
-    method = 'GET';
-  }
-  var req = new ClientRequest(this, method, url, headers);
-  this._outgoing.push(req);
-  this._ensureConnection();
-  return req;
+  // All sockets are filled and all sockets are busy.
 };
 
 
-exports.cat = function(url, encoding_, headers_) {
-  var encoding = 'utf8',
-      headers = {},
-      callback = null;
+// process-wide hash of agents.
+// keys: "host:port" string
+// values: instance of Agent
+// That is, one agent remote host.
+// TODO currently we never remove agents from this hash. This is a small
+// memory leak. Have a 2 second timeout after a agent's sockets are to try
+// to remove it?
+var agents = {}
 
-  // parse the arguments for the various options... very ugly
-  if (typeof(arguments[1]) == 'string') {
-    encoding = arguments[1];
-    if (typeof(arguments[2]) == 'object') {
-      headers = arguments[2];
-      if (typeof(arguments[3]) == 'function') callback = arguments[3];
-    } else {
-      if (typeof(arguments[2]) == 'function') callback = arguments[2];
-    }
-  } else {
-    // didn't specify encoding
-    if (typeof(arguments[1]) == 'object') {
-      headers = arguments[1];
-      callback = arguments[2];
-    } else {
-      callback = arguments[1];
-    }
-  }
 
-  var url = require('url').parse(url);
+function getAgent(host, port) {
+  var id = host + ':' + port;
+  var agent = agents[id];
 
-  var hasHost = false;
-  if (Array.isArray(headers)) {
-    for (var i = 0, l = headers.length; i < l; i++) {
-      if (headers[i][0].toLowerCase() === 'host') {
-        hasHost = true;
-        break;
-      }
-    }
-  } else if (typeof headers === 'Object') {
-    var keys = Object.keys(headers);
-    for (var i = 0, l = keys.length; i < l; i++) {
-      var key = keys[i];
-      if (key.toLowerCase() == 'host') {
-        hasHost = true;
-        break;
-      }
-    }
+  if (!agent) {
+    agent = agents[id] = new Agent(host, port);
   }
-  if (!hasHost) headers['Host'] = url.hostname;
-
-  var content = '';
 
-  var client = exports.createClient(url.port || 80, url.hostname);
-  var req = client.request((url.pathname || '/') +
-                           (url.search || '') +
-                           (url.hash || ''),
-                           headers);
+  return agent;
+}
 
-  if (url.protocol == 'https:') {
-    client.https = true;
-  }
 
-  var callbackSent = false;
+exports.request = function(options, cb) {
+  var agent = getAgent(options.host, options.port);
+  var req = agent.appendMessage(options);
 
-  req.addListener('response', function(res) {
-    if (res.statusCode < 200 || res.statusCode >= 300) {
-      if (callback && !callbackSent) {
-        callback(res.statusCode);
-        callbackSent = true;
-      }
-      client.end();
-      return;
-    }
-    res.setEncoding(encoding);
-    res.addListener('data', function(chunk) { content += chunk; });
-    res.addListener('end', function() {
-      if (callback && !callbackSent) {
-        callback(null, content);
-        callbackSent = true;
-      }
+  if (cb) {
+    req.once('response', function (res) {
+      cb(null, res);
     });
-  });
-
-  client.addListener('error', function(err) {
-    if (callback && !callbackSent) {
-      callback(err);
-      callbackSent = true;
-    }
-  });
+  }
 
-  client.addListener('close', function() {
-    if (callback && !callbackSent) {
-      callback(new Error('Connection closed unexpectedly'));
-      callbackSent = true;
-    }
-  });
-  req.end();
+  return req;
 };
+
+