streams2: Refactor out .once() usage from Readable.pipe()
authorisaacs <i@izs.me>
Wed, 28 Nov 2012 09:25:39 +0000 (01:25 -0800)
committerisaacs <i@izs.me>
Fri, 14 Dec 2012 01:00:33 +0000 (17:00 -0800)
lib/_stream_readable.js

index abb23e4..9c24299 100644 (file)
@@ -81,6 +81,9 @@ function ReadableState(options, stream) {
   // when piping, we only care about 'readable' events that happen
   // after read()ing all the bytes and not getting any pushback.
   this.ranOut = false;
+
+  // the number of writers that are awaiting a drain event in .pipe()s
+  this.awaitDrain = 0;
   this.flowChunkSize = null;
 
   this.decoder = null;
@@ -330,6 +333,19 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
     // This would be easier to follow with a .once() handler
     // in flow(), but that is too slow.
     this.on('readable', pipeOnReadable);
+    var ondrain = pipeOnDrain(src);
+    dest.on('drain', ondrain);
+    dest.on('unpipe', function(readable) {
+      if (readable === src)
+        dest.removeListener('drain', ondrain);
+
+      // if the reader is waiting for a drain event from this
+      // specific writer, then it would cause it to never start
+      // flowing again.
+      // So, if this is awaiting a drain, then we just call it now.
+      if (dest._writableState.needDrain)
+        ondrain();
+    });
 
     state.flowing = true;
     process.nextTick(function() {
@@ -340,22 +356,25 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
   return dest;
 };
 
+function pipeOnDrain(src) {
+  return function() {
+    var dest = this;
+    var state = src._readableState;
+    state.awaitDrain --;
+    if (state.awaitDrain === 0)
+      flow(src);
+  };
+}
+
 function flow(src) {
   var state = src._readableState;
   var chunk;
-  var needDrain = 0;
-
-  function ondrain() {
-    needDrain--;
-    if (needDrain === 0)
-      flow(src);
-  }
+  state.awaitDrain = 0;
 
   function write(dest, i, list) {
     var written = dest.write(chunk);
     if (false === written) {
-      needDrain++;
-      dest.once('drain', ondrain);
+      state.awaitDrain++;
     }
   }
 
@@ -370,7 +389,7 @@ function flow(src) {
     src.emit('data', chunk);
 
     // if anyone needs a drain, then we have to wait for that.
-    if (needDrain > 0)
+    if (state.awaitDrain > 0)
       return;
   }