var ondrain = pipeOnDrain(src);
dest.on('drain', ondrain);
+ var cleanedUp = false;
function cleanup() {
debug('cleanup');
// cleanup event handlers once the pipe is broken
src.removeListener('end', cleanup);
src.removeListener('data', ondata);
+ cleanedUp = true;
+
// if the reader is waiting for a drain event from this
// specific writer, then it would cause it to never start
// flowing again.
debug('ondata');
var ret = dest.write(chunk);
if (false === ret) {
- debug('false write response, pause',
- src._readableState.awaitDrain);
- src._readableState.awaitDrain++;
+ // If the user unpiped during `dest.write()`, it is possible
+ // to get stuck in a permanently paused state if that write
+ // also returned false.
+ if (state.pipesCount === 1 &&
+ state.pipes[0] === dest &&
+ src.listenerCount('data') === 1 &&
+ !cleanedUp) {
+ debug('false write response, pause', src._readableState.awaitDrain);
+ src._readableState.awaitDrain++;
+ }
src.pause();
}
}
--- /dev/null
+'use strict';
+const common = require('../common');
+const assert = require('assert');
+const stream = require('stream');
+
+const reader = new stream.Readable();
+const writer1 = new stream.Writable();
+const writer2 = new stream.Writable();
+
+// 560000 is chosen here because it is larger than the (default) highWaterMark
+// and will cause `.write()` to return false
+// See: https://github.com/nodejs/node/issues/2323
+const buffer = new Buffer(560000);
+
+reader._read = function(n) {};
+
+writer1._write = common.mustCall(function(chunk, encoding, cb) {
+ this.emit('chunk-received');
+ cb();
+}, 1);
+writer1.once('chunk-received', function() {
+ reader.unpipe(writer1);
+ reader.pipe(writer2);
+ reader.push(buffer);
+ setImmediate(function() {
+ reader.push(buffer);
+ setImmediate(function() {
+ reader.push(buffer);
+ });
+ });
+});
+
+writer2._write = common.mustCall(function(chunk, encoding, cb) {
+ cb();
+}, 3);
+
+reader.pipe(writer1);
+reader.push(buffer);