-2013.08.21, Version 0.10.17 (Stable)
+2013.08.21, Version 0.11.6 (Unstable)
+
+* uv: Upgrade to v0.11.8
+
+* v8: upgrade v8 to 3.20.14.1
+
+* build: disable SSLv2 by default (Ben Noordhuis)
+
+* build: don't auto-destroy existing configuration (Ben Noordhuis)
+
+* crypto: add TLS 1.1 and 1.2 to secureProtocol list (Matthias Bartelmeß)
+
+* crypto: fix memory leak in randomBytes() error path (Ben Noordhuis)
+
+* dgram: don't call into js when send cb is omitted (Ben Noordhuis)
+
+* dgram: fix regression in string argument handling (Ben Noordhuis)
+
+* domains: performance improvements (Trevor Norris)
+
+* events: EventEmitter = require('events') (Jake Verbaten)
+
+* http: Add write()/end() callbacks (isaacs)
+
+* http: Consistent 'finish' event semantics (isaacs)
+
+* http: Prefer 'binary' over 'ascii' (isaacs)
+
+* http: Support legacy agent.addRequest API (isaacs)
+
+* http: Write hex/base64 chunks properly (isaacs)
+
+* http: add agent.maxFreeSockets option (isaacs)
+
+* http: provide access to raw headers/trailers (isaacs)
+
+* http: removed headers stay removed (James Halliday)
+
+* http,timers: improve callback performance (Ben Noordhuis)
+
+* net: family option in net.connect (Vsevolod Strukchinsky)
+
+* readline: pause stdin before turning off terminal raw mode (Daniel Chatfield)
+
+* smalloc: allow different external array types (Trevor Norris)
+
+* smalloc: expose ExternalArraySize (Trevor Norris)
+
+* stream: Short-circuit buffer pushes when flowing (isaacs)
+
+* tls: handle errors on socket before releasing it (Fedor Indutny)
+
+* util: fix isPrimitive check (Trevor Norris)
+
+* util: isObject should always return boolean (Trevor Norris)
+
+
+2013.08.06, Version 0.11.5 (Unstable), 6f92da2dd106b0c63fde563284f83e08e2a521b5
+
+* v8: upgrade to 3.20.11
+
+* uv: upgrade to v0.11.7
+
+* buffer: return offset for end of last write (Trevor Norris)
+
+* build: embed the mdb_v8.so into the binary (Timothy J Fontaine)
+
+* build: fix --without-ssl build (Ben Noordhuis)
+
+* child_process: add 'shell' option to .exec() (Ben Noordhuis)
+
+* dgram: report send errors to cb, don't pass bytes (Ben Noordhuis)
+
+* fs: write strings directly to disk (Trevor Norris)
+
+* https: fix default port (Koichi Kobayashi)
+
+* openssl: use asm for sha, md5, rmd (Fedor Indutny)
+
+* os: add mac address to networkInterfaces() output (Brian White)
+
+* smalloc: introduce smalloc module (Trevor Norris)
+
+* stream: Simplify flowing, passive data listening (streams3) (isaacs)
+
+* tls: asynchronous SNICallback (Fedor Indutny)
+
+* tls: share tls tickets key between cluster workers (Fedor Indutny)
+
+* util: don't throw on circular %j input to format() (Ben Noordhuis)
+
+
+2013.07.12, Version 0.11.4 (Unstable), b5b84197ed037918fd1a26e5cb87cce7c812ca55
+
+* npm: Upgrade to 1.3.4
+
+* v8: Upgrade to v3.20.2
+
+* c-ares: Upgrade to piscisaureus/cares@805d153
+
+* timers: setImmediate process full queue each turn (Ben Noordhuis)
+
+* http: Add agent.get/request methods (isaacs)
+
+* http: Proper KeepAlive behavior (isaacs)
+
+* configure: fix the --without-ssl option (Nathan Rajlich)
+
+* buffer: propagate originating parent (Trevor Norris)
+
+* tls_wrap: return Error not throw for missing cert (Timothy J Fontaine)
+
+* src: enable native v8 typed arrays (Ben Noordhuis)
+
+* stream: objectMode transform should allow falsey values (Jeff Barczewski)
+
+* slab_allocator: remove SlabAllocator (Trevor Norris)
+
+* crypto: fix memory leak in LoadPKCS12 (Fedor Indutny)
+
+* tls: export TLSSocket (Fedor Indutny)
+
+* zlib: allow changing of level and strategy (Brian White)
+
+* zlib: allow custom flush type for flush() (Brian White)
+
+
+2013.06.26, Version 0.11.3 (Unstable), 38c0c47bbe280ddc42054418091571e532d82a1e
+
+* uv: Upgrade to v0.11.5
+
+* c-ares: upgrade to 1.10.0
+
+* v8: upgrade to v3.19.13
+
+* punycode: update to v1.2.3 (Mathias Bynens)
+
+* debugger: break on uncaught exception (Miroslav Bajtos)
+
+* child_process: emit 'disconnect' asynchronously (Ben Noordhuis)
+
+* dtrace: enable uv's probes if enabled (Timothy J Fontaine)
+
+* dtrace: unify dtrace and systemtap interfaces (Timothy J Fontaine)
+
+* buffer: New API for backing data store (Trevor Norris)
+
+* buffer: return `this` in fill() for chainability (Brian White)
+
+* build: fix include order for building on windows (Timothy J Fontaine)
+
+* build: add android support (Linus Mårtensson)
+
+* readline: strip ctrl chars for prompt width calc (Krzysztof Chrapka)
+
+* tls: introduce TLSSocket based on tls_wrap binding (Fedor Indutny)
+
+* tls: add localAddress and localPort properties (Ben Noordhuis)
+
+* crypto: free excessive memory in NodeBIO (Fedor Indutny)
+
+* process: remove maxTickDepth (Trevor Norris)
+
+* timers: use uv_now instead of Date.now (Timothy J Fontaine)
+
+* util: Add debuglog, deprecate console lookalikes (isaacs)
+
+* module: use path.sep instead of a custom solution (Robert Kowalski)
+
+* http: don't escape request path, reject bad chars (Ben Noordhuis)
+
+* net: emit dns 'lookup' event before connect (Ben Noordhuis)
+
+* dns: add getServers and setServers (Timothy J Fontaine)
+
+
+2013.05.13, Version 0.11.2 (Unstable), 5d3dc0e4c3369dfb00b7b13e08936c2e652fa696
+
+* uv: Upgrade to 0.11.2
+
+* V8: Upgrade to 3.19.0
+
+* npm: Upgrade to 1.2.21
+
+* build: Makefile should respect configure --prefix (Timothy J Fontaine)
+
+* cluster: use round-robin load balancing (Ben Noordhuis)
+
+* debugger, cluster: each worker has new debug port (Miroslav Bajtoš)
+
+* debugger: `restart` with custom debug port (Miroslav Bajtoš)
+
+* debugger: breakpoints in scripts not loaded yet (Miroslav Bajtoš)
+
+* event: EventEmitter#setMaxListeners() returns this (Sam Roberts)
+
+* events: add EventEmitter.defaultMaxListeners (Ben Noordhuis)
+
+* install: Support $(PREFIX) install target directory prefix (Olof Johansson)
+
+* os: Include netmask in os.networkInterfaces() (Ben Kelly)
+
+* path: add path.isAbsolute(path) (Ryan Doenges)
+
+* stream: Guarantee ordering of 'finish' event (isaacs)
+
+* streams: introduce .cork/.uncork/._writev (Fedor Indutny)
+
+* vm: add support for timeout argument (Andrew Paprocki)
+
+
+2013.04.19, Version 0.11.1 (Unstable), 4babd2b46ebf9fbea2c9946af5cfae25a33b2b22
+
+* V8: upgrade to 3.18.0
+
+* uv: Upgrade to v0.11.1
+
+* http: split into multiple separate modules (Timothy J Fontaine)
+
+* http: escape unsafe characters in request path (Ben Noordhuis)
+
+* url: Escape all unwise characters (isaacs)
+
+* build: depend on v8 postmortem-metadata if enabled (Paddy Byers)
+
+* etw: update prototypes to match dtrace provider (Timothy J Fontaine)
+
+* buffer: change output of Buffer.prototype.toJSON() (David Braun)
+
+* dtrace: actually use the _handle.fd value (Timothy J Fontaine)
+
+* dtrace: pass more arguments to probes (Dave Pacheco)
+
+* build: allow building with dtrace on osx (Dave Pacheco)
+
+* zlib: allow passing options to convenience methods (Kyle Robinson Young)
+
+
+2013.03.28, Version 0.11.0 (Unstable), bce38b3d74e64fcb7d04a2dd551151da6168cdc5
+
+* V8: update to 3.17.13
+
+* os: use %SystemRoot% or %windir% in os.tmpdir() (Suwon Chae)
+
+* util: fix util.inspect() line width calculation (Marcin Kostrzewa)
+
+* buffer: remove _charsWritten (Trevor Norris)
+
+* fs: uv_[fl]stat now reports subsecond resolution (Timothy J Fontaine)
+
+* fs: Throw if error raised and missing callback (bnoordhuis)
+
+* tls: expose SSL_CTX_set_timeout via tls.createServer (Manav Rathi)
+
+* tls: remove harmful unnecessary bounds checking (Marcel Laverdet)
+
+* buffer: write ascii strings using WriteOneByte (Trevor Norris)
+
+* dtrace: fix generation of v8 constants on freebsd (Fedor Indutny)
+
+* dtrace: x64 ustack helper (Fedor Indutny)
+
+* readline: handle wide characters properly (Nao Iizuka)
+
+* repl: Use a domain to catch async errors safely (isaacs)
+
+* repl: emit 'reset' event when context is reset (Sami Samhuri)
+
+* util: custom `inspect()` method may return an Object (Nathan Rajlich)
+
+* console: `console.dir()` bypasses inspect() methods (Nathan Rajlich)
+
+
++2013.08.21, Version 0.10.17 (Stable), 469a4a5091a677df62be319675056b869c31b35c
+
+ * uv: Upgrade v0.10.14
+
+ * http_parser: Do not accept PUN/GEM methods as PUT/GET (Chris Dickinson)
+
+ * tls: fix assertion when ssl is destroyed at read (Fedor Indutny)
+
+ * stream: Throw on 'error' if listeners removed (isaacs)
+
+ * dgram: fix assertion on bad send() arguments (Ben Noordhuis)
+
+ * readline: pause stdin before turning off terminal raw mode (Daniel Chatfield)
+
+
2013.08.16, Version 0.10.16 (Stable), 50b4c905a4425430ae54db4906f88982309e128d
* v8: back-port fix for CVE-2013-2882
* stream: Fix double pipe error emit (Eran Hammer)
-2013.07.25, Version 0.10.15 (Stable)
+2013.07.25, Version 0.10.15 (Stable), 2426d65af860bda7be9f0832a99601cc43c6cf63
* src: fix process.getuid() return value (Ben Noordhuis)
* net: Fix busy loop on POLLERR|POLLHUP on older linux kernels (Ben Noordhuis, isaacs)
-
2013.06.04, Version 0.10.10 (Stable), 25e51c396aa23018603baae2b1d9390f5d9db496
* uv: Upgrade to 0.10.10
* stream: Fix unshift() race conditions (isaacs)
+
+
2013.04.11, Version 0.10.4 (Stable), 9712aa9f76073c30850b20a188b1ed12ffb74d17
* uv: Upgrade to 0.10.4
* src: tie process.versions.uv to uv_version_string() (Ben Noordhuis)
-2013.03.28, Version 0.10.2 (Stable)
+2013.03.28, Version 0.10.2 (Stable), 1e0de9c426e07a260bbec2d2196c2d2db8eb8886
* npm: Upgrade to 1.2.15
var Stream = require('stream');
var util = require('util');
var StringDecoder;
+var debug = util.debuglog('stream');
util.inherits(Readable, Stream);
// the point at which it stops calling _read() to fill the buffer
// Note: 0 is a valid value, means "don't call _read preemptively ever"
var hwm = options.highWaterMark;
- this.highWaterMark = (hwm || hwm === 0) ? hwm : 16 * 1024;
+ var defaultHwm = options.objectMode ? 16 : 16 * 1024;
+ this.highWaterMark = (hwm || hwm === 0) ? hwm : defaultHwm;
// cast to ints.
this.highWaterMark = ~~this.highWaterMark;
this.length = 0;
this.pipes = null;
this.pipesCount = 0;
- this.flowing = false;
+ this.flowing = null;
this.ended = false;
this.endEmitted = false;
this.reading = false;
- // In streams that never have any data, and do push(null) right away,
- // the consumer can miss the 'end' event if they do some I/O before
- // consuming the stream. So, we don't emit('end') until some reading
- // happens.
- this.calledRead = false;
-
// a flag to be able to tell if the onwrite cb is called immediately,
- // or on a later tick. We set this to true at first, becuase any
+ // or on a later tick. We set this to true at first, because any
// actions that shouldn't happen until "later" should generally also
// not happen before the first write call.
this.sync = true;
Readable.prototype.push = function(chunk, encoding) {
var state = this._readableState;
- if (typeof chunk === 'string' && !state.objectMode) {
+ if (util.isString(chunk) && !state.objectMode) {
encoding = encoding || state.defaultEncoding;
if (encoding !== state.encoding) {
chunk = new Buffer(chunk, encoding);
var er = chunkInvalid(state, chunk);
if (er) {
stream.emit('error', er);
- } else if (chunk === null || chunk === undefined) {
+ } else if (util.isNullOrUndefined(chunk)) {
state.reading = false;
if (!state.ended)
onEofChunk(stream, state);
if (state.decoder && !addToFront && !encoding)
chunk = state.decoder.write(chunk);
- // update the buffer info.
- state.length += state.objectMode ? 1 : chunk.length;
- if (addToFront) {
- state.buffer.unshift(chunk);
- } else {
+ if (!addToFront)
state.reading = false;
- state.buffer.push(chunk);
- }
- if (state.needReadable)
- emitReadable(stream);
+ // if we want the data now, just emit it.
+ if (state.flowing && state.length === 0 && !state.sync) {
+ stream.emit('data', chunk);
+ stream.read(0);
+ } else {
+ // update the buffer info.
+ state.length += state.objectMode ? 1 : chunk.length;
+ if (addToFront)
+ state.buffer.unshift(chunk);
+ else
+ state.buffer.push(chunk);
+
+ if (state.needReadable)
+ emitReadable(stream);
+ }
maybeReadMore(stream, state);
}
if (state.objectMode)
return n === 0 ? 0 : 1;
- if (isNaN(n) || n === null) {
+ if (isNaN(n) || util.isNull(n)) {
// only flow one buffer at a time
if (state.flowing && state.buffer.length)
return state.buffer[0].length;
// you can override either this method, or the async _read(n) below.
Readable.prototype.read = function(n) {
+ debug('read', n);
var state = this._readableState;
- state.calledRead = true;
var nOrig = n;
- if (typeof n !== 'number' || n > 0)
+ if (!util.isNumber(n) || n > 0)
state.emittedReadable = false;
// if we're doing read(0) to trigger a readable event, but we
if (n === 0 &&
state.needReadable &&
(state.length >= state.highWaterMark || state.ended)) {
- emitReadable(this);
+ debug('read: emitReadable', state.length, state.ended);
+ if (state.length === 0 && state.ended)
+ endReadable(this);
+ else
+ emitReadable(this);
return null;
}
// if we need a readable event, then we need to do some reading.
var doRead = state.needReadable;
+ debug('need readable', doRead);
// if we currently have less than the highWaterMark, then also read some
- if (state.length - n <= state.highWaterMark)
+ if (state.length === 0 || state.length - n < state.highWaterMark) {
doRead = true;
+ debug('length less than watermark', doRead);
+ }
// however, if we've ended, then there's no point, and if we're already
// reading, then it's unnecessary.
- if (state.ended || state.reading)
+ if (state.ended || state.reading) {
doRead = false;
+ debug('reading or ended', doRead);
+ }
if (doRead) {
+ debug('do read');
state.reading = true;
state.sync = true;
// if the length is currently zero, then we *need* a readable event.
state.sync = false;
}
- // If _read called its callback synchronously, then `reading`
- // will be false, and we need to re-evaluate how much data we
- // can return to the user.
+ // If _read pushed data synchronously, then `reading` will be false,
+ // and we need to re-evaluate how much data we can return to the user.
if (doRead && !state.reading)
n = howMuchToRead(nOrig, state);
else
ret = null;
- if (ret === null) {
+ if (util.isNull(ret)) {
state.needReadable = true;
n = 0;
}
if (state.length === 0 && !state.ended)
state.needReadable = true;
- // If we happened to read() exactly the remaining amount in the
- // buffer, and the EOF has been seen at this point, then make sure
- // that we emit 'end' on the very next tick.
- if (state.ended && !state.endEmitted && state.length === 0)
+ // If we tried to read() past the EOF, then emit end on the next tick.
+ if (nOrig !== n && state.ended && state.length === 0)
endReadable(this);
+ if (!util.isNull(ret))
+ this.emit('data', ret);
+
return ret;
};
function chunkInvalid(state, chunk) {
var er = null;
- if (!Buffer.isBuffer(chunk) &&
- 'string' !== typeof chunk &&
- chunk !== null &&
- chunk !== undefined &&
+ if (!util.isBuffer(chunk) &&
+ !util.isString(chunk) &&
+ !util.isNullOrUndefined(chunk) &&
!state.objectMode &&
!er) {
er = new TypeError('Invalid non-string/buffer chunk');
}
state.ended = true;
- // if we've ended and we have some data left, then emit
- // 'readable' now to make sure it gets picked up.
- if (state.length > 0)
- emitReadable(stream);
- else
- endReadable(stream);
+ // emit 'readable' now to make sure it gets picked up.
+ emitReadable(stream);
}
// Don't emit readable right away in sync mode, because this can trigger
function emitReadable(stream) {
var state = stream._readableState;
state.needReadable = false;
- if (state.emittedReadable)
- return;
-
- state.emittedReadable = true;
- if (state.sync)
- process.nextTick(function() {
+ if (!state.emittedReadable) {
+ debug('emitReadable', state.flowing);
+ state.emittedReadable = true;
+ if (state.sync)
+ process.nextTick(function() {
+ emitReadable_(stream);
+ });
+ else
emitReadable_(stream);
- });
- else
- emitReadable_(stream);
+ }
}
function emitReadable_(stream) {
+ debug('emit readable');
stream.emit('readable');
+ flow(stream);
}
var len = state.length;
while (!state.reading && !state.flowing && !state.ended &&
state.length < state.highWaterMark) {
+ debug('maybeReadMore read 0');
stream.read(0);
if (len === state.length)
// didn't get any data, stop spinning.
break;
}
state.pipesCount += 1;
+ debug('pipe count=%d opts=%j', state.pipesCount, pipeOpts);
var doEnd = (!pipeOpts || pipeOpts.end !== false) &&
dest !== process.stdout &&
dest.on('unpipe', onunpipe);
function onunpipe(readable) {
- if (readable !== src) return;
- cleanup();
+ debug('onunpipe');
+ if (readable === src) {
+ cleanup();
+ }
}
function onend() {
+ debug('onend');
dest.end();
}
dest.on('drain', ondrain);
function cleanup() {
+ debug('cleanup');
// cleanup event handlers once the pipe is broken
dest.removeListener('close', onclose);
dest.removeListener('finish', onfinish);
dest.removeListener('unpipe', onunpipe);
src.removeListener('end', onend);
src.removeListener('end', cleanup);
+ src.removeListener('data', ondata);
// if the reader is waiting for a drain event from this
// specific writer, then it would cause it to never start
// flowing again.
// So, if this is awaiting a drain, then we just call it now.
// If we don't know, then assume that we are waiting for one.
- if (!dest._writableState || dest._writableState.needDrain)
+ if (state.awaitDrain &&
+ (!dest._writableState || dest._writableState.needDrain))
ondrain();
}
+ src.on('data', ondata);
+ function ondata(chunk) {
+ debug('ondata');
+ var ret = dest.write(chunk);
+ if (false === ret) {
+ debug('false write response, pause',
+ src._readableState.awaitDrain);
+ src._readableState.awaitDrain++;
+ src.pause();
+ }
+ }
+
// if the dest has an error, then stop piping into it.
// however, don't suppress the throwing behavior for this.
function onerror(er) {
+ debug('onerror', er);
unpipe();
dest.removeListener('error', onerror);
if (EE.listenerCount(dest, 'error') === 0)
}
// This is a brutally ugly hack to make sure that our error handler
// is attached before any userland ones. NEVER DO THIS.
- if (!dest._events.error)
+ if (!dest._events || !dest._events.error)
dest.on('error', onerror);
else if (Array.isArray(dest._events.error))
dest._events.error.unshift(onerror);
}
dest.once('close', onclose);
function onfinish() {
+ debug('onfinish');
dest.removeListener('close', onclose);
unpipe();
}
dest.once('finish', onfinish);
function unpipe() {
+ debug('unpipe');
src.unpipe(dest);
}
// start the flow if it hasn't been started already.
if (!state.flowing) {
- // the handler that waits for readable events after all
- // the data gets sucked out in flow.
- // This would be easier to follow with a .once() handler
- // in flow(), but that is too slow.
- this.on('readable', pipeOnReadable);
-
- state.flowing = true;
- process.nextTick(function() {
- flow(src);
- });
+ debug('pipe resume');
+ src.resume();
}
return dest;
function pipeOnDrain(src) {
return function() {
- var dest = this;
var state = src._readableState;
- state.awaitDrain--;
- if (state.awaitDrain === 0)
+ debug('pipeOnDrain', state.awaitDrain);
+ if (state.awaitDrain)
+ state.awaitDrain--;
+ if (state.awaitDrain === 0 && EE.listenerCount(src, 'data')) {
+ state.flowing = true;
flow(src);
- };
-}
-
-function flow(src) {
- var state = src._readableState;
- var chunk;
- state.awaitDrain = 0;
-
- function write(dest, i, list) {
- var written = dest.write(chunk);
- if (false === written) {
- state.awaitDrain++;
}
- }
-
- while (state.pipesCount && null !== (chunk = src.read())) {
-
- if (state.pipesCount === 1)
- write(state.pipes, 0, null);
- else
- state.pipes.forEach(write);
-
- src.emit('data', chunk);
-
- // if anyone needs a drain, then we have to wait for that.
- if (state.awaitDrain > 0)
- return;
- }
-
- // if every destination was unpiped, either before entering this
- // function, or in the while loop, then stop flowing.
- //
- // NB: This is a pretty rare edge case.
- if (state.pipesCount === 0) {
- state.flowing = false;
-
- // if there were data event listeners added, then switch to old mode.
- if (EE.listenerCount(src, 'data') > 0)
- emitDataEvents(src);
- return;
- }
-
- // at this point, no one needed a drain, so we just ran out of data
- // on the next readable event, start it over again.
- state.ranOut = true;
-}
-
-function pipeOnReadable() {
- if (this._readableState.ranOut) {
- this._readableState.ranOut = false;
- flow(this);
- }
+ };
}
// got a match.
state.pipes = null;
state.pipesCount = 0;
- this.removeListener('readable', pipeOnReadable);
state.flowing = false;
if (dest)
dest.emit('unpipe', this);
var len = state.pipesCount;
state.pipes = null;
state.pipesCount = 0;
- this.removeListener('readable', pipeOnReadable);
state.flowing = false;
for (var i = 0; i < len; i++)
Readable.prototype.on = function(ev, fn) {
var res = Stream.prototype.on.call(this, ev, fn);
- if (ev === 'data' && !this._readableState.flowing)
- emitDataEvents(this);
+ // If listening to data, and it has not explicitly been paused,
+ // then call resume to start the flow of data on the next tick.
+ if (ev === 'data' && false !== this._readableState.flowing) {
+ this.resume();
+ }
if (ev === 'readable' && this.readable) {
var state = this._readableState;
state.emittedReadable = false;
state.needReadable = true;
if (!state.reading) {
- this.read(0);
+ var self = this;
+ process.nextTick(function() {
+ debug('readable nexttick read 0');
+ self.read(0);
+ });
} else if (state.length) {
emitReadable(this, state);
}
// pause() and resume() are remnants of the legacy readable stream API
// If the user uses them, then switch into old mode.
Readable.prototype.resume = function() {
- emitDataEvents(this);
- this.read(0);
- this.emit('resume');
+ var state = this._readableState;
+ if (!state.flowing) {
+ debug('resume');
+ state.flowing = true;
+ if (!state.reading) {
+ debug('resume read 0');
+ this.read(0);
+ }
+ resume(this, state);
+ }
};
+function resume(stream, state) {
+ if (!state.resumeScheduled) {
+ state.resumeScheduled = true;
+ process.nextTick(function() {
+ resume_(stream, state);
+ });
+ }
+}
+
+function resume_(stream, state) {
+ state.resumeScheduled = false;
+ stream.emit('resume');
+ flow(stream);
+ if (state.flowing && !state.reading)
+ stream.read(0);
+}
+
Readable.prototype.pause = function() {
- emitDataEvents(this, true);
- this.emit('pause');
+ debug('call pause flowing=%j', this._readableState.flowing);
+ if (false !== this._readableState.flowing) {
+ debug('pause');
+ this._readableState.flowing = false;
+ this.emit('pause');
+ }
};
-function emitDataEvents(stream, startPaused) {
+function flow(stream) {
var state = stream._readableState;
-
+ debug('flow', state.flowing);
if (state.flowing) {
- // https://github.com/isaacs/readable-stream/issues/16
- throw new Error('Cannot switch to old mode now.');
+ do {
+ var chunk = stream.read();
+ } while (null !== chunk && state.flowing);
}
-
- var paused = startPaused || false;
- var readable = false;
-
- // convert to an old-style stream.
- stream.readable = true;
- stream.pipe = Stream.prototype.pipe;
- stream.on = stream.addListener = Stream.prototype.on;
-
- stream.on('readable', function() {
- readable = true;
-
- var c;
- while (!paused && (null !== (c = stream.read())))
- stream.emit('data', c);
-
- if (c === null) {
- readable = false;
- stream._readableState.needReadable = true;
- }
- });
-
- stream.pause = function() {
- paused = true;
- this.emit('pause');
- };
-
- stream.resume = function() {
- paused = false;
- if (readable)
- process.nextTick(function() {
- stream.emit('readable');
- });
- else
- this.read(0);
- this.emit('resume');
- };
-
- // now make it start, just in case it hadn't already.
- stream.emit('readable');
}
// wrap an old-style stream as the async data source.
var self = this;
stream.on('end', function() {
+ debug('wrapped end');
if (state.decoder && !state.ended) {
var chunk = state.decoder.end();
if (chunk && chunk.length)
});
stream.on('data', function(chunk) {
+ debug('wrapped data');
if (state.decoder)
chunk = state.decoder.write(chunk);
if (!chunk || !state.objectMode && !chunk.length)
// proxy all the other methods.
// important when wrapping filters and duplexes.
for (var i in stream) {
- if (typeof stream[i] === 'function' &&
- typeof this[i] === 'undefined') {
+ if (util.isFunction(stream[i]) && util.isUndefined(this[i])) {
this[i] = function(method) { return function() {
return stream[method].apply(stream, arguments);
}}(i);
// when we try to consume some more bytes, simply unpause the
// underlying stream.
self._read = function(n) {
+ debug('wrapped _read', n);
if (paused) {
paused = false;
stream.resume();
if (state.length > 0)
throw new Error('endReadable called on non-empty stream');
- if (!state.endEmitted && state.calledRead) {
+ if (!state.endEmitted) {
state.ended = true;
process.nextTick(function() {
// Check that we didn't get one last unshift.