Merge remote-tracking branch 'ry/v0.10'
authorisaacs <i@izs.me>
Wed, 28 Aug 2013 17:11:17 +0000 (10:11 -0700)
committerisaacs <i@izs.me>
Wed, 28 Aug 2013 17:11:17 +0000 (10:11 -0700)
Conflicts:
AUTHORS
ChangeLog
deps/uv/ChangeLog
deps/uv/include/uv-darwin.h
deps/uv/src/unix/darwin.c
deps/uv/src/unix/fsevents.c
deps/uv/src/version.c
lib/_stream_writable.js
src/node_version.h

1  2 
AUTHORS
ChangeLog
lib/_stream_readable.js

diff --combined AUTHORS
+++ b/AUTHORS
@@@ -431,13 -431,8 +431,13 @@@ JeongHoon Byun <outsideris@gmail.com
  Iskren Ivov Chernev <iskren.chernev@gmail.com>
  Alexey Kupershtokh <alexey.kupershtokh@gmail.com>
  Benjamin Ruston <benjy.ruston@gmail.com>
 +Manav Rathi <manav.r@directi.com>
 +Marcin Kostrzewa <marcinkostrzewa@yahoo.com>
 +Suwon Chae <doortts@gmail.com>
 +David Braun <NodeJS-box@snkmail.com>
  Mitar Milutinovic <mitar.git@tnode.com>
  Michael Hart <michael.hart.au@gmail.com>
 +Jeff Barczewski <jeff.barczewski@gmail.com>
  Andrew Hart <hartandrewr@gmail.com>
  Rafael Garcia <rgarcia2009@gmail.com>
  Tobias Müllerleile <tobias@muellerleile.net>
@@@ -447,7 -442,6 +447,7 @@@ Kelly Gerber <kellygerber22@yahoo.com
  Ryan Doenges <rhdoenges@gmail.com>
  Sean Silva <chisophugis@gmail.com>
  Miroslav Bajtoš <miro.bajtos@gmail.com>
 +Olof Johansson <olof@ethup.se>
  Sam Roberts <vieuxtech@gmail.com>
  Kevin Locke <kevin@kevinlocke.name>
  Daniel Moore <polaris@northhorizon.net>
@@@ -455,31 -449,20 +455,31 @@@ Robert Kowalski <rok@kowalski.gd
  Benoit Vallée <github@benoitvallee.net>
  Ryuichi Okumura <okuryu@okuryu.com>
  Brandon Frohs <bfrohs@gmail.com>
 +Nick Sullivan <nick@sullivanflock.com>
  Nathan Zadoks <nathan@nathan7.eu>
  Rafael Henrique Moreira <rafadev7@gmail.com>
  Daniel G. Taylor <dan@programmer-art.org>
  Kiyoshi Nomo <tokyoincidents.g@gmail.com>
  Veres Lajos <vlajos@gmail.com>
  Yuan Chuan <yuanchuan23@gmail.com>
 +Krzysztof Chrapka <chrapka.k@gmail.com>
 +Linus Mårtensson <linus.martensson@sonymobile.com>
  Peter Rust <peter@cornerstonenw.com>
  Shuan Wang <shuanwang@gmail.com>
 -Andrew Chilton <andychilton@gmail.com>
  Wyatt Preul <wpreul@gmail.com>
 +David Björklund <david.bjorklund@gmail.com>
 +Dav Glass <davglass@gmail.com>
 +Andrew Chilton <andychilton@gmail.com>
 +Antony Bailey <support@antonybailey.net>
  Forrest L Norvell <ogd@aoaioxxysz.net>
 +Evan Solomon <evan@evanalyze.com>
  Eran Hammer <eran@hueniverse.com>
- Matthias Bartelmeß <mba@fourplusone.de>
  Daniel Chatfield <chatfielddaniel@gmail.com>
  Eivind Uggedal <eivind@uggedal.com>
  Edward Hutchins <eahutchins@gmail.com>
- James Halliday <mail@substack.net>
  Chris Wren <cthewren@gmail.com>
  Duan Yao <duanyao@ustc.edu>
++Matthias Bartelmeß <mba@fourplusone.de>
++James Halliday <mail@substack.net>
 +Matthew Aynalem <maynalem@gmail.com>
 +Vsevolod Strukchinsky <floatdrop@yandex-team.ru>
