streams2: Handle immediate synthetic transforms properly
authorisaacs <i@izs.me>
Sun, 7 Oct 2012 20:12:21 +0000 (13:12 -0700)
committerisaacs <i@izs.me>
Fri, 14 Dec 2012 01:00:26 +0000 (17:00 -0800)
lib/_stream_passthrough.js
lib/_stream_readable.js
lib/_stream_transform.js

index dd6390f..5acd27b 100644 (file)
@@ -34,6 +34,5 @@ function PassThrough(options) {
 }
 
 PassThrough.prototype._transform = function(chunk, output, cb) {
-  output(chunk);
-  cb();
+  cb(null, chunk);
 };
index ea944fc..916ebc2 100644 (file)
@@ -28,7 +28,7 @@ var StringDecoder;
 
 util.inherits(Readable, Stream);
 
-function ReadableState(options, stream) {
+function ReadableState(options) {
   options = options || {};
 
   this.bufferSize = options.bufferSize || 16 * 1024;
@@ -44,7 +44,6 @@ function ReadableState(options, stream) {
   this.flowing = false;
   this.ended = false;
   this.endEmitted = false;
-  this.stream = stream;
   this.reading = false;
 
   // whenever we return null, then we set a flag to say
@@ -71,52 +70,76 @@ Readable.prototype.setEncoding = function(enc) {
   this._readableState.decoder = new StringDecoder(enc);
 };
 
-// you can override either this method, or _read(n, cb) below.
-Readable.prototype.read = function(n) {
-  var state = this._readableState;
 
-  if (state.length === 0 && state.ended) {
-    endReadable(this);
-    return null;
-  }
+function howMuchToRead(n, state) {
+  if (state.length === 0 && state.ended)
+    return 0;
+
+  if (isNaN(n))
+    return state.length;
 
-  if (isNaN(n) || n <= 0)
-    n = state.length
+  if (n <= 0)
+    return 0;
 
-  // XXX: controversial.
   // don't have that much.  return null, unless we've ended.
-  // However, if the low water mark is lower than the number of bytes,
-  // then we still need to return what we have, or else it won't kick
-  // off another _read() call.  For example,
-  // lwm=5
-  // len=9
-  // read(10)
-  // We don't have that many bytes, so it'd be tempting to return null,
-  // but then it won't ever cause _read to be called, so in that case,
-  // we just return what we have, and let the programmer deal with it.
   if (n > state.length) {
-    if (!state.ended && state.length <= state.lowWaterMark) {
+    if (!state.ended) {
       state.needReadable = true;
-      n = 0;
+      return 0;
     } else
-      n = state.length;
+      return state.length;
   }
 
+  return n;
+}
 
-  var ret;
-  if (n > 0)
-    ret = fromList(n, state.buffer, state.length, !!state.decoder);
-  else
-    ret = null;
+// you can override either this method, or _read(n, cb) below.
+Readable.prototype.read = function(n) {
+  var state = this._readableState;
+  var nOrig = n;
 
-  if (ret === null || ret.length === 0)
-    state.needReadable = true;
+  n = howMuchToRead(n, state);
 
-  state.length -= n;
+  // if we've ended, and we're now clear, then finish it up.
+  if (n === 0 && state.ended) {
+    endReadable(this);
+    return null;
+  }
 
-  if (!state.ended &&
-      state.length <= state.lowWaterMark &&
-      !state.reading) {
+  // All the actual chunk generation logic needs to be
+  // *below* the call to _read.  The reason is that in certain
+  // synthetic stream cases, such as passthrough streams, _read
+  // may be a completely synchronous operation which may change
+  // the state of the read buffer, providing enough data when
+  // before there was *not* enough.
+  //
+  // So, the steps are:
+  // 1. Figure out what the state of things will be after we do
+  // a read from the buffer.
+  //
+  // 2. If that resulting state will trigger a _read, then call _read.
+  // Note that this may be asynchronous, or synchronous.  Yes, it is
+  // deeply ugly to write APIs this way, but that still doesn't mean
+  // that the Readable class should behave improperly, as streams are
+  // designed to be sync/async agnostic.
+  // Take note if the _read call is sync or async (ie, if the read call
+  // has returned yet), so that we know whether or not it's safe to emit
+  // 'readable' etc.
+  //
+  // 3. Actually pull the requested chunks out of the buffer and return.
+
+  // if we need a readable event, then we need to do some reading.
+  var doRead = state.needReadable;
+  // if we currently have less than the lowWaterMark, then also read some
+  if (state.length - n <= state.lowWaterMark)
+    doRead = true;
+  // however, if we've ended, then there's no point, and if we're already
+  // reading, then it's unnecessary.
+  if (state.ended || state.reading)
+    doRead = false;
+
+  if (doRead) {
+    var sync = true;
     state.reading = true;
     // call internal read method
     this._read(state.bufferSize, function onread(er, chunk) {
@@ -125,21 +148,27 @@ Readable.prototype.read = function(n) {
         return this.emit('error', er);
 
       if (!chunk || !chunk.length) {
+        // eof
         state.ended = true;
         // if we've ended and we have some data left, then emit
         // 'readable' now to make sure it gets picked up.
-        if (state.length > 0)
-          this.emit('readable');
-        else
-          endReadable(this);
+        if (!sync) {
+          if (state.length > 0)
+            this.emit('readable');
+          else
+            endReadable(this);
+        }
         return;
       }
 
       if (state.decoder)
         chunk = state.decoder.write(chunk);
 
-      state.length += chunk.length;
-      state.buffer.push(chunk);
+      // update the buffer info.
+      if (chunk) {
+        state.length += chunk.length;
+        state.buffer.push(chunk);
+      }
 
       // if we haven't gotten enough to pass the lowWaterMark,
       // and we haven't ended, then don't bother telling the user
@@ -152,14 +181,33 @@ Readable.prototype.read = function(n) {
         return;
       }
 
-      // now we have something to call this.read() to get.
-      if (state.needReadable) {
+      if (state.needReadable && !sync) {
         state.needReadable = false;
         this.emit('readable');
       }
     }.bind(this));
+    sync = false;
   }
 
+  // If _read called its callback synchronously, then `reading`
+  // will be false, and we need to re-evaluate how much data we
+  // can return to the user.
+  if (doRead && !state.reading)
+    n = howMuchToRead(nOrig, state);
+
+  var ret;
+  if (n > 0)
+    ret = fromList(n, state.buffer, state.length, !!state.decoder);
+  else
+    ret = null;
+
+  if (ret === null || ret.length === 0) {
+    state.needReadable = true;
+    n = 0;
+  }
+
+  state.length -= n;
+
   return ret;
 };
 
index 40917de..16f2cac 100644 (file)
@@ -122,13 +122,14 @@ Transform.prototype._write = function(chunk, cb) {
   if (ts.pendingReadCb) {
     var readcb = ts.pendingReadCb;
     ts.pendingReadCb = null;
-    this._read(-1, readcb);
+    this._read(0, readcb);
   }
 
   // if we weren't waiting for it, but nothing is queued up, then
   // still kick off a transform, just so it's there when the user asks.
-  if (rs.length === 0) {
-    var ret = this.read();
+  var doRead = rs.needReadable || rs.length <= rs.lowWaterMark;
+  if (doRead && !rs.reading) {
+    var ret = this.read(0);
     if (ret !== null)
       return cb(new Error('invalid stream transform state'));
   }