streams2: Do multipipe without always using forEach
authorisaacs <i@izs.me>
Sat, 17 Nov 2012 03:27:41 +0000 (14:27 +1100)
committerisaacs <i@izs.me>
Fri, 14 Dec 2012 01:00:32 +0000 (17:00 -0800)
The Array.forEach call is too expensive.

lib/_stream_readable.js
test/simple/test-stream2-basic.js

index c598c91..e045ead 100644 (file)
@@ -62,7 +62,8 @@ function ReadableState(options, stream) {
 
   this.buffer = [];
   this.length = 0;
-  this.pipes = [];
+  this.pipes = null;
+  this.pipesCount = 0;
   this.flowing = false;
   this.ended = false;
   this.endEmitted = false;
@@ -282,7 +283,19 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
   var state = this._readableState;
   if (!pipeOpts)
     pipeOpts = {};
-  state.pipes.push(dest);
+
+  switch (state.pipesCount) {
+    case 0:
+      state.pipes = dest;
+      break;
+    case 1:
+      state.pipes = [ state.pipes, dest ];
+      break;
+    default:
+      state.pipes.push(dest);
+      break;
+  }
+  state.pipesCount += 1;
 
   if ((!pipeOpts || pipeOpts.end !== false) &&
       dest !== process.stdout &&
@@ -320,15 +333,22 @@ function flow(src, pipeOpts) {
       flow(src, pipeOpts);
   }
 
-  while (state.pipes.length &&
+  function write(dest, i, list) {
+    var written = dest.write(chunk);
+    if (false === written) {
+      needDrain++;
+      dest.once('drain', ondrain);
+    }
+  }
+
+  while (state.pipesCount &&
          null !== (chunk = src.read(pipeOpts.chunkSize))) {
-    state.pipes.forEach(function(dest, i, list) {
-      var written = dest.write(chunk);
-      if (false === written) {
-        needDrain++;
-        dest.once('drain', ondrain);
-      }
-    });
+
+    if (state.pipesCount === 1)
+      write(state.pipes, 0, null);
+    else
+      state.pipes.forEach(write);
+
     src.emit('data', chunk);
 
     // if anyone needs a drain, then we have to wait for that.
@@ -340,7 +360,7 @@ function flow(src, pipeOpts) {
   // function, or in the while loop, then stop flowing.
   //
   // NB: This is a pretty rare edge case.
-  if (state.pipes.length === 0) {
+  if (state.pipesCount === 0) {
     state.flowing = false;
 
     // if there were data event listeners added, then switch to old mode.
@@ -356,19 +376,55 @@ function flow(src, pipeOpts) {
 
 Readable.prototype.unpipe = function(dest) {
   var state = this._readableState;
-  if (!dest) {
-    // remove all of them.
-    state.pipes.forEach(function(dest, i, list) {
-      dest.emit('unpipe', this);
-    }, this);
-    state.pipes.length = 0;
-  } else {
-    var i = state.pipes.indexOf(dest);
-    if (i !== -1) {
+
+  // if we're not piping anywhere, then do nothing.
+  if (state.pipesCount === 0)
+    return this;
+
+  // just one destination.  most common case.
+  if (state.pipesCount === 1) {
+    // passed in one, but it's not the right one.
+    if (dest && dest !== state.pipes)
+      return this;
+
+    if (!dest)
+      dest = state.pipes;
+
+    // got a match.
+    state.pipes = null;
+    state.pipesCount = 0;
+    if (dest)
       dest.emit('unpipe', this);
-      state.pipes.splice(i, 1);
-    }
+    return this;
+  }
+
+  // slow case. multiple pipe destinations.
+
+  if (!dest) {
+    // remove all.
+    var dests = state.pipes;
+    var len = state.pipesCount;
+    state.pipes = null;
+    state.pipesCount = 0;
+
+    for (var i = 0; i < len; i++)
+      dests[i].emit('unpipe', this);
+
+    return this;
   }
+
+  // try to find the right one.
+  var i = state.pipes.indexOf(dest);
+  if (i === -1)
+    return this;
+
+  state.pipes.splice(i, 1);
+  state.pipesCount -= 1;
+  if (state.pipesCount === 1)
+    state.pipes = state.pipes[0];
+
+  dest.emit('unpipe', this);
+
   return this;
 };
 
index c28cdc9..0b4f4cf 100644 (file)
@@ -209,19 +209,27 @@ test('pipe', function(t) {
     w[0].on('write', function() {
       if (--writes === 0) {
         r.unpipe();
+        t.equal(r._readableState.pipes, null);
         w[0].end();
         r.pipe(w[1]);
+        t.equal(r._readableState.pipes, w[1]);
       }
     });
 
     var ended = 0;
 
+    var ended0 = false;
+    var ended1 = false;
     w[0].on('end', function(results) {
+      t.equal(ended0, false);
+      ended0 = true;
       ended++;
       t.same(results, expect[0]);
     });
 
     w[1].on('end', function(results) {
+      t.equal(ended1, false);
+      ended1 = true;
       ended++;
       t.equal(ended, 2);
       t.same(results, expect[1]);