};
-// Shims to old interface.
-function Client(port, host) {
+// Legacy Interface:
+
+
+
+function Client() {
+ if (!(this instanceof Client)) return new Client();
+ net.Stream.call(this, { allowHalfOpen: true });
var self = this;
- this.port = port;
- this.host = host;
- this.agent = getAgent(this.host, this.port);
+ // Possible states:
+ // - disconnected
+ // - connecting
+ // - connected
+ this._state = 'disconnected';
- // proxy connect events upwards;
- this.agent.on('connect', function() {
- self.emit('connect');
- });
+ httpSocketSetup(self);
+ this._outgoing = [];
+
+ function onData(d, start, end) {
+ if (!self.parser) {
+ throw new Error('parser not initialized prior to Client.ondata call');
+ }
+ var ret = self.parser.execute(d, start, end - start);
+ if (ret instanceof Error) {
+ self.destroy(ret);
+ } else if (self.parser.incoming && self.parser.incoming.upgrade) {
+ var bytesParsed = ret;
+ self.ondata = null;
+ self.onend = null;
+ self._state = 'upgraded';
+
+ var res = self.parser.incoming;
+
+ if (self._httpMessage) {
+ self._httpMessage.detachSocket(self);
+ }
+
+ var upgradeHead = d.slice(start + bytesParsed + 1, end);
- this.agent.on('end', function() {
- self.emit('end');
+ if (self.listeners('upgrade').length) {
+ self.emit('upgrade', res, self, upgradeHead);
+ } else {
+ self.destroy();
+ }
+ }
+ };
+
+ self.addListener('connect', function() {
+ debug('CLIENT connected');
+
+ self.ondata = onData;
+ self.onend = onEnd;
+
+ self._state = 'connected';
+
+ self._initParser();
+
+ self._cycle();
});
- // proxy upgrade events upwards;
- this.agent.on('upgrade', function(res, socket, upgradeHead) {
- if (self.listeners('upgrade').length) {
- self.emit('upgrade', res, socket, upgradeHead);
- } else {
- socket.destroy();
+ function onEnd() {
+ if (self.parser) self.parser.finish();
+ debug('CLIENT got end closing. state = ' + self._state);
+ self.end();
+ };
+
+ self.addListener('close', function(e) {
+ self._state = 'disconnected';
+ self._upgraded = false;
+
+ // Free the parser.
+ if (self.parser) {
+ parsers.free(self.parser);
+ self.parser = null;
}
+
+ if (e) return;
+
+ // If we have an http message, then drop it
+ var req = self._httpMessage;
+ if (req && !req.res) {
+ req.detachSocket(self);
+ self.emit('error', new Error('socket hang up'));
+ }
+
+ debug('CLIENT onClose. state = ' + self._state);
+ self._cycle();
});
}
-util.inherits(Client, EventEmitter);
+util.inherits(Client, net.Stream);
+
+
+exports.Client = Client;
+
+
+exports.createClient = function(port, host) {
+ var c = new Client();
+ c.port = port;
+ c.host = host;
+ return c;
+};
+
+
+Client.prototype._initParser = function() {
+ var self = this;
+ if (!self.parser) self.parser = parsers.alloc();
+ self.parser.reinitialize('response');
+ self.parser.socket = self;
+ self.parser.onIncoming = function(res) {
+ debug('CLIENT incoming response!');
+
+ assert(self._httpMessage);
+ var req = self._httpMessage;
+
+ req.res = res;
+
+ // Responses to HEAD requests are AWFUL. Ask Ryan.
+ // A major oversight in HTTP. Hence this nastiness.
+ var isHeadResponse = req.method == 'HEAD';
+ debug('CLIENT isHeadResponse ' + isHeadResponse);
+
+ if (res.statusCode == 100) {
+ // restart the parser, as this is a continue message.
+ req.emit('continue');
+ return true;
+ }
+
+ if (req.shouldKeepAlive && res.headers.connection === 'close') {
+ req.shouldKeepAlive = false;
+ }
+
+ res.addListener('end', function() {
+ debug('CLIENT response complete disconnecting. state = ' + self._state);
+
+ if (!req.shouldKeepAlive) {
+ self.end();
+ }
+
+ req.detachSocket(self);
+ assert(!self._httpMessage);
+ self._cycle();
+ });
+
+ req.emit('response', res);
+
+ return isHeadResponse;
+ };
+};
+
+Client.prototype._cycle = function() {
+ debug("Client _cycle");
+ if (this._upgraded) return;
+
+ switch (this._state) {
+ case 'connecting':
+ break;
-// This method is used in a few tests to force the connections closed.
-// Again - just a shim so as not to break code. Not really important.
-Client.prototype.end = function() {
- for (var i = 0; i < this.agent.sockets.length; i++) {
- var socket = this.agent.sockets[i];
- if (!socket._httpMessage && socket.writable) socket.end();
+ case 'connected':
+ if (this.writable && this.readable) {
+ debug("Client _cycle shift()");
+ if (this._httpMessage) {
+ this._httpMessage._flush();
+ } else {
+ var req = this._outgoing.shift();
+ if (req) req.assignSocket(this);
+ }
+ }
+ break;
+
+ case 'disconnected':
+ if (this._httpMessage || this._outgoing.length) {
+ this._ensureConnection();
+ }
+ break;
}
};
-Client.prototype.destroy = function(e) {
- for (var i = 0; i < this.agent.sockets.length; i++) {
- var socket = this.agent.sockets[i];
- socket.destroy(e);
+Client.prototype._ensureConnection = function() {
+ if (this._state == 'disconnected') {
+ debug('CLIENT reconnecting state = ' + this._state);
+ this.connect(this.port, this.host);
+ this._state = 'connecting';
}
};
-Client.prototype.request = function(method, path, headers) {
- if (typeof(path) != 'string') {
+Client.prototype.request = function(method, url, headers) {
+ if (typeof(url) != 'string') {
// assume method was omitted, shift arguments
- headers = path;
- path = method;
+ headers = url;
+ url = method;
method = 'GET';
}
+ var self = this;
+
var options = {
- method: method,
- path: path,
- headers: headers,
- port: this.port,
- host: this.host
+ method: method || 'GET',
+ path: url || '/',
+ headers: headers
};
- var self = this;
- var req = exports.request(options);
-
- // proxy error events from req to Client
- req.on('error', function(err) {
- self.emit('error', err);
- });
+ var req = new ClientRequest(options);
+ this._outgoing.push(req);
+ this._cycle();
return req;
};
-exports.createClient = function(port, host) {
- return new Client(port, host);
-};
-
-
exports.cat = function(url, encoding_, headers_) {
var encoding = 'utf8',
headers = {},
var url = require('url').parse(url);
- var options = {
- method: 'GET',
- port: url.port || 80,
- host: url.hostname,
- headers: headers,
- path: (url.pathname || '/') + (url.search || '') + (url.hash || '')
- };
+ var hasHost = false;
+ if (Array.isArray(headers)) {
+ for (var i = 0, l = headers.length; i < l; i++) {
+ if (headers[i][0].toLowerCase() === 'host') {
+ hasHost = true;
+ break;
+ }
+ }
+ } else if (typeof headers === 'Object') {
+ var keys = Object.keys(headers);
+ for (var i = 0, l = keys.length; i < l; i++) {
+ var key = keys[i];
+ if (key.toLowerCase() == 'host') {
+ hasHost = true;
+ break;
+ }
+ }
+ }
+ if (!hasHost) headers['Host'] = url.hostname;
var content = '';
+
+ var client = exports.createClient(url.port || 80, url.hostname);
+ var req = client.request((url.pathname || '/') +
+ (url.search || '') +
+ (url.hash || ''),
+ headers);
+
+ if (url.protocol == 'https:') {
+ client.https = true;
+ }
+
var callbackSent = false;
- var req = exports.request(options, function(res) {
+ req.addListener('response', function(res) {
if (res.statusCode < 200 || res.statusCode >= 300) {
if (callback && !callbackSent) {
callback(res.statusCode);
callbackSent = true;
}
+ client.end();
return;
}
-
res.setEncoding(encoding);
res.addListener('data', function(chunk) { content += chunk; });
res.addListener('end', function() {
}
});
});
- req.end();
-
- req.on('error', function(err) {
+ client.addListener('error', function(err) {
if (callback && !callbackSent) {
callback(err);
callbackSent = true;
}
});
+
+ client.addListener('close', function() {
+ if (callback && !callbackSent) {
+ callback(new Error('Connection closed unexpectedly'));
+ callbackSent = true;
+ }
+ });
+ req.end();
};