diff --combined ChangeLog
+++ b/ChangeLog
 -2013.08.21, Version 0.10.17 (Stable)
 +2013.08.21, Version 0.11.6 (Unstable)
 +
 +* uv: Upgrade to v0.11.8
 +
 +* v8: upgrade v8 to 3.20.14.1
 +
 +* build: disable SSLv2 by default (Ben Noordhuis)
 +
 +* build: don't auto-destroy existing configuration (Ben Noordhuis)
 +
 +* crypto: add TLS 1.1 and 1.2 to secureProtocol list (Matthias Bartelmeß)
 +
 +* crypto: fix memory leak in randomBytes() error path (Ben Noordhuis)
 +
 +* dgram: don't call into js when send cb is omitted (Ben Noordhuis)
 +
 +* dgram: fix regression in string argument handling (Ben Noordhuis)
 +
 +* domains: performance improvements (Trevor Norris)
 +
 +* events: EventEmitter = require('events') (Jake Verbaten)
 +
 +* http: Add write()/end() callbacks (isaacs)
 +
 +* http: Consistent 'finish' event semantics (isaacs)
 +
 +* http: Prefer 'binary' over 'ascii' (isaacs)
 +
 +* http: Support legacy agent.addRequest API (isaacs)
 +
 +* http: Write hex/base64 chunks properly (isaacs)
 +
 +* http: add agent.maxFreeSockets option (isaacs)
 +
 +* http: provide access to raw headers/trailers (isaacs)
 +
 +* http: removed headers stay removed (James Halliday)
 +
 +* http,timers: improve callback performance (Ben Noordhuis)
 +
 +* net: family option in net.connect (Vsevolod Strukchinsky)
 +
 +* readline: pause stdin before turning off terminal raw mode (Daniel Chatfield)
 +
 +* smalloc: allow different external array types (Trevor Norris)
 +
 +* smalloc: expose ExternalArraySize (Trevor Norris)
 +
 +* stream: Short-circuit buffer pushes when flowing (isaacs)
 +
 +* tls: handle errors on socket before releasing it (Fedor Indutny)
 +
 +* util: fix isPrimitive check (Trevor Norris)
 +
 +* util: isObject should always return boolean (Trevor Norris)
 +
 +
 +2013.08.06, Version 0.11.5 (Unstable), 6f92da2dd106b0c63fde563284f83e08e2a521b5
 +
 +* v8: upgrade to 3.20.11
 +
 +* uv: upgrade to v0.11.7
 +
 +* buffer: return offset for end of last write (Trevor Norris)
 +
 +* build: embed the mdb_v8.so into the binary (Timothy J Fontaine)
 +
 +* build: fix --without-ssl build (Ben Noordhuis)
 +
 +* child_process: add 'shell' option to .exec() (Ben Noordhuis)
 +
 +* dgram: report send errors to cb, don't pass bytes (Ben Noordhuis)
 +
 +* fs: write strings directly to disk (Trevor Norris)
 +
 +* https: fix default port (Koichi Kobayashi)
 +
 +* openssl: use asm for sha, md5, rmd (Fedor Indutny)
 +
 +* os: add mac address to networkInterfaces() output (Brian White)
 +
 +* smalloc: introduce smalloc module (Trevor Norris)
 +
 +* stream: Simplify flowing, passive data listening (streams3) (isaacs)
 +
 +* tls: asynchronous SNICallback (Fedor Indutny)
 +
 +* tls: share tls tickets key between cluster workers (Fedor Indutny)
 +
 +* util: don't throw on circular %j input to format() (Ben Noordhuis)
 +
 +
 +2013.07.12, Version 0.11.4 (Unstable), b5b84197ed037918fd1a26e5cb87cce7c812ca55
 +
 +* npm: Upgrade to 1.3.4
 +
 +* v8: Upgrade to v3.20.2
 +
 +* c-ares: Upgrade to piscisaureus/cares@805d153
 +
 +* timers: setImmediate process full queue each turn (Ben Noordhuis)
 +
 +* http: Add agent.get/request methods (isaacs)
 +
 +* http: Proper KeepAlive behavior (isaacs)
 +
 +* configure: fix the --without-ssl option (Nathan Rajlich)
 +
 +* buffer: propagate originating parent (Trevor Norris)
 +
 +* tls_wrap: return Error not throw for missing cert (Timothy J Fontaine)
 +
 +* src: enable native v8 typed arrays (Ben Noordhuis)
 +
 +* stream: objectMode transform should allow falsey values (Jeff Barczewski)
 +
 +* slab_allocator: remove SlabAllocator (Trevor Norris)
 +
 +* crypto: fix memory leak in LoadPKCS12 (Fedor Indutny)
 +
 +* tls: export TLSSocket (Fedor Indutny)
 +
 +* zlib: allow changing of level and strategy (Brian White)
 +
 +* zlib: allow custom flush type for flush() (Brian White)
 +
 +
 +2013.06.26, Version 0.11.3 (Unstable), 38c0c47bbe280ddc42054418091571e532d82a1e
 +
 +* uv: Upgrade to v0.11.5
 +
 +* c-ares: upgrade to 1.10.0
 +
 +* v8: upgrade to v3.19.13
 +
 +* punycode: update to v1.2.3 (Mathias Bynens)
 +
 +* debugger: break on uncaught exception (Miroslav Bajtos)
 +
 +* child_process: emit 'disconnect' asynchronously (Ben Noordhuis)
 +
 +* dtrace: enable uv's probes if enabled (Timothy J Fontaine)
 +
 +* dtrace: unify dtrace and systemtap interfaces (Timothy J Fontaine)
 +
 +* buffer: New API for backing data store (Trevor Norris)
 +
 +* buffer: return `this` in fill() for chainability (Brian White)
 +
 +* build: fix include order for building on windows (Timothy J Fontaine)
 +
 +* build: add android support (Linus Mårtensson)
 +
 +* readline: strip ctrl chars for prompt width calc (Krzysztof Chrapka)
 +
 +* tls: introduce TLSSocket based on tls_wrap binding (Fedor Indutny)
 +
 +* tls: add localAddress and localPort properties (Ben Noordhuis)
 +
 +* crypto: free excessive memory in NodeBIO (Fedor Indutny)
 +
 +* process: remove maxTickDepth (Trevor Norris)
 +
 +* timers: use uv_now instead of Date.now (Timothy J Fontaine)
 +
 +* util: Add debuglog, deprecate console lookalikes (isaacs)
 +
 +* module: use path.sep instead of a custom solution (Robert Kowalski)
 +
 +* http: don't escape request path, reject bad chars (Ben Noordhuis)
 +
 +* net: emit dns 'lookup' event before connect (Ben Noordhuis)
 +
 +* dns: add getServers and setServers (Timothy J Fontaine)
 +
 +
 +2013.05.13, Version 0.11.2 (Unstable), 5d3dc0e4c3369dfb00b7b13e08936c2e652fa696
 +
 +* uv: Upgrade to 0.11.2
 +
 +* V8: Upgrade to 3.19.0
 +
 +* npm: Upgrade to 1.2.21
 +
 +* build: Makefile should respect configure --prefix (Timothy J Fontaine)
 +
 +* cluster: use round-robin load balancing (Ben Noordhuis)
 +
 +* debugger, cluster: each worker has new debug port (Miroslav Bajtoš)
 +
 +* debugger: `restart` with custom debug port (Miroslav Bajtoš)
 +
 +* debugger: breakpoints in scripts not loaded yet (Miroslav Bajtoš)
 +
 +* event: EventEmitter#setMaxListeners() returns this (Sam Roberts)
 +
 +* events: add EventEmitter.defaultMaxListeners (Ben Noordhuis)
 +
 +* install: Support $(PREFIX) install target directory prefix (Olof Johansson)
 +
 +* os: Include netmask in os.networkInterfaces() (Ben Kelly)
 +
 +* path: add path.isAbsolute(path) (Ryan Doenges)
 +
 +* stream: Guarantee ordering of 'finish' event (isaacs)
 +
 +* streams: introduce .cork/.uncork/._writev (Fedor Indutny)
 +
 +* vm: add support for timeout argument (Andrew Paprocki)
 +
 +
 +2013.04.19, Version 0.11.1 (Unstable), 4babd2b46ebf9fbea2c9946af5cfae25a33b2b22
 +
 +* V8: upgrade to 3.18.0
 +
 +* uv: Upgrade to v0.11.1
 +
 +* http: split into multiple separate modules (Timothy J Fontaine)
 +
 +* http: escape unsafe characters in request path (Ben Noordhuis)
 +
 +* url: Escape all unwise characters (isaacs)
 +
 +* build: depend on v8 postmortem-metadata if enabled (Paddy Byers)
 +
 +* etw: update prototypes to match dtrace provider (Timothy J Fontaine)
 +
 +* buffer: change output of Buffer.prototype.toJSON() (David Braun)
 +
 +* dtrace: actually use the _handle.fd value (Timothy J Fontaine)
 +
 +* dtrace: pass more arguments to probes (Dave Pacheco)
 +
 +* build: allow building with dtrace on osx (Dave Pacheco)
 +
 +* zlib: allow passing options to convenience methods (Kyle Robinson Young)
 +
 +
 +2013.03.28, Version 0.11.0 (Unstable), bce38b3d74e64fcb7d04a2dd551151da6168cdc5
 +
 +* V8: update to 3.17.13
 +
 +* os: use %SystemRoot% or %windir% in os.tmpdir() (Suwon Chae)
 +
 +* util: fix util.inspect() line width calculation (Marcin Kostrzewa)
 +
 +* buffer: remove _charsWritten (Trevor Norris)
 +
 +* fs: uv_[fl]stat now reports subsecond resolution (Timothy J Fontaine)
 +
 +* fs: Throw if error raised and missing callback (bnoordhuis)
 +
 +* tls: expose SSL_CTX_set_timeout via tls.createServer (Manav Rathi)
 +
 +* tls: remove harmful unnecessary bounds checking (Marcel Laverdet)
 +
 +* buffer: write ascii strings using WriteOneByte (Trevor Norris)
 +
 +* dtrace: fix generation of v8 constants on freebsd (Fedor Indutny)
 +
 +* dtrace: x64 ustack helper (Fedor Indutny)
 +
 +* readline: handle wide characters properly (Nao Iizuka)
 +
 +* repl: Use a domain to catch async errors safely (isaacs)
 +
 +* repl: emit 'reset' event when context is reset (Sami Samhuri)
 +
 +* util: custom `inspect()` method may return an Object (Nathan Rajlich)
 +
 +* console: `console.dir()` bypasses inspect() methods (Nathan Rajlich)
 +
 +
