Merge remote-tracking branch 'ry/v0.10'
[platform/upstream/nodejs.git] / lib / _stream_readable.js
old mode 100755 (executable)
new mode 100644 (file)
index fd63969..6dadc28
@@ -26,6 +26,7 @@ var EE = require('events').EventEmitter;
 var Stream = require('stream');
 var util = require('util');
 var StringDecoder;
+var debug = util.debuglog('stream');
 
 util.inherits(Readable, Stream);
 
@@ -35,7 +36,8 @@ function ReadableState(options, stream) {
   // the point at which it stops calling _read() to fill the buffer
   // Note: 0 is a valid value, means "don't call _read preemptively ever"
   var hwm = options.highWaterMark;
-  this.highWaterMark = (hwm || hwm === 0) ? hwm : 16 * 1024;
+  var defaultHwm = options.objectMode ? 16 : 16 * 1024;
+  this.highWaterMark = (hwm || hwm === 0) ? hwm : defaultHwm;
 
   // cast to ints.
   this.highWaterMark = ~~this.highWaterMark;
@@ -44,19 +46,13 @@ function ReadableState(options, stream) {
   this.length = 0;
   this.pipes = null;
   this.pipesCount = 0;
-  this.flowing = false;
+  this.flowing = null;
   this.ended = false;
   this.endEmitted = false;
   this.reading = false;
 
-  // In streams that never have any data, and do push(null) right away,
-  // the consumer can miss the 'end' event if they do some I/O before
-  // consuming the stream.  So, we don't emit('end') until some reading
-  // happens.
-  this.calledRead = false;
-
   // a flag to be able to tell if the onwrite cb is called immediately,
-  // or on a later tick.  We set this to true at first, becuase any
+  // or on a later tick.  We set this to true at first, because any
   // actions that shouldn't happen until "later" should generally also
   // not happen before the first write call.
   this.sync = true;
@@ -116,7 +112,7 @@ function Readable(options) {
 Readable.prototype.push = function(chunk, encoding) {
   var state = this._readableState;
 
-  if (typeof chunk === 'string' && !state.objectMode) {
+  if (util.isString(chunk) && !state.objectMode) {
     encoding = encoding || state.defaultEncoding;
     if (encoding !== state.encoding) {
       chunk = new Buffer(chunk, encoding);
@@ -137,7 +133,7 @@ function readableAddChunk(stream, state, chunk, encoding, addToFront) {
   var er = chunkInvalid(state, chunk);
   if (er) {
     stream.emit('error', er);
-  } else if (chunk === null || chunk === undefined) {
+  } else if (util.isNullOrUndefined(chunk)) {
     state.reading = false;
     if (!state.ended)
       onEofChunk(stream, state);
@@ -152,17 +148,24 @@ function readableAddChunk(stream, state, chunk, encoding, addToFront) {
       if (state.decoder && !addToFront && !encoding)
         chunk = state.decoder.write(chunk);
 
-      // update the buffer info.
-      state.length += state.objectMode ? 1 : chunk.length;
-      if (addToFront) {
-        state.buffer.unshift(chunk);
-      } else {
+      if (!addToFront)
         state.reading = false;
-        state.buffer.push(chunk);
-      }
 
-      if (state.needReadable)
-        emitReadable(stream);
+      // if we want the data now, just emit it.
+      if (state.flowing && state.length === 0 && !state.sync) {
+        stream.emit('data', chunk);
+        stream.read(0);
+      } else {
+        // update the buffer info.
+        state.length += state.objectMode ? 1 : chunk.length;
+        if (addToFront)
+          state.buffer.unshift(chunk);
+        else
+          state.buffer.push(chunk);
+
+        if (state.needReadable)
+          emitReadable(stream);
+      }
 
       maybeReadMore(stream, state);
     }
@@ -218,7 +221,7 @@ function howMuchToRead(n, state) {
   if (state.objectMode)
     return n === 0 ? 0 : 1;
 
-  if (isNaN(n) || n === null) {
+  if (isNaN(n) || util.isNull(n)) {
     // only flow one buffer at a time
     if (state.flowing && state.buffer.length)
       return state.buffer[0].length;
@@ -250,11 +253,11 @@ function howMuchToRead(n, state) {
 
 // you can override either this method, or the async _read(n) below.
 Readable.prototype.read = function(n) {
+  debug('read', n);
   var state = this._readableState;
-  state.calledRead = true;
   var nOrig = n;
 
-  if (typeof n !== 'number' || n > 0)
+  if (!util.isNumber(n) || n > 0)
     state.emittedReadable = false;
 
   // if we're doing read(0) to trigger a readable event, but we
@@ -263,7 +266,11 @@ Readable.prototype.read = function(n) {
   if (n === 0 &&
       state.needReadable &&
       (state.length >= state.highWaterMark || state.ended)) {
-    emitReadable(this);
+    debug('read: emitReadable', state.length, state.ended);
+    if (state.length === 0 && state.ended)
+      endReadable(this);
+    else
+      emitReadable(this);
     return null;
   }
 
@@ -300,17 +307,23 @@ Readable.prototype.read = function(n) {
 
   // if we need a readable event, then we need to do some reading.
   var doRead = state.needReadable;
+  debug('need readable', doRead);
 
   // if we currently have less than the highWaterMark, then also read some
-  if (state.length - n <= state.highWaterMark)
+  if (state.length === 0 || state.length - n < state.highWaterMark) {
     doRead = true;
+    debug('length less than watermark', doRead);
+  }
 
   // however, if we've ended, then there's no point, and if we're already
   // reading, then it's unnecessary.
-  if (state.ended || state.reading)
+  if (state.ended || state.reading) {
     doRead = false;
+    debug('reading or ended', doRead);
+  }
 
   if (doRead) {
+    debug('do read');
     state.reading = true;
     state.sync = true;
     // if the length is currently zero, then we *need* a readable event.
@@ -321,9 +334,8 @@ Readable.prototype.read = function(n) {
     state.sync = false;
   }
 
-  // If _read called its callback synchronously, then `reading`
-  // will be false, and we need to re-evaluate how much data we
-  // can return to the user.
+  // If _read pushed data synchronously, then `reading` will be false,
+  // and we need to re-evaluate how much data we can return to the user.
   if (doRead && !state.reading)
     n = howMuchToRead(nOrig, state);
 
@@ -333,7 +345,7 @@ Readable.prototype.read = function(n) {
   else
     ret = null;
 
-  if (ret === null) {
+  if (util.isNull(ret)) {
     state.needReadable = true;
     n = 0;
   }
@@ -345,21 +357,21 @@ Readable.prototype.read = function(n) {
   if (state.length === 0 && !state.ended)
     state.needReadable = true;
 
-  // If we happened to read() exactly the remaining amount in the
-  // buffer, and the EOF has been seen at this point, then make sure
-  // that we emit 'end' on the very next tick.
-  if (state.ended && !state.endEmitted && state.length === 0)
+  // If we tried to read() past the EOF, then emit end on the next tick.
+  if (nOrig !== n && state.ended && state.length === 0)
     endReadable(this);
 
+  if (!util.isNull(ret))
+    this.emit('data', ret);
+
   return ret;
 };
 
 function chunkInvalid(state, chunk) {
   var er = null;
-  if (!Buffer.isBuffer(chunk) &&
-      'string' !== typeof chunk &&
-      chunk !== null &&
-      chunk !== undefined &&
+  if (!util.isBuffer(chunk) &&
+      !util.isString(chunk) &&
+      !util.isNullOrUndefined(chunk) &&
       !state.objectMode &&
       !er) {
     er = new TypeError('Invalid non-string/buffer chunk');
@@ -378,12 +390,8 @@ function onEofChunk(stream, state) {
   }
   state.ended = true;
 
-  // if we've ended and we have some data left, then emit
-  // 'readable' now to make sure it gets picked up.
-  if (state.length > 0)
-    emitReadable(stream);
-  else
-    endReadable(stream);
+  // emit 'readable' now to make sure it gets picked up.
+  emitReadable(stream);
 }
 
 // Don't emit readable right away in sync mode, because this can trigger
@@ -392,20 +400,22 @@ function onEofChunk(stream, state) {
 function emitReadable(stream) {
   var state = stream._readableState;
   state.needReadable = false;
-  if (state.emittedReadable)
-    return;
-
-  state.emittedReadable = true;
-  if (state.sync)
-    process.nextTick(function() {
+  if (!state.emittedReadable) {
+    debug('emitReadable', state.flowing);
+    state.emittedReadable = true;
+    if (state.sync)
+      process.nextTick(function() {
+        emitReadable_(stream);
+      });
+    else
       emitReadable_(stream);
-    });
-  else
-    emitReadable_(stream);
+  }
 }
 
 function emitReadable_(stream) {
+  debug('emit readable');
   stream.emit('readable');
+  flow(stream);
 }
 
 
@@ -428,6 +438,7 @@ function maybeReadMore_(stream, state) {
   var len = state.length;
   while (!state.reading && !state.flowing && !state.ended &&
          state.length < state.highWaterMark) {
+    debug('maybeReadMore read 0');
     stream.read(0);
     if (len === state.length)
       // didn't get any data, stop spinning.
@@ -462,6 +473,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
       break;
   }
   state.pipesCount += 1;
+  debug('pipe count=%d opts=%j', state.pipesCount, pipeOpts);
 
   var doEnd = (!pipeOpts || pipeOpts.end !== false) &&
               dest !== process.stdout &&
@@ -475,11 +487,14 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
 
   dest.on('unpipe', onunpipe);
   function onunpipe(readable) {
-    if (readable !== src) return;
-    cleanup();
+    debug('onunpipe');
+    if (readable === src) {
+      cleanup();
+    }
   }
 
   function onend() {
+    debug('onend');
     dest.end();
   }
 
@@ -491,6 +506,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
   dest.on('drain', ondrain);
 
   function cleanup() {
+    debug('cleanup');
     // cleanup event handlers once the pipe is broken
     dest.removeListener('close', onclose);
     dest.removeListener('finish', onfinish);
@@ -499,19 +515,34 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
     dest.removeListener('unpipe', onunpipe);
     src.removeListener('end', onend);
     src.removeListener('end', cleanup);
+    src.removeListener('data', ondata);
 
     // if the reader is waiting for a drain event from this
     // specific writer, then it would cause it to never start
     // flowing again.
     // So, if this is awaiting a drain, then we just call it now.
     // If we don't know, then assume that we are waiting for one.
-    if (!dest._writableState || dest._writableState.needDrain)
+    if (state.awaitDrain &&
+        (!dest._writableState || dest._writableState.needDrain))
       ondrain();
   }
 
+  src.on('data', ondata);
+  function ondata(chunk) {
+    debug('ondata');
+    var ret = dest.write(chunk);
+    if (false === ret) {
+      debug('false write response, pause',
+            src._readableState.awaitDrain);
+      src._readableState.awaitDrain++;
+      src.pause();
+    }
+  }
+
   // if the dest has an error, then stop piping into it.
   // however, don't suppress the throwing behavior for this.
   function onerror(er) {
+    debug('onerror', er);
     unpipe();
     dest.removeListener('error', onerror);
     if (EE.listenerCount(dest, 'error') === 0)
@@ -535,12 +566,14 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
   }
   dest.once('close', onclose);
   function onfinish() {
+    debug('onfinish');
     dest.removeListener('close', onclose);
     unpipe();
   }
   dest.once('finish', onfinish);
 
   function unpipe() {
+    debug('unpipe');
     src.unpipe(dest);
   }
 
@@ -549,16 +582,8 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
 
   // start the flow if it hasn't been started already.
   if (!state.flowing) {
-    // the handler that waits for readable events after all
-    // the data gets sucked out in flow.
-    // This would be easier to follow with a .once() handler
-    // in flow(), but that is too slow.
-    this.on('readable', pipeOnReadable);
-
-    state.flowing = true;
-    process.nextTick(function() {
-      flow(src);
-    });
+    debug('pipe resume');
+    src.resume();
   }
 
   return dest;
@@ -566,63 +591,15 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
 
 function pipeOnDrain(src) {
   return function() {
-    var dest = this;
     var state = src._readableState;
-    state.awaitDrain--;
-    if (state.awaitDrain === 0)
+    debug('pipeOnDrain', state.awaitDrain);
+    if (state.awaitDrain)
+      state.awaitDrain--;
+    if (state.awaitDrain === 0 && EE.listenerCount(src, 'data')) {
+      state.flowing = true;
       flow(src);
-  };
-}
-
-function flow(src) {
-  var state = src._readableState;
-  var chunk;
-  state.awaitDrain = 0;
-
-  function write(dest, i, list) {
-    var written = dest.write(chunk);
-    if (false === written) {
-      state.awaitDrain++;
     }
-  }
-
-  while (state.pipesCount && null !== (chunk = src.read())) {
-
-    if (state.pipesCount === 1)
-      write(state.pipes, 0, null);
-    else
-      state.pipes.forEach(write);
-
-    src.emit('data', chunk);
-
-    // if anyone needs a drain, then we have to wait for that.
-    if (state.awaitDrain > 0)
-      return;
-  }
-
-  // if every destination was unpiped, either before entering this
-  // function, or in the while loop, then stop flowing.
-  //
-  // NB: This is a pretty rare edge case.
-  if (state.pipesCount === 0) {
-    state.flowing = false;
-
-    // if there were data event listeners added, then switch to old mode.
-    if (EE.listenerCount(src, 'data') > 0)
-      emitDataEvents(src);
-    return;
-  }
-
-  // at this point, no one needed a drain, so we just ran out of data
-  // on the next readable event, start it over again.
-  state.ranOut = true;
-}
-
-function pipeOnReadable() {
-  if (this._readableState.ranOut) {
-    this._readableState.ranOut = false;
-    flow(this);
-  }
+  };
 }
 
 
@@ -645,7 +622,6 @@ Readable.prototype.unpipe = function(dest) {
     // got a match.
     state.pipes = null;
     state.pipesCount = 0;
-    this.removeListener('readable', pipeOnReadable);
     state.flowing = false;
     if (dest)
       dest.emit('unpipe', this);
@@ -660,7 +636,6 @@ Readable.prototype.unpipe = function(dest) {
     var len = state.pipesCount;
     state.pipes = null;
     state.pipesCount = 0;
-    this.removeListener('readable', pipeOnReadable);
     state.flowing = false;
 
     for (var i = 0; i < len; i++)
@@ -688,8 +663,11 @@ Readable.prototype.unpipe = function(dest) {
 Readable.prototype.on = function(ev, fn) {
   var res = Stream.prototype.on.call(this, ev, fn);
 
-  if (ev === 'data' && !this._readableState.flowing)
-    emitDataEvents(this);
+  // If listening to data, and it has not explicitly been paused,
+  // then call resume to start the flow of data on the next tick.
+  if (ev === 'data' && false !== this._readableState.flowing) {
+    this.resume();
+  }
 
   if (ev === 'readable' && this.readable) {
     var state = this._readableState;
@@ -698,7 +676,11 @@ Readable.prototype.on = function(ev, fn) {
       state.emittedReadable = false;
       state.needReadable = true;
       if (!state.reading) {
-        this.read(0);
+        var self = this;
+        process.nextTick(function() {
+          debug('readable nexttick read 0');
+          self.read(0);
+        });
       } else if (state.length) {
         emitReadable(this, state);
       }
@@ -712,63 +694,52 @@ Readable.prototype.addListener = Readable.prototype.on;
 // pause() and resume() are remnants of the legacy readable stream API
 // If the user uses them, then switch into old mode.
 Readable.prototype.resume = function() {
-  emitDataEvents(this);
-  this.read(0);
-  this.emit('resume');
+  var state = this._readableState;
+  if (!state.flowing) {
+    debug('resume');
+    state.flowing = true;
+    if (!state.reading) {
+      debug('resume read 0');
+      this.read(0);
+    }
+    resume(this, state);
+  }
 };
 
+function resume(stream, state) {
+  if (!state.resumeScheduled) {
+    state.resumeScheduled = true;
+    process.nextTick(function() {
+      resume_(stream, state);
+    });
+  }
+}
+
+function resume_(stream, state) {
+  state.resumeScheduled = false;
+  stream.emit('resume');
+  flow(stream);
+  if (state.flowing && !state.reading)
+    stream.read(0);
+}
+
 Readable.prototype.pause = function() {
-  emitDataEvents(this, true);
-  this.emit('pause');
+  debug('call pause flowing=%j', this._readableState.flowing);
+  if (false !== this._readableState.flowing) {
+    debug('pause');
+    this._readableState.flowing = false;
+    this.emit('pause');
+  }
 };
 
-function emitDataEvents(stream, startPaused) {
+function flow(stream) {
   var state = stream._readableState;
-
+  debug('flow', state.flowing);
   if (state.flowing) {
-    // https://github.com/isaacs/readable-stream/issues/16
-    throw new Error('Cannot switch to old mode now.');
+    do {
+      var chunk = stream.read();
+    } while (null !== chunk && state.flowing);
   }
-
-  var paused = startPaused || false;
-  var readable = false;
-
-  // convert to an old-style stream.
-  stream.readable = true;
-  stream.pipe = Stream.prototype.pipe;
-  stream.on = stream.addListener = Stream.prototype.on;
-
-  stream.on('readable', function() {
-    readable = true;
-
-    var c;
-    while (!paused && (null !== (c = stream.read())))
-      stream.emit('data', c);
-
-    if (c === null) {
-      readable = false;
-      stream._readableState.needReadable = true;
-    }
-  });
-
-  stream.pause = function() {
-    paused = true;
-    this.emit('pause');
-  };
-
-  stream.resume = function() {
-    paused = false;
-    if (readable)
-      process.nextTick(function() {
-        stream.emit('readable');
-      });
-    else
-      this.read(0);
-    this.emit('resume');
-  };
-
-  // now make it start, just in case it hadn't already.
-  stream.emit('readable');
 }
 
 // wrap an old-style stream as the async data source.
@@ -780,6 +751,7 @@ Readable.prototype.wrap = function(stream) {
 
   var self = this;
   stream.on('end', function() {
+    debug('wrapped end');
     if (state.decoder && !state.ended) {
       var chunk = state.decoder.end();
       if (chunk && chunk.length)
@@ -790,6 +762,7 @@ Readable.prototype.wrap = function(stream) {
   });
 
   stream.on('data', function(chunk) {
+    debug('wrapped data');
     if (state.decoder)
       chunk = state.decoder.write(chunk);
     if (!chunk || !state.objectMode && !chunk.length)
@@ -805,8 +778,7 @@ Readable.prototype.wrap = function(stream) {
   // proxy all the other methods.
   // important when wrapping filters and duplexes.
   for (var i in stream) {
-    if (typeof stream[i] === 'function' &&
-        typeof this[i] === 'undefined') {
+    if (util.isFunction(stream[i]) && util.isUndefined(this[i])) {
       this[i] = function(method) { return function() {
         return stream[method].apply(stream, arguments);
       }}(i);
@@ -822,6 +794,7 @@ Readable.prototype.wrap = function(stream) {
   // when we try to consume some more bytes, simply unpause the
   // underlying stream.
   self._read = function(n) {
+    debug('wrapped _read', n);
     if (paused) {
       paused = false;
       stream.resume();
@@ -910,7 +883,7 @@ function endReadable(stream) {
   if (state.length > 0)
     throw new Error('endReadable called on non-empty stream');
 
-  if (!state.endEmitted && state.calledRead) {
+  if (!state.endEmitted) {
     state.ended = true;
     process.nextTick(function() {
       // Check that we didn't get one last unshift.