streams2: Get rid of .once() usage in Readable.pipe
authorisaacs <i@izs.me>
Wed, 28 Nov 2012 02:20:16 +0000 (18:20 -0800)
committerisaacs <i@izs.me>
Fri, 14 Dec 2012 01:00:33 +0000 (17:00 -0800)
Significant performance impact

lib/_stream_readable.js

index fff50d3..abb23e4 100644 (file)
@@ -78,6 +78,11 @@ function ReadableState(options, stream) {
   this.needReadable = false;
   this.emittedReadable = false;
 
+  // when piping, we only care about 'readable' events that happen
+  // after read()ing all the bytes and not getting any pushback.
+  this.ranOut = false;
+  this.flowChunkSize = null;
+
   this.decoder = null;
   if (options.encoding) {
     if (!StringDecoder)
@@ -285,8 +290,6 @@ Readable.prototype._read = function(n, cb) {
 Readable.prototype.pipe = function(dest, pipeOpts) {
   var src = this;
   var state = this._readableState;
-  if (!pipeOpts)
-    pipeOpts = {};
 
   switch (state.pipesCount) {
     case 0:
@@ -311,6 +314,9 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
     });
   }
 
+  if (pipeOpts && pipeOpts.chunkSize)
+    state.flowChunkSize = pipeOpts.chunkSize;
+
   function onend() {
     dest.end();
   }
@@ -319,16 +325,22 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
 
   // start the flow.
   if (!state.flowing) {
+    // the handler that waits for readable events after all
+    // the data gets sucked out in flow.
+    // This would be easier to follow with a .once() handler
+    // in flow(), but that is too slow.
+    this.on('readable', pipeOnReadable);
+
     state.flowing = true;
     process.nextTick(function() {
-      flow(src, pipeOpts);
+      flow(src);
     });
   }
 
   return dest;
 };
 
-function flow(src, pipeOpts) {
+function flow(src) {
   var state = src._readableState;
   var chunk;
   var needDrain = 0;
@@ -336,7 +348,7 @@ function flow(src, pipeOpts) {
   function ondrain() {
     needDrain--;
     if (needDrain === 0)
-      flow(src, pipeOpts);
+      flow(src);
   }
 
   function write(dest, i, list) {
@@ -348,7 +360,7 @@ function flow(src, pipeOpts) {
   }
 
   while (state.pipesCount &&
-         null !== (chunk = src.read(pipeOpts.chunkSize))) {
+         null !== (chunk = src.read(state.pipeChunkSize))) {
 
     if (state.pipesCount === 1)
       write(state.pipes, 0, null);
@@ -377,11 +389,17 @@ function flow(src, pipeOpts) {
 
   // at this point, no one needed a drain, so we just ran out of data
   // on the next readable event, start it over again.
-  src.once('readable', function() {
-    flow(src, pipeOpts);
-  });
+  state.ranOut = true;
 }
 
+function pipeOnReadable() {
+  if (this._readableState.ranOut) {
+    this._readableState.ranOut = false;
+    flow(this);
+  }
+}
+
+
 Readable.prototype.unpipe = function(dest) {
   var state = this._readableState;
 
@@ -401,6 +419,7 @@ Readable.prototype.unpipe = function(dest) {
     // got a match.
     state.pipes = null;
     state.pipesCount = 0;
+    this.removeListener('readable', pipeOnReadable);
     if (dest)
       dest.emit('unpipe', this);
     return this;
@@ -414,10 +433,10 @@ Readable.prototype.unpipe = function(dest) {
     var len = state.pipesCount;
     state.pipes = null;
     state.pipesCount = 0;
+    this.removeListener('readable', pipeOnReadable);
 
     for (var i = 0; i < len; i++)
       dests[i].emit('unpipe', this);
-
     return this;
   }