++2013.08.21, Version 0.10.17 (Stable), 469a4a5091a677df62be319675056b869c31b35c
+ * uv: Upgrade v0.10.14
+ * http_parser: Do not accept PUN/GEM methods as PUT/GET (Chris Dickinson)
+ * tls: fix assertion when ssl is destroyed at read (Fedor Indutny)
+ * stream: Throw on 'error' if listeners removed (isaacs)
+ * dgram: fix assertion on bad send() arguments (Ben Noordhuis)
+ * readline: pause stdin before turning off terminal raw mode (Daniel Chatfield)
  2013.08.16, Version 0.10.16 (Stable), 50b4c905a4425430ae54db4906f88982309e128d
  
  * v8: back-port fix for CVE-2013-2882
  * stream: Fix double pipe error emit (Eran Hammer)
  
  
 -2013.07.25, Version 0.10.15 (Stable)
 +2013.07.25, Version 0.10.15 (Stable), 2426d65af860bda7be9f0832a99601cc43c6cf63
  
  * src: fix process.getuid() return value (Ben Noordhuis)
  
  * net: Fix busy loop on POLLERR|POLLHUP on older linux kernels (Ben Noordhuis, isaacs)
  
  
 -
  2013.06.04, Version 0.10.10 (Stable), 25e51c396aa23018603baae2b1d9390f5d9db496
  
  * uv: Upgrade to 0.10.10
  * stream: Fix unshift() race conditions (isaacs)
  
  
 +
 +
  2013.04.11, Version 0.10.4 (Stable), 9712aa9f76073c30850b20a188b1ed12ffb74d17
  
  * uv: Upgrade to 0.10.4
  * src: tie process.versions.uv to uv_version_string() (Ben Noordhuis)
  
  
 -2013.03.28, Version 0.10.2 (Stable)
 +2013.03.28, Version 0.10.2 (Stable), 1e0de9c426e07a260bbec2d2196c2d2db8eb8886
  
  * npm: Upgrade to 1.2.15
  
diff --combined lib/_stream_readable.js
index 95c044a,fd63969..6dadc28
mode 100644,100755..100644
@@@ -26,7 -26,6 +26,7 @@@ var EE = require('events').EventEmitter
  var Stream = require('stream');
  var util = require('util');
  var StringDecoder;
 +var debug = util.debuglog('stream');
  
  util.inherits(Readable, Stream);
  
@@@ -36,8 -35,7 +36,8 @@@ function ReadableState(options, stream
    // the point at which it stops calling _read() to fill the buffer
    // Note: 0 is a valid value, means "don't call _read preemptively ever"
    var hwm = options.highWaterMark;
 -  this.highWaterMark = (hwm || hwm === 0) ? hwm : 16 * 1024;
 +  var defaultHwm = options.objectMode ? 16 : 16 * 1024;
 +  this.highWaterMark = (hwm || hwm === 0) ? hwm : defaultHwm;
  
    // cast to ints.
    this.highWaterMark = ~~this.highWaterMark;
    this.length = 0;
    this.pipes = null;
    this.pipesCount = 0;
 -  this.flowing = false;
 +  this.flowing = null;
    this.ended = false;
    this.endEmitted = false;
    this.reading = false;
  
 -  // In streams that never have any data, and do push(null) right away,
 -  // the consumer can miss the 'end' event if they do some I/O before
 -  // consuming the stream.  So, we don't emit('end') until some reading
 -  // happens.
 -  this.calledRead = false;
 -
    // a flag to be able to tell if the onwrite cb is called immediately,
 -  // or on a later tick.  We set this to true at first, becuase any
 +  // or on a later tick.  We set this to true at first, because any
    // actions that shouldn't happen until "later" should generally also
    // not happen before the first write call.
    this.sync = true;
@@@ -112,7 -116,7 +112,7 @@@ function Readable(options) 
  Readable.prototype.push = function(chunk, encoding) {
    var state = this._readableState;
  
 -  if (typeof chunk === 'string' && !state.objectMode) {
 +  if (util.isString(chunk) && !state.objectMode) {
      encoding = encoding || state.defaultEncoding;
      if (encoding !== state.encoding) {
        chunk = new Buffer(chunk, encoding);
@@@ -133,7 -137,7 +133,7 @@@ function readableAddChunk(stream, state
    var er = chunkInvalid(state, chunk);
    if (er) {
      stream.emit('error', er);
 -  } else if (chunk === null || chunk === undefined) {
 +  } else if (util.isNullOrUndefined(chunk)) {
      state.reading = false;
      if (!state.ended)
        onEofChunk(stream, state);
        if (state.decoder && !addToFront && !encoding)
          chunk = state.decoder.write(chunk);
  
 -      // update the buffer info.
 -      state.length += state.objectMode ? 1 : chunk.length;
 -      if (addToFront) {
 -        state.buffer.unshift(chunk);
 -      } else {
 +      if (!addToFront)
          state.reading = false;
 -        state.buffer.push(chunk);
 -      }
  
 -      if (state.needReadable)
 -        emitReadable(stream);
 +      // if we want the data now, just emit it.
 +      if (state.flowing && state.length === 0 && !state.sync) {
 +        stream.emit('data', chunk);
 +        stream.read(0);
 +      } else {
 +        // update the buffer info.
 +        state.length += state.objectMode ? 1 : chunk.length;
 +        if (addToFront)
 +          state.buffer.unshift(chunk);
 +        else
 +          state.buffer.push(chunk);
 +
 +        if (state.needReadable)
 +          emitReadable(stream);
 +      }
  
        maybeReadMore(stream, state);
      }
@@@ -221,7 -218,7 +221,7 @@@ function howMuchToRead(n, state) 
    if (state.objectMode)
      return n === 0 ? 0 : 1;
  
 -  if (isNaN(n) || n === null) {
 +  if (isNaN(n) || util.isNull(n)) {
      // only flow one buffer at a time
      if (state.flowing && state.buffer.length)
        return state.buffer[0].length;
  
  // you can override either this method, or the async _read(n) below.
  Readable.prototype.read = function(n) {
 +  debug('read', n);
    var state = this._readableState;
 -  state.calledRead = true;
    var nOrig = n;
  
 -  if (typeof n !== 'number' || n > 0)
 +  if (!util.isNumber(n) || n > 0)
      state.emittedReadable = false;
  
    // if we're doing read(0) to trigger a readable event, but we
    if (n === 0 &&
        state.needReadable &&
        (state.length >= state.highWaterMark || state.ended)) {
 -    emitReadable(this);
 +    debug('read: emitReadable', state.length, state.ended);
 +    if (state.length === 0 && state.ended)
 +      endReadable(this);
 +    else
 +      emitReadable(this);
      return null;
    }
  
  
    // if we need a readable event, then we need to do some reading.
    var doRead = state.needReadable;
 +  debug('need readable', doRead);
  
    // if we currently have less than the highWaterMark, then also read some
 -  if (state.length - n <= state.highWaterMark)
 +  if (state.length === 0 || state.length - n < state.highWaterMark) {
      doRead = true;
 +    debug('length less than watermark', doRead);
 +  }
  
    // however, if we've ended, then there's no point, and if we're already
    // reading, then it's unnecessary.
 -  if (state.ended || state.reading)
 +  if (state.ended || state.reading) {
      doRead = false;
 +    debug('reading or ended', doRead);
 +  }
  
    if (doRead) {
 +    debug('do read');
      state.reading = true;
      state.sync = true;
      // if the length is currently zero, then we *need* a readable event.
      state.sync = false;
    }
  
 -  // If _read called its callback synchronously, then `reading`
 -  // will be false, and we need to re-evaluate how much data we
 -  // can return to the user.
 +  // If _read pushed data synchronously, then `reading` will be false,
 +  // and we need to re-evaluate how much data we can return to the user.
    if (doRead && !state.reading)
      n = howMuchToRead(nOrig, state);
  
    else
      ret = null;
  
 -  if (ret === null) {
 +  if (util.isNull(ret)) {
      state.needReadable = true;
      n = 0;
    }
    if (state.length === 0 && !state.ended)
      state.needReadable = true;
  
 -  // If we happened to read() exactly the remaining amount in the
 -  // buffer, and the EOF has been seen at this point, then make sure
 -  // that we emit 'end' on the very next tick.
 -  if (state.ended && !state.endEmitted && state.length === 0)
 +  // If we tried to read() past the EOF, then emit end on the next tick.
 +  if (nOrig !== n && state.ended && state.length === 0)
      endReadable(this);
  
 +  if (!util.isNull(ret))
 +    this.emit('data', ret);
 +
    return ret;
  };
  
  function chunkInvalid(state, chunk) {
    var er = null;
 -  if (!Buffer.isBuffer(chunk) &&
 -      'string' !== typeof chunk &&
 -      chunk !== null &&
 -      chunk !== undefined &&
 +  if (!util.isBuffer(chunk) &&
 +      !util.isString(chunk) &&
 +      !util.isNullOrUndefined(chunk) &&
        !state.objectMode &&
        !er) {
      er = new TypeError('Invalid non-string/buffer chunk');
@@@ -390,8 -378,12 +390,8 @@@ function onEofChunk(stream, state) 
    }
    state.ended = true;
  
 -  // if we've ended and we have some data left, then emit
 -  // 'readable' now to make sure it gets picked up.
 -  if (state.length > 0)
 -    emitReadable(stream);
 -  else
 -    endReadable(stream);
 +  // emit 'readable' now to make sure it gets picked up.
 +  emitReadable(stream);
  }
  
  // Don't emit readable right away in sync mode, because this can trigger
  function emitReadable(stream) {
    var state = stream._readableState;
    state.needReadable = false;
 -  if (state.emittedReadable)
 -    return;
 -
 -  state.emittedReadable = true;
 -  if (state.sync)
 -    process.nextTick(function() {
 +  if (!state.emittedReadable) {
 +    debug('emitReadable', state.flowing);
 +    state.emittedReadable = true;
 +    if (state.sync)
 +      process.nextTick(function() {
 +        emitReadable_(stream);
 +      });
 +    else
        emitReadable_(stream);
 -    });
 -  else
 -    emitReadable_(stream);
 +  }
  }
  
  function emitReadable_(stream) {
 +  debug('emit readable');
    stream.emit('readable');
 +  flow(stream);
  }
  
  
@@@ -438,7 -428,6 +438,7 @@@ function maybeReadMore_(stream, state) 
    var len = state.length;
    while (!state.reading && !state.flowing && !state.ended &&
           state.length < state.highWaterMark) {
 +    debug('maybeReadMore read 0');
      stream.read(0);
      if (len === state.length)
        // didn't get any data, stop spinning.
@@@ -473,7 -462,6 +473,7 @@@ Readable.prototype.pipe = function(dest
        break;
    }
    state.pipesCount += 1;
 +  debug('pipe count=%d opts=%j', state.pipesCount, pipeOpts);
  
    var doEnd = (!pipeOpts || pipeOpts.end !== false) &&
                dest !== process.stdout &&
  
    dest.on('unpipe', onunpipe);
    function onunpipe(readable) {
 -    if (readable !== src) return;
 -    cleanup();
 +    debug('onunpipe');
 +    if (readable === src) {
 +      cleanup();
 +    }
    }
  
    function onend() {
 +    debug('onend');
      dest.end();
    }
  
    dest.on('drain', ondrain);
  
    function cleanup() {
 +    debug('cleanup');
      // cleanup event handlers once the pipe is broken
      dest.removeListener('close', onclose);
      dest.removeListener('finish', onfinish);
      dest.removeListener('unpipe', onunpipe);
      src.removeListener('end', onend);
      src.removeListener('end', cleanup);
 +    src.removeListener('data', ondata);
  
      // if the reader is waiting for a drain event from this
      // specific writer, then it would cause it to never start
      // flowing again.
      // So, if this is awaiting a drain, then we just call it now.
      // If we don't know, then assume that we are waiting for one.
 -    if (!dest._writableState || dest._writableState.needDrain)
 +    if (state.awaitDrain &&
 +        (!dest._writableState || dest._writableState.needDrain))
        ondrain();
    }
  
 +  src.on('data', ondata);
 +  function ondata(chunk) {
 +    debug('ondata');
 +    var ret = dest.write(chunk);
 +    if (false === ret) {
 +      debug('false write response, pause',
 +            src._readableState.awaitDrain);
 +      src._readableState.awaitDrain++;
 +      src.pause();
 +    }
 +  }
 +
    // if the dest has an error, then stop piping into it.
    // however, don't suppress the throwing behavior for this.
    function onerror(er) {
 +    debug('onerror', er);
      unpipe();
      dest.removeListener('error', onerror);
      if (EE.listenerCount(dest, 'error') === 0)
    }
    // This is a brutally ugly hack to make sure that our error handler
    // is attached before any userland ones.  NEVER DO THIS.
-   if (!dest._events.error)
+   if (!dest._events || !dest._events.error)
      dest.on('error', onerror);
    else if (Array.isArray(dest._events.error))
      dest._events.error.unshift(onerror);
    }
    dest.once('close', onclose);
    function onfinish() {
 +    debug('onfinish');
      dest.removeListener('close', onclose);
      unpipe();
    }
    dest.once('finish', onfinish);
  
    function unpipe() {
 +    debug('unpipe');
      src.unpipe(dest);
    }
  
  
    // start the flow if it hasn't been started already.
    if (!state.flowing) {
 -    // the handler that waits for readable events after all
 -    // the data gets sucked out in flow.
 -    // This would be easier to follow with a .once() handler
 -    // in flow(), but that is too slow.
 -    this.on('readable', pipeOnReadable);
 -
 -    state.flowing = true;
 -    process.nextTick(function() {
 -      flow(src);
 -    });
 +    debug('pipe resume');
 +    src.resume();
    }
  
    return dest;
  
  function pipeOnDrain(src) {
    return function() {
 -    var dest = this;
      var state = src._readableState;
 -    state.awaitDrain--;
 -    if (state.awaitDrain === 0)
 +    debug('pipeOnDrain', state.awaitDrain);
 +    if (state.awaitDrain)
 +      state.awaitDrain--;
 +    if (state.awaitDrain === 0 && EE.listenerCount(src, 'data')) {
 +      state.flowing = true;
        flow(src);
 -  };
 -}
 -
 -function flow(src) {
 -  var state = src._readableState;
 -  var chunk;
 -  state.awaitDrain = 0;
 -
 -  function write(dest, i, list) {
 -    var written = dest.write(chunk);
 -    if (false === written) {
 -      state.awaitDrain++;
      }
 -  }
 -
 -  while (state.pipesCount && null !== (chunk = src.read())) {
 -
 -    if (state.pipesCount === 1)
 -      write(state.pipes, 0, null);
 -    else
 -      state.pipes.forEach(write);
 -
 -    src.emit('data', chunk);
 -
 -    // if anyone needs a drain, then we have to wait for that.
 -    if (state.awaitDrain > 0)
 -      return;
 -  }
 -
 -  // if every destination was unpiped, either before entering this
 -  // function, or in the while loop, then stop flowing.
 -  //
 -  // NB: This is a pretty rare edge case.
 -  if (state.pipesCount === 0) {
 -    state.flowing = false;
 -
 -    // if there were data event listeners added, then switch to old mode.
 -    if (EE.listenerCount(src, 'data') > 0)
 -      emitDataEvents(src);
 -    return;
 -  }
 -
 -  // at this point, no one needed a drain, so we just ran out of data
 -  // on the next readable event, start it over again.
 -  state.ranOut = true;
 -}
 -
 -function pipeOnReadable() {
 -  if (this._readableState.ranOut) {
 -    this._readableState.ranOut = false;
 -    flow(this);
 -  }
 +  };
  }
  
  
@@@ -622,6 -645,7 +622,6 @@@ Readable.prototype.unpipe = function(de
      // got a match.
      state.pipes = null;
      state.pipesCount = 0;
 -    this.removeListener('readable', pipeOnReadable);
      state.flowing = false;
      if (dest)
        dest.emit('unpipe', this);
      var len = state.pipesCount;
      state.pipes = null;
      state.pipesCount = 0;
 -    this.removeListener('readable', pipeOnReadable);
      state.flowing = false;
  
      for (var i = 0; i < len; i++)
  Readable.prototype.on = function(ev, fn) {
    var res = Stream.prototype.on.call(this, ev, fn);
  
 -  if (ev === 'data' && !this._readableState.flowing)
 -    emitDataEvents(this);
 +  // If listening to data, and it has not explicitly been paused,
 +  // then call resume to start the flow of data on the next tick.
 +  if (ev === 'data' && false !== this._readableState.flowing) {
 +    this.resume();
 +  }
  
    if (ev === 'readable' && this.readable) {
      var state = this._readableState;
        state.emittedReadable = false;
        state.needReadable = true;
        if (!state.reading) {
 -        this.read(0);
 +        var self = this;
 +        process.nextTick(function() {
 +          debug('readable nexttick read 0');
 +          self.read(0);
 +        });
        } else if (state.length) {
          emitReadable(this, state);
        }
@@@ -694,52 -712,63 +694,52 @@@ Readable.prototype.addListener = Readab
  // pause() and resume() are remnants of the legacy readable stream API
  // If the user uses them, then switch into old mode.
  Readable.prototype.resume = function() {
 -  emitDataEvents(this);
 -  this.read(0);
 -  this.emit('resume');
 +  var state = this._readableState;
 +  if (!state.flowing) {
 +    debug('resume');
 +    state.flowing = true;
 +    if (!state.reading) {
 +      debug('resume read 0');
 +      this.read(0);
 +    }
 +    resume(this, state);
 +  }
  };
  
 +function resume(stream, state) {
 +  if (!state.resumeScheduled) {
 +    state.resumeScheduled = true;
 +    process.nextTick(function() {
 +      resume_(stream, state);
 +    });
 +  }
 +}
 +
 +function resume_(stream, state) {
 +  state.resumeScheduled = false;
 +  stream.emit('resume');
 +  flow(stream);
 +  if (state.flowing && !state.reading)
 +    stream.read(0);
 +}
 +
  Readable.prototype.pause = function() {
 -  emitDataEvents(this, true);
 -  this.emit('pause');
 +  debug('call pause flowing=%j', this._readableState.flowing);
 +  if (false !== this._readableState.flowing) {
 +    debug('pause');
 +    this._readableState.flowing = false;
 +    this.emit('pause');
 +  }
  };
  
 -function emitDataEvents(stream, startPaused) {
 +function flow(stream) {
    var state = stream._readableState;
 -
 +  debug('flow', state.flowing);
    if (state.flowing) {
 -    // https://github.com/isaacs/readable-stream/issues/16
 -    throw new Error('Cannot switch to old mode now.');
 +    do {
 +      var chunk = stream.read();
 +    } while (null !== chunk && state.flowing);
    }
 -
 -  var paused = startPaused || false;
 -  var readable = false;
 -
 -  // convert to an old-style stream.
 -  stream.readable = true;
 -  stream.pipe = Stream.prototype.pipe;
 -  stream.on = stream.addListener = Stream.prototype.on;
 -
 -  stream.on('readable', function() {
 -    readable = true;
 -
 -    var c;
 -    while (!paused && (null !== (c = stream.read())))
 -      stream.emit('data', c);
 -
 -    if (c === null) {
 -      readable = false;
 -      stream._readableState.needReadable = true;
 -    }
 -  });
 -
 -  stream.pause = function() {
 -    paused = true;
 -    this.emit('pause');
 -  };
 -
 -  stream.resume = function() {
 -    paused = false;
 -    if (readable)
 -      process.nextTick(function() {
 -        stream.emit('readable');
 -      });
 -    else
 -      this.read(0);
 -    this.emit('resume');
 -  };
 -
 -  // now make it start, just in case it hadn't already.
 -  stream.emit('readable');
  }
  
  // wrap an old-style stream as the async data source.
@@@ -751,7 -780,6 +751,7 @@@ Readable.prototype.wrap = function(stre
  
    var self = this;
    stream.on('end', function() {
 +    debug('wrapped end');
      if (state.decoder && !state.ended) {
        var chunk = state.decoder.end();
        if (chunk && chunk.length)
    });
  
    stream.on('data', function(chunk) {
 +    debug('wrapped data');
      if (state.decoder)
        chunk = state.decoder.write(chunk);
      if (!chunk || !state.objectMode && !chunk.length)
    // proxy all the other methods.
    // important when wrapping filters and duplexes.
    for (var i in stream) {
 -    if (typeof stream[i] === 'function' &&
 -        typeof this[i] === 'undefined') {
 +    if (util.isFunction(stream[i]) && util.isUndefined(this[i])) {
        this[i] = function(method) { return function() {
          return stream[method].apply(stream, arguments);
        }}(i);
    // when we try to consume some more bytes, simply unpause the
    // underlying stream.
    self._read = function(n) {
 +    debug('wrapped _read', n);
      if (paused) {
        paused = false;
        stream.resume();
@@@ -883,7 -910,7 +883,7 @@@ function endReadable(stream) 
    if (state.length > 0)
      throw new Error('endReadable called on non-empty stream');
  
 -  if (!state.endEmitted && state.calledRead) {
 +  if (!state.endEmitted) {
      state.ended = true;
      process.nextTick(function() {
        // Check that we didn't get one last unshift.