return skipBody;
}
+// XXX This is a mess.
+// TODO: http.Parser should be a Writable emits request/response events.
function parserOnBody(b, start, len) {
var parser = this;
- var slice = b.slice(start, start + len);
- if (parser.incoming._paused || parser.incoming._pendings.length) {
- parser.incoming._pendings.push(slice);
- } else {
- parser.incoming._emitData(slice);
+ var stream = parser.incoming;
+ var rs = stream._readableState;
+ var socket = stream.socket;
+
+ // pretend this was the result of a stream._read call.
+ if (len > 0) {
+ var slice = b.slice(start, start + len);
+ rs.onread(null, slice);
}
+
+ if (rs.length >= rs.highWaterMark)
+ socket.pause();
}
function parserOnMessageComplete() {
var parser = this;
- parser.incoming.complete = true;
+ var stream = parser.incoming;
+ var socket = stream.socket;
+
+ stream.complete = true;
// Emit any trailing headers.
var headers = parser._headers;
parser._url = '';
}
- if (!parser.incoming.upgrade) {
+ if (!stream.upgrade)
// For upgraded connections, also emit this after parser.execute
- if (parser.incoming._paused || parser.incoming._pendings.length) {
- parser.incoming._pendings.push(END_OF_FILE);
- } else {
- parser.incoming.readable = false;
- parser.incoming._emitEnd();
- }
- }
+ stream._readableState.onread(null, null);
if (parser.socket.readable) {
// force to read the next incoming message
- parser.socket.resume();
+ socket.resume();
}
}
/* Abstract base class for ServerRequest and ClientResponse. */
function IncomingMessage(socket) {
- Stream.call(this);
+ Stream.Readable.call(this);
+
+ // XXX This implementation is kind of all over the place
+ // When the parser emits body chunks, they go in this list.
+ // _read() pulls them out, and when it finds EOF, it ends.
+ this._pendings = [];
- // TODO Remove one of these eventually.
this.socket = socket;
this.connection = socket;
this.readable = true;
- this._paused = false;
this._pendings = [];
-
- this._endEmitted = false;
+ this._pendingIndex = 0;
// request (server) only
this.url = '';
-
this.method = null;
// response (client) only
this.statusCode = null;
this.client = this.socket;
+
+ // flag for backwards compatibility grossness.
+ this._consuming = false;
}
-util.inherits(IncomingMessage, Stream);
+util.inherits(IncomingMessage, Stream.Readable);
exports.IncomingMessage = IncomingMessage;
-IncomingMessage.prototype.destroy = function(error) {
- this.socket.destroy(error);
+IncomingMessage.prototype.read = function(n) {
+ this._consuming = true;
+ return Stream.Readable.prototype.read.call(this, n);
};
-IncomingMessage.prototype.setEncoding = function(encoding) {
- var StringDecoder = require('string_decoder').StringDecoder; // lazy load
- this._decoder = new StringDecoder(encoding);
-};
-
-
-IncomingMessage.prototype.pause = function() {
- this._paused = true;
- this.socket.pause();
+IncomingMessage.prototype._read = function(n, callback) {
+ // We actually do almost nothing here, because the parserOnBody
+ // function fills up our internal buffer directly. However, we
+ // do need to unpause the underlying socket so that it flows.
+ if (!this.socket.readable)
+ return callback(null, null);
+ else
+ this.socket.resume();
};
-IncomingMessage.prototype.resume = function() {
- this._paused = false;
- if (this.socket) {
- this.socket.resume();
- }
-
- this._emitPending();
+IncomingMessage.prototype.destroy = function(error) {
+ this.socket.destroy(error);
};
-IncomingMessage.prototype._emitPending = function(callback) {
- if (this._pendings.length) {
- var self = this;
- process.nextTick(function() {
- while (!self._paused && self._pendings.length) {
- var chunk = self._pendings.shift();
- if (chunk !== END_OF_FILE) {
- assert(Buffer.isBuffer(chunk));
- self._emitData(chunk);
- } else {
- assert(self._pendings.length === 0);
- self.readable = false;
- self._emitEnd();
- }
- }
- if (callback) {
- callback();
- }
- });
- } else if (callback) {
- callback();
- }
-};
IncomingMessage.prototype._emitData = function(d) {
// don't keep alive connections where the client expects 100 Continue
// but we sent a final status; they may put extra bytes on the wire.
- if (this._expect_continue && ! this._sent100) {
+ if (this._expect_continue && !this._sent100) {
this.shouldKeepAlive = false;
}
// Socket closed before we emitted 'end' below.
req.res.emit('aborted');
var res = req.res;
- req.res._emitPending(function() {
- res._emitEnd();
+ res.on('end', function() {
res.emit('close');
- res = null;
});
+ res._readableState.onread(null, null);
} else if (!req.res && !req._hadError) {
// This socket error fired before we started to
// receive a response. The error needs to
}
+// client
function parserOnIncomingClient(res, shouldKeepAlive) {
var parser = this;
var socket = this.socket;
var req = socket._httpMessage;
+
// propogate "domain" setting...
if (req.domain && !res.domain) {
debug('setting "res.domain"');
DTRACE_HTTP_CLIENT_RESPONSE(socket, req);
COUNTER_HTTP_CLIENT_RESPONSE();
- req.emit('response', res);
req.res = res;
res.req = req;
-
+ var handled = req.emit('response', res);
res.on('end', responseOnEnd);
+ // If the user did not listen for the 'response' event, then they
+ // can't possibly read the data, so we .resume() it into the void
+ // so that the socket doesn't hang there in a paused state.
+ if (!handled)
+ res.resume();
+
return isHeadResponse;
}
+// client
function responseOnEnd() {
var res = this;
var req = res.req;
incoming.push(req);
var res = new ServerResponse(req);
- debug('server response shouldKeepAlive: ' + shouldKeepAlive);
+
res.shouldKeepAlive = shouldKeepAlive;
DTRACE_HTTP_SERVER_REQUEST(req, socket);
COUNTER_HTTP_SERVER_REQUEST();
incoming.shift();
+ // if the user never called req.read(), and didn't pipe() or
+ // .resume() or .on('data'), then we call req.resume() so that the
+ // bytes will be pulled off the wire.
+ if (!req._consuming)
+ req.resume();
+
res.detachSocket(socket);
if (res._last) {