var util = require('util');
var net = require('net');
var stream = require('stream');
+var EventEmitter = require('events').EventEmitter;
var FreeList = require('freelist').FreeList;
var HTTPParser = process.binding('http_parser').HTTPParser;
+var assert = process.assert;
var debug;
};
-function OutgoingMessage(socket) {
+function OutgoingMessage() {
stream.Stream.call(this);
- // TODO Remove one of these eventually.
- this.socket = socket;
- this.connection = socket;
-
this.output = [];
this.outputEncodings = [];
exports.OutgoingMessage = OutgoingMessage;
+OutgoingMessage.prototype.assignSocket = function(socket) {
+ assert(!socket._httpMessage);
+ socket._httpMessage = this;
+ this.socket = socket;
+ this.connection = socket;
+ this._flush();
+};
+
+
OutgoingMessage.prototype.destroy = function(error) {
this.socket.destroy(error);
};
OutgoingMessage.prototype._writeRaw = function(data, encoding) {
- if (this.connection._outgoing[0] === this && this.connection.writable) {
+ if (this.connection._httpMessage === this && this.connection.writable) {
// There might be pending data in the this.output buffer.
while (this.output.length) {
if (!this.connection.writable) {
data.length > 0 &&
this.output.length === 0 &&
this.connection.writable &&
- this.connection._outgoing[0] === this;
+ this.connection._httpMessage === this;
if (hot) {
// Hot path. They're doing
// There is the first message on the outgoing queue, and we've sent
// everything to the socket.
- if (this.output.length === 0 && this.connection._outgoing[0] === this) {
- debug('outgoing message end. shifting because was flushed');
- this.connection._onOutgoingSent();
+ if (this.output.length === 0 && this.connection._httpMessage === this) {
+ debug('outgoing message end.');
+ this._finish();
}
return ret;
};
+OutgoingMessage.prototype._finish = function() {
+ this.socket._httpMessage = null;
+ this.socket = this.connection = null;
+ this.emit('finish');
+};
+
+
+OutgoingMessage.prototype._flush = function() {
+ // This logic is probably a bit confusing. Let me explain a bit:
+ //
+ // In both HTTP servers and clients it is possible to queue up several
+ // outgoing messages. This is easiest to imagine in the case of a client.
+ // Take the following situation:
+ //
+ // req1 = client.request('GET', '/');
+ // req2 = client.request('POST', '/');
+ //
+ // When the user does
+ //
+ // req2.write('hello world\n');
+ //
+ // it's possible that the first request has not been completely flushed to
+ // the socket yet. Thus the outgoing messages need to be prepared to queue
+ // up data internally before sending it on further to the socket's queue.
+ //
+ // This function, outgoingFlush(), is called by both the Server and Client
+ // to attempt to flush any pending messages out to the socket.
+
+ if (!this.socket) return;
+
+ var ret;
+
+ while (this.output.length) {
+ if (!this.socket.writable) return; // XXX Necessary?
+
+ var data = this.output.shift();
+ var encoding = this.outputEncodings.shift();
+
+ ret = this.socket.write(data, encoding);
+ }
+
+ if (this.finished) {
+ // This is a queue to the server or client to bring in the next this.
+ this._finish();
+ } else if (ret) {
+ this.emit('drain');
+ }
+};
+
+
+
+
function ServerResponse(req) {
- OutgoingMessage.call(this, req.socket);
+ OutgoingMessage.call(this);
if (req.method === 'HEAD') this._hasBody = false;
};
-function ClientRequest(socket, method, url, headers) {
- OutgoingMessage.call(this, socket);
+function ClientRequest(options) {
+ OutgoingMessage.call(this);
+
+ var method = this.method = (options.method || 'GET').toUpperCase();
+ var path = options.path || '/';
+ var headers = options.headers;
+
+ // Host header set by default.
+ if (options.host && !(headers.host || headers.Host || headers.HOST)) {
+ headers.Host = options.host;
+ }
- this.method = method = method.toUpperCase();
this.shouldKeepAlive = false;
if (method === 'GET' || method === 'HEAD') {
this.useChunkedEncodingByDefault = false;
} else {
this.useChunkedEncodingByDefault = true;
}
+
+ // By default keep-alive is off. This is the last message unless otherwise
+ // specified.
this._last = true;
- this._storeHeader(method + ' ' + url + ' HTTP/1.1\r\n', headers);
+ this._storeHeader(method + ' ' + path + ' HTTP/1.1\r\n', headers);
}
util.inherits(ClientRequest, OutgoingMessage);
exports.ClientRequest = ClientRequest;
-function outgoingFlush(socket) {
- // This logic is probably a bit confusing. Let me explain a bit:
- //
- // In both HTTP servers and clients it is possible to queue up several
- // outgoing messages. This is easiest to imagine in the case of a client.
- // Take the following situation:
- //
- // req1 = client.request('GET', '/');
- // req2 = client.request('POST', '/');
- //
- // When the user does
- //
- // req2.write('hello world\n');
- //
- // it's possible that the first request has not been completely flushed to
- // the socket yet. Thus the outgoing messages need to be prepared to queue
- // up data internally before sending it on further to the socket's queue.
- //
- // This function, outgoingFlush(), is called by both the Server and Client
- // to attempt to flush any pending messages out to the socket.
- var message = socket._outgoing[0];
-
- if (!message) return;
-
- var ret;
-
- while (message.output.length) {
- if (!socket.writable) return; // XXX Necessary?
-
- var data = message.output.shift();
- var encoding = message.outputEncodings.shift();
-
- ret = socket.write(data, encoding);
- }
-
- if (message.finished) {
- socket._onOutgoingSent();
- } else if (ret) {
- message.emit('drain');
- }
-}
-
-
function httpSocketSetup(socket) {
- // An array of outgoing messages for the socket. In pipelined connections
- // we need to keep track of the order they were sent.
- socket._outgoing = [];
-
// NOTE: be sure not to use ondrain elsewhere in this file!
socket.ondrain = function() {
- var message = socket._outgoing[0];
- if (message) message.emit('drain');
+ if (socket._httpMessage) {
+ socket._httpMessage.emit('drain');
+ }
};
}
function connectionListener(socket) {
var self = this;
+ var outgoing = [];
debug('SERVER new http connection');
socket.onend = function() {
parser.finish();
- if (socket._outgoing.length) {
- socket._outgoing[socket._outgoing.length - 1]._last = true;
- outgoingFlush(socket);
+ if (outgoing.length) {
+ outgoing[outgoing.length - 1]._last = true;
+ } else if (socket._httpMessage) {
+ socket._httpMessage._last = true;
} else {
socket.end();
}
parsers.free(parser);
});
- // At the end of each response message, after it has been flushed to the
- // socket. Here we insert logic about what to do next.
- socket._onOutgoingSent = function(message) {
- var message = socket._outgoing.shift();
- if (message._last) {
- // No more messages to be pushed out.
-
- socket.destroySoon();
-
- } else if (socket._outgoing.length) {
- // Push out the next message.
- outgoingFlush(socket);
- }
- };
-
// The following callback is issued after the headers have been read on a
// new message. In this callback we setup the response object and pass it
// to the user.
var res = new ServerResponse(req);
debug('server response shouldKeepAlive: ' + shouldKeepAlive);
res.shouldKeepAlive = shouldKeepAlive;
- socket._outgoing.push(res);
+
+ if (socket._httpMessage) {
+ // There are already pending outgoing res, append.
+ outgoing.push(res);
+ } else {
+ res.assignSocket(socket);
+ }
+
+ // When we're finished writing the response, check if this is the last
+ // respose, if so destroy the socket.
+ res.on('finish', function() {
+ if (res._last) {
+ socket.destroySoon();
+ } else {
+ // start sending the next message
+ var m = outgoing.shift();
+ if (m) {
+ m.assignSocket(socket);
+ }
+ }
+ });
if ('expect' in req.headers &&
(req.httpVersionMajor == 1 && req.httpVersionMinor == 1) &&
exports._connectionListener = connectionListener;
-function Client() {
- if (!(this instanceof Client)) return new Client();
- net.Stream.call(this, { allowHalfOpen: true });
- var self = this;
-
- // Possible states:
- // - disconnected
- // - connecting
- // - connected
- this._state = 'disconnected';
-
- httpSocketSetup(self);
-
- function onData(d, start, end) {
- if (!self.parser) {
- throw new Error('parser not initialized prior to Client.ondata call');
- }
- var ret = self.parser.execute(d, start, end - start);
- if (ret instanceof Error) {
- self.destroy(ret);
- } else if (self.parser.incoming && self.parser.incoming.upgrade) {
- var bytesParsed = ret;
- self.ondata = null;
- self.onend = null;
-
- var req = self.parser.incoming;
+function Agent(host, port) {
+ this.host = host;
+ this.port = port;
- var upgradeHead = d.slice(start + bytesParsed + 1, end);
-
- if (self.listeners('upgrade').length) {
- self.emit('upgrade', req, self, upgradeHead);
- } else {
- self.destroy();
- }
- }
- };
+ this.queue = [];
+ this.sockets = [];
+ this.maxSockets = 5;
+}
+util.inherits(Agent, EventEmitter);
- self.addListener('connect', function() {
- debug('CLIENT connected');
- self.ondata = onData;
- self.onend = onEnd;
+Agent.prototype.appendMessage = function(options) {
+ var self = this;
- self._state = 'connected';
+ var req = new ClientRequest(options);
+ this.queue.push(req);
- self._initParser();
- outgoingFlush(self);
+ req.on('finish', function () {
+ self._cycle();
});
- function onEnd() {
- if (self.parser) self.parser.finish();
- debug('CLIENT got end closing. state = ' + self._state);
- self.end();
- };
-
- self.addListener('close', function(e) {
- self._state = 'disconnected';
- if (e) return;
+ this._cycle();
+};
- debug('CLIENT onClose. state = ' + self._state);
- // finally done with the request
- self._outgoing.shift();
+Agent.prototype._establishNewConnection = function(socket, message) {
+ var self = this;
+ assert(this.sockets.length < this.maxSockets);
- // If there are more requests to handle, reconnect.
- if (self._outgoing.length) {
- self._ensureConnection();
- } else if (self.parser) {
- parsers.free(self.parser);
- self.parser = null;
- }
+ // Grab a new "socket". Depending on the implementation of _getConnection
+ // this could either be a raw TCP socket or a TLS stream.
+ var socket = this._getConnection(this.host, this.port, function () {
+ self._cycle();
});
-}
-util.inherits(Client, net.Stream);
+ this.sockets.push(socket);
-exports.Client = Client;
+ // When the socket closes remove it from the list of available sockets.
+ socket.on('close', function() {
+ var i = self.sockets.indexOf(socket);
+ if (i >= 0) self.sockets.splice(i, 1);
+ });
+};
-exports.createClient = function(port, host, https, credentials) {
- var c = new Client();
- c.port = port;
- c.host = host;
- c.https = https;
- c.credentials = credentials;
+// Sub-classes can overwrite this method with e.g. something that supplies
+// TLS streams.
+Agent.prototype._getConnection = function(host, port, cb) {
+ var c = net.createConnection(port, host);
+ c.on('connect', cb);
return c;
};
-Client.prototype._initParser = function() {
- var self = this;
- if (!self.parser) self.parser = parsers.alloc();
- self.parser.reinitialize('response');
- self.parser.socket = self;
- self.parser.onIncoming = function(res) {
- debug('CLIENT incoming response!');
-
- var req = self._outgoing[0];
-
- // Responses to HEAD requests are AWFUL. Ask Ryan.
- // A major oversight in HTTP. Hence this nastiness.
- var isHeadResponse = req.method == 'HEAD';
- debug('CLIENT isHeadResponse ' + isHeadResponse);
-
- if (res.statusCode == 100) {
- // restart the parser, as this is a continue message.
- req.emit('continue');
- return true;
- }
-
- if (req.shouldKeepAlive && res.headers.connection === 'close') {
- req.shouldKeepAlive = false;
+// This method attempts to shuffle items along the queue into one of the
+// waiting sockets. If a waiting socket cannot be found, it will
+// start the process of establishing one.
+Agent.prototype._cycle = function() {
+ var first = this.queue[0];
+ if (!first) return;
+
+ // First try to find an available socket.
+ for (var i = 0; i < this.sockets.length; i++) {
+ var socket = this.sockets[i];
+ // If the socket doesn't already have a message it's sending out
+ // and the socket is available for writing...
+ if (!socket._httpMessage && (socket.writable && socket.readable)) {
+ // We found an available connection!
+ this.queue.shift(); // remove first from queue.
+ first.assignSocket(socket);
+ return;
}
-
- res.addListener('end', function() {
- debug('CLIENT request complete disconnecting. state = ' + self._state);
- // For the moment we reconnect for every request. FIXME!
- // All that should be required for keep-alive is to not reconnect,
- // but outgoingFlush instead.
- if (req.shouldKeepAlive) {
- outgoingFlush(self);
- self._outgoing.shift();
- outgoingFlush(self);
- } else {
- self.end();
- }
- });
-
- req.emit('response', res);
-
- return isHeadResponse;
- };
-};
-
-
-// This is called each time a request has been pushed completely to the
-// socket. The message that was sent is still sitting at client._outgoing[0]
-// it is our responsibility to shift it off.
-//
-// We have to be careful when it we shift it because once we do any writes
-// to other requests will be flushed directly to the socket.
-//
-// At the moment we're implement a client which connects and disconnects on
-// each request/response cycle so we cannot shift off the request from
-// client._outgoing until we're completely disconnected after the response
-// comes back.
-Client.prototype._onOutgoingSent = function(message) {
- // We've just finished a message. We don't end/shutdown the connection here
- // because HTTP servers typically cannot handle half-closed connections
- // (Node servers can).
- //
- // Instead, we just check if the connection is closed, and if so
- // reconnect if we have pending messages.
- if (this._outgoing.length) {
- debug('CLIENT request flush. ensure connection. state = ' + this._state);
- this._ensureConnection();
}
-};
-
-Client.prototype._ensureConnection = function() {
- if (this._state == 'disconnected') {
- debug('CLIENT reconnecting state = ' + this._state);
- this.connect(this.port, this.host);
- this._state = 'connecting';
+ // Otherwise see if we should be starting a new connection to handle
+ // this.
+ if (this.sockets.length < this.maxSockets) {
+ this._establishNewConnection();
}
-};
-
-Client.prototype.request = function(method, url, headers) {
- if (typeof(url) != 'string') {
- // assume method was omitted, shift arguments
- headers = url;
- url = method;
- method = 'GET';
- }
- var req = new ClientRequest(this, method, url, headers);
- this._outgoing.push(req);
- this._ensureConnection();
- return req;
+ // All sockets are filled and all sockets are busy.
};
-exports.cat = function(url, encoding_, headers_) {
- var encoding = 'utf8',
- headers = {},
- callback = null;
+// process-wide hash of agents.
+// keys: "host:port" string
+// values: instance of Agent
+// That is, one agent remote host.
+// TODO currently we never remove agents from this hash. This is a small
+// memory leak. Have a 2 second timeout after a agent's sockets are to try
+// to remove it?
+var agents = {}
- // parse the arguments for the various options... very ugly
- if (typeof(arguments[1]) == 'string') {
- encoding = arguments[1];
- if (typeof(arguments[2]) == 'object') {
- headers = arguments[2];
- if (typeof(arguments[3]) == 'function') callback = arguments[3];
- } else {
- if (typeof(arguments[2]) == 'function') callback = arguments[2];
- }
- } else {
- // didn't specify encoding
- if (typeof(arguments[1]) == 'object') {
- headers = arguments[1];
- callback = arguments[2];
- } else {
- callback = arguments[1];
- }
- }
- var url = require('url').parse(url);
+function getAgent(host, port) {
+ var id = host + ':' + port;
+ var agent = agents[id];
- var hasHost = false;
- if (Array.isArray(headers)) {
- for (var i = 0, l = headers.length; i < l; i++) {
- if (headers[i][0].toLowerCase() === 'host') {
- hasHost = true;
- break;
- }
- }
- } else if (typeof headers === 'Object') {
- var keys = Object.keys(headers);
- for (var i = 0, l = keys.length; i < l; i++) {
- var key = keys[i];
- if (key.toLowerCase() == 'host') {
- hasHost = true;
- break;
- }
- }
+ if (!agent) {
+ agent = agents[id] = new Agent(host, port);
}
- if (!hasHost) headers['Host'] = url.hostname;
-
- var content = '';
- var client = exports.createClient(url.port || 80, url.hostname);
- var req = client.request((url.pathname || '/') +
- (url.search || '') +
- (url.hash || ''),
- headers);
+ return agent;
+}
- if (url.protocol == 'https:') {
- client.https = true;
- }
- var callbackSent = false;
+exports.request = function(options, cb) {
+ var agent = getAgent(options.host, options.port);
+ var req = agent.appendMessage(options);
- req.addListener('response', function(res) {
- if (res.statusCode < 200 || res.statusCode >= 300) {
- if (callback && !callbackSent) {
- callback(res.statusCode);
- callbackSent = true;
- }
- client.end();
- return;
- }
- res.setEncoding(encoding);
- res.addListener('data', function(chunk) { content += chunk; });
- res.addListener('end', function() {
- if (callback && !callbackSent) {
- callback(null, content);
- callbackSent = true;
- }
+ if (cb) {
+ req.once('response', function (res) {
+ cb(null, res);
});
- });
-
- client.addListener('error', function(err) {
- if (callback && !callbackSent) {
- callback(err);
- callbackSent = true;
- }
- });
+ }
- client.addListener('close', function() {
- if (callback && !callbackSent) {
- callback(new Error('Connection closed unexpectedly'));
- callbackSent = true;
- }
- });
- req.end();
+ return req;
};
+
+