this.buffer = [];
this.length = 0;
- this.pipes = [];
+ this.pipes = null;
+ this.pipesCount = 0;
this.flowing = false;
this.ended = false;
this.endEmitted = false;
var state = this._readableState;
if (!pipeOpts)
pipeOpts = {};
- state.pipes.push(dest);
+
+ switch (state.pipesCount) {
+ case 0:
+ state.pipes = dest;
+ break;
+ case 1:
+ state.pipes = [ state.pipes, dest ];
+ break;
+ default:
+ state.pipes.push(dest);
+ break;
+ }
+ state.pipesCount += 1;
if ((!pipeOpts || pipeOpts.end !== false) &&
dest !== process.stdout &&
flow(src, pipeOpts);
}
- while (state.pipes.length &&
+ function write(dest, i, list) {
+ var written = dest.write(chunk);
+ if (false === written) {
+ needDrain++;
+ dest.once('drain', ondrain);
+ }
+ }
+
+ while (state.pipesCount &&
null !== (chunk = src.read(pipeOpts.chunkSize))) {
- state.pipes.forEach(function(dest, i, list) {
- var written = dest.write(chunk);
- if (false === written) {
- needDrain++;
- dest.once('drain', ondrain);
- }
- });
+
+ if (state.pipesCount === 1)
+ write(state.pipes, 0, null);
+ else
+ state.pipes.forEach(write);
+
src.emit('data', chunk);
// if anyone needs a drain, then we have to wait for that.
// function, or in the while loop, then stop flowing.
//
// NB: This is a pretty rare edge case.
- if (state.pipes.length === 0) {
+ if (state.pipesCount === 0) {
state.flowing = false;
// if there were data event listeners added, then switch to old mode.
Readable.prototype.unpipe = function(dest) {
var state = this._readableState;
- if (!dest) {
- // remove all of them.
- state.pipes.forEach(function(dest, i, list) {
- dest.emit('unpipe', this);
- }, this);
- state.pipes.length = 0;
- } else {
- var i = state.pipes.indexOf(dest);
- if (i !== -1) {
+
+ // if we're not piping anywhere, then do nothing.
+ if (state.pipesCount === 0)
+ return this;
+
+ // just one destination. most common case.
+ if (state.pipesCount === 1) {
+ // passed in one, but it's not the right one.
+ if (dest && dest !== state.pipes)
+ return this;
+
+ if (!dest)
+ dest = state.pipes;
+
+ // got a match.
+ state.pipes = null;
+ state.pipesCount = 0;
+ if (dest)
dest.emit('unpipe', this);
- state.pipes.splice(i, 1);
- }
+ return this;
+ }
+
+ // slow case. multiple pipe destinations.
+
+ if (!dest) {
+ // remove all.
+ var dests = state.pipes;
+ var len = state.pipesCount;
+ state.pipes = null;
+ state.pipesCount = 0;
+
+ for (var i = 0; i < len; i++)
+ dests[i].emit('unpipe', this);
+
+ return this;
}
+
+ // try to find the right one.
+ var i = state.pipes.indexOf(dest);
+ if (i === -1)
+ return this;
+
+ state.pipes.splice(i, 1);
+ state.pipesCount -= 1;
+ if (state.pipesCount === 1)
+ state.pipes = state.pipes[0];
+
+ dest.emit('unpipe', this);
+
return this;
};
w[0].on('write', function() {
if (--writes === 0) {
r.unpipe();
+ t.equal(r._readableState.pipes, null);
w[0].end();
r.pipe(w[1]);
+ t.equal(r._readableState.pipes, w[1]);
}
});
var ended = 0;
+ var ended0 = false;
+ var ended1 = false;
w[0].on('end', function(results) {
+ t.equal(ended0, false);
+ ended0 = true;
ended++;
t.same(results, expect[0]);
});
w[1].on('end', function(results) {
+ t.equal(ended1, false);
+ ended1 = true;
ended++;
t.equal(ended, 2);
t.same(results, expect[1]);