From 286aa04910f355699be2a99f89570fd6344e8d39 Mon Sep 17 00:00:00 2001 From: isaacs Date: Wed, 31 Oct 2012 14:30:30 -0700 Subject: [PATCH] streams2: Abstract out onread function --- lib/_stream_readable.js | 114 ++++++++++++++++++++++++++---------------------- 1 file changed, 61 insertions(+), 53 deletions(-) diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 51201e8..4e84eb5 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -28,7 +28,7 @@ var StringDecoder; util.inherits(Readable, Stream); -function ReadableState(options) { +function ReadableState(options, stream) { options = options || {}; this.bufferSize = options.bufferSize || 16 * 1024; @@ -45,6 +45,8 @@ function ReadableState(options) { this.ended = false; this.endEmitted = false; this.reading = false; + this.sync = false; + this.onread = onread.bind(stream); // whenever we return null, then we set a flag to say // that we're awaiting a 'readable' event emission. @@ -144,59 +146,10 @@ Readable.prototype.read = function(n) { if (doRead) { var sync = true; state.reading = true; + state.sync = true; // call internal read method - this._read(state.bufferSize, function onread(er, chunk) { - state.reading = false; - if (er) - return this.emit('error', er); - - if (!chunk || !chunk.length) { - // eof - state.ended = true; - if (state.decoder) { - chunk = state.decoder.end(); - if (chunk && chunk.length) { - state.buffer.push(chunk); - state.length += chunk.length; - } - } - // if we've ended and we have some data left, then emit - // 'readable' now to make sure it gets picked up. - if (!sync) { - if (state.length > 0) - this.emit('readable'); - else - endReadable(this); - } - return; - } - - if (state.decoder) - chunk = state.decoder.write(chunk); - - // update the buffer info. - if (chunk) { - state.length += chunk.length; - state.buffer.push(chunk); - } - - // if we haven't gotten enough to pass the lowWaterMark, - // and we haven't ended, then don't bother telling the user - // that it's time to read more data. Otherwise, that'll - // probably kick off another stream.read(), which can trigger - // another _read(n,cb) before this one returns! - if (state.length <= state.lowWaterMark) { - state.reading = true; - this._read(state.bufferSize, onread.bind(this)); - return; - } - - if (state.needReadable && !sync) { - state.needReadable = false; - this.emit('readable'); - } - }.bind(this)); - sync = false; + this._read(state.bufferSize, state.onread); + state.sync = false; } // If _read called its callback synchronously, then `reading` @@ -221,6 +174,61 @@ Readable.prototype.read = function(n) { return ret; }; +function onread(er, chunk) { + var state = this._readableState; + var sync = state.sync; + + state.reading = false; + if (er) + return this.emit('error', er); + + if (!chunk || !chunk.length) { + // eof + state.ended = true; + if (state.decoder) { + chunk = state.decoder.end(); + if (chunk && chunk.length) { + state.buffer.push(chunk); + state.length += chunk.length; + } + } + // if we've ended and we have some data left, then emit + // 'readable' now to make sure it gets picked up. + if (!sync) { + if (state.length > 0) + this.emit('readable'); + else + endReadable(this); + } + return; + } + + if (state.decoder) + chunk = state.decoder.write(chunk); + + // update the buffer info. + if (chunk) { + state.length += chunk.length; + state.buffer.push(chunk); + } + + // if we haven't gotten enough to pass the lowWaterMark, + // and we haven't ended, then don't bother telling the user + // that it's time to read more data. Otherwise, that'll + // probably kick off another stream.read(), which can trigger + // another _read(n,cb) before this one returns! + if (state.length <= state.lowWaterMark) { + state.reading = true; + this._read(state.bufferSize, state.onread); + return; + } + + if (state.needReadable && !sync) { + state.needReadable = false; + this.emit('readable'); + } +} + // abstract method. to be overridden in specific implementation classes. // call cb(er, data) where data is <= n in length. // for virtual (non-string, non-buffer) streams, "length" is somewhat -- 2.7.4