// when piping, we only care about 'readable' events that happen
// after read()ing all the bytes and not getting any pushback.
this.ranOut = false;
+
+ // the number of writers that are awaiting a drain event in .pipe()s
+ this.awaitDrain = 0;
this.flowChunkSize = null;
this.decoder = null;
// This would be easier to follow with a .once() handler
// in flow(), but that is too slow.
this.on('readable', pipeOnReadable);
+ var ondrain = pipeOnDrain(src);
+ dest.on('drain', ondrain);
+ dest.on('unpipe', function(readable) {
+ if (readable === src)
+ dest.removeListener('drain', ondrain);
+
+ // if the reader is waiting for a drain event from this
+ // specific writer, then it would cause it to never start
+ // flowing again.
+ // So, if this is awaiting a drain, then we just call it now.
+ if (dest._writableState.needDrain)
+ ondrain();
+ });
state.flowing = true;
process.nextTick(function() {
return dest;
};
+function pipeOnDrain(src) {
+ return function() {
+ var dest = this;
+ var state = src._readableState;
+ state.awaitDrain --;
+ if (state.awaitDrain === 0)
+ flow(src);
+ };
+}
+
function flow(src) {
var state = src._readableState;
var chunk;
- var needDrain = 0;
-
- function ondrain() {
- needDrain--;
- if (needDrain === 0)
- flow(src);
- }
+ state.awaitDrain = 0;
function write(dest, i, list) {
var written = dest.write(chunk);
if (false === written) {
- needDrain++;
- dest.once('drain', ondrain);
+ state.awaitDrain++;
}
}
src.emit('data', chunk);
// if anyone needs a drain, then we have to wait for that.
- if (needDrain > 0)
+ if (state.awaitDrain > 0)
return;
}