this.needReadable = false;
this.emittedReadable = false;
+ // when piping, we only care about 'readable' events that happen
+ // after read()ing all the bytes and not getting any pushback.
+ this.ranOut = false;
+ this.flowChunkSize = null;
+
this.decoder = null;
if (options.encoding) {
if (!StringDecoder)
Readable.prototype.pipe = function(dest, pipeOpts) {
var src = this;
var state = this._readableState;
- if (!pipeOpts)
- pipeOpts = {};
switch (state.pipesCount) {
case 0:
});
}
+ if (pipeOpts && pipeOpts.chunkSize)
+ state.flowChunkSize = pipeOpts.chunkSize;
+
function onend() {
dest.end();
}
// start the flow.
if (!state.flowing) {
+ // the handler that waits for readable events after all
+ // the data gets sucked out in flow.
+ // This would be easier to follow with a .once() handler
+ // in flow(), but that is too slow.
+ this.on('readable', pipeOnReadable);
+
state.flowing = true;
process.nextTick(function() {
- flow(src, pipeOpts);
+ flow(src);
});
}
return dest;
};
-function flow(src, pipeOpts) {
+function flow(src) {
var state = src._readableState;
var chunk;
var needDrain = 0;
function ondrain() {
needDrain--;
if (needDrain === 0)
- flow(src, pipeOpts);
+ flow(src);
}
function write(dest, i, list) {
}
while (state.pipesCount &&
- null !== (chunk = src.read(pipeOpts.chunkSize))) {
+ null !== (chunk = src.read(state.pipeChunkSize))) {
if (state.pipesCount === 1)
write(state.pipes, 0, null);
// at this point, no one needed a drain, so we just ran out of data
// on the next readable event, start it over again.
- src.once('readable', function() {
- flow(src, pipeOpts);
- });
+ state.ranOut = true;
}
+function pipeOnReadable() {
+ if (this._readableState.ranOut) {
+ this._readableState.ranOut = false;
+ flow(this);
+ }
+}
+
+
Readable.prototype.unpipe = function(dest) {
var state = this._readableState;
// got a match.
state.pipes = null;
state.pipesCount = 0;
+ this.removeListener('readable', pipeOnReadable);
if (dest)
dest.emit('unpipe', this);
return this;
var len = state.pipesCount;
state.pipes = null;
state.pipesCount = 0;
+ this.removeListener('readable', pipeOnReadable);
for (var i = 0; i < len; i++)
dests[i].emit('unpipe', this);
-
return this;
}