// Module setup: export the Readable constructor, expose ReadableState as a
// property of it, load the core modules used below, and make Readable
// inherit from Stream.
3 module.exports = Readable;
4 Readable.ReadableState = ReadableState;
6 const EE = require('events');
7 const Stream = require('stream');
8 const Buffer = require('buffer').Buffer;
9 const util = require('util');
// Debug logger for the 'stream' section, created with util.debuglog.
10 const debug = util.debuglog('stream');
13 util.inherits(Readable, Stream);
// ReadableState(options, stream): bookkeeping object that a Readable stores
// in this._readableState (see the Readable constructor).
// Fields initialised by the visible lines: objectMode, highWaterMark,
// endEmitted, needReadable, emittedReadable, readableListening,
// defaultEncoding, readingMore and, when options.encoding is set, decoder
// and encoding.
// NOTE(review): several lines of this constructor are missing from this
// excerpt. Other functions in the file read state.buffer, state.length,
// state.pipes, state.flowing, state.sync, etc., so the list above is not
// exhaustive.
15 function ReadableState(options, stream) {
16 options = options || {};
18 // object stream flag. Used to make read(n) ignore n and to
19 // make all the buffer merging and length checks go away
20 this.objectMode = !!options.objectMode;
// For Duplex streams, options.readableObjectMode can also switch object
// mode on.
22 if (stream instanceof Stream.Duplex)
23 this.objectMode = this.objectMode || !!options.readableObjectMode;
25 // the point at which it stops calling _read() to fill the buffer
26 // Note: 0 is a valid value, means "don't call _read preemptively ever"
27 var hwm = options.highWaterMark;
// Default is 16 in objectMode, otherwise 16 * 1024.
28 var defaultHwm = this.objectMode ? 16 : 16 * 1024;
// An explicit 0 is honoured; any other falsy value falls back to the default.
29 this.highWaterMark = (hwm || hwm === 0) ? hwm : defaultHwm;
// ~~ coerces the value to a 32-bit integer.
32 this.highWaterMark = ~~this.highWaterMark;
40 this.endEmitted = false;
// NOTE(review): the statement the next comment describes is not visible in
// this excerpt; presumably it initialises the `sync` flag that
// readableAddChunk reads. Confirm against the full file.
43 // a flag to be able to tell if the onwrite cb is called immediately,
44 // or on a later tick. We set this to true at first, because any
45 // actions that shouldn't happen until "later" should generally also
46 // not happen before the first write call.
49 // whenever we return null, then we set a flag to say
50 // that we're awaiting a 'readable' event emission.
51 this.needReadable = false;
52 this.emittedReadable = false;
53 this.readableListening = false;
55 // Crypto is kind of old and crusty. Historically, its default string
56 // encoding is 'binary' so we have to make this configurable.
57 // Everything else in the universe uses 'utf8', though.
58 this.defaultEncoding = options.defaultEncoding || 'utf8';
// NOTE(review): the statements belonging to the next two comments are not
// visible here; the second presumably refers to the awaitDrain counter used
// by pipe(). Confirm against the full file.
60 // when piping, we only care about 'readable' events that happen
61 // after read()ing all the bytes and not getting any pushback.
64 // the number of writers that are awaiting a drain event in .pipe()s
67 // if true, a maybeReadMore has been scheduled
68 this.readingMore = false;
// Only when an encoding is requested: load string_decoder and create a
// decoder for it.
72 if (options.encoding) {
// NOTE(review): StringDecoder is assigned without a declaration visible in
// this excerpt. Confirm it is declared at module scope; otherwise this is an
// implicit global.
74 StringDecoder = require('string_decoder').StringDecoder;
75 this.decoder = new StringDecoder(options.encoding);
76 this.encoding = options.encoding;
// Readable(options): stream constructor.
// - Can be called without `new`; it then re-invokes itself with `new`.
// - Creates this._readableState from the options.
// - If options.read is a function, it is installed as this instance's _read.
80 function Readable(options) {
81 if (!(this instanceof Readable))
82 return new Readable(options);
84 this._readableState = new ReadableState(options, this);
89 if (options && typeof options.read === 'function')
90 this._read = options.read;
// Readable.prototype.push(chunk, encoding): hand a chunk to the stream by
// delegating to readableAddChunk with addToFront = false, and return that
// call's result.
95 // Manually shove something into the read() buffer.
96 // This returns true if the highWaterMark has not been hit yet,
97 // similar to how Writable.write() returns true if you should
99 Readable.prototype.push = function(chunk, encoding) {
100 var state = this._readableState;
// Outside objectMode, a string chunk without an explicit encoding uses the
// stream's defaultEncoding.
102 if (!state.objectMode && typeof chunk === 'string') {
103 encoding = encoding || state.defaultEncoding;
// The string is converted to a Buffer only when its encoding differs from
// state.encoding.
// NOTE(review): `new Buffer(string, encoding)` is deprecated in current
// Node.js; Buffer.from(string, encoding) is the documented replacement.
104 if (encoding !== state.encoding) {
105 chunk = new Buffer(chunk, encoding);
110 return readableAddChunk(this, state, chunk, encoding, false);
// Readable.prototype.unshift(chunk): delegate to readableAddChunk with
// addToFront = true and an empty encoding string, returning its result.
113 // Unshift should *always* be something directly out of read()
114 Readable.prototype.unshift = function(chunk) {
115 var state = this._readableState;
116 return readableAddChunk(this, state, chunk, '', true);
// Readable.prototype.isPaused(): true only when state.flowing is strictly
// `false`; any other value of flowing yields false.
119 Readable.prototype.isPaused = function() {
120 return this._readableState.flowing === false;
// readableAddChunk(stream, state, chunk, encoding, addToFront): shared
// implementation behind push() (addToFront = false) and unshift()
// (addToFront = true). Returns needMoreData(state).
// NOTE(review): several lines (an `if`, some `else` branches and closing
// braces) are missing from this excerpt; the comments below describe only
// the visible statements.
123 function readableAddChunk(stream, state, chunk, encoding, addToFront) {
// chunkInvalid's result is emitted as an 'error' event (the guarding
// condition is not visible here).
124 var er = chunkInvalid(state, chunk);
126 stream.emit('error', er);
// A null chunk is handed to onEofChunk.
127 } else if (chunk === null) {
128 state.reading = false;
129 onEofChunk(stream, state);
// Any chunk in objectMode, or a chunk with length > 0 otherwise.
130 } else if (state.objectMode || chunk && chunk.length > 0) {
// push() after the stream has ended is reported as an 'error' event.
131 if (state.ended && !addToFront) {
132 var e = new Error('stream.push() after EOF');
133 stream.emit('error', e);
// unshift() after 'end' was emitted is reported as an 'error' event.
134 } else if (state.endEmitted && addToFront) {
135 var e = new Error('stream.unshift() after end event');
136 stream.emit('error', e);
// Decode only pushed chunks (not unshifted ones) that carry no encoding.
138 if (state.decoder && !addToFront && !encoding)
139 chunk = state.decoder.write(chunk);
142 state.reading = false;
144 // if we want the data now, just emit it.
145 if (state.flowing && state.length === 0 && !state.sync) {
146 stream.emit('data', chunk);
149 // update the buffer info.
// In objectMode each chunk counts as 1; otherwise its length is added.
150 state.length += state.objectMode ? 1 : chunk.length;
// The chunk goes to the front (unshift) or the back (push) of state.buffer;
// the condition choosing between the two is not visible in this excerpt.
152 state.buffer.unshift(chunk);
154 state.buffer.push(chunk);
156 if (state.needReadable)
157 emitReadable(stream);
160 maybeReadMore(stream, state);
162 } else if (!addToFront) {
163 state.reading = false;
166 return needMoreData(state);
// needMoreData(state): boolean telling the caller of push()/unshift()
// whether more data is wanted. Visible conditions: the stream has not ended
// AND (needReadable is set OR length is below highWaterMark OR ...).
// NOTE(review): the last operand of the expression is missing from this
// excerpt; judging by the comment below it presumably checks for an empty
// buffer. Confirm against the full file.
170 // if it's past the high water mark, we can push in some more.
171 // Also, if we have no data yet, we can stand some
172 // more bytes. This is to work around cases where hwm=0,
173 // such as the repl. Also, if the push() triggered a
174 // readable event, and the user called read(largeNumber) such that
175 // needReadable was set, then we ought to push more, so that another
176 // 'readable' event will be triggered.
177 function needMoreData(state) {
178 return !state.ended &&
179 (state.needReadable ||
180 state.length < state.highWaterMark ||
// Readable.prototype.setEncoding(enc): create a StringDecoder for `enc` and
// record both the decoder and the encoding name on the readable state.
// NOTE(review): StringDecoder is assigned without a declaration visible in
// this excerpt, and the return statement (if any) is not shown.
184 // backwards compatibility.
185 Readable.prototype.setEncoding = function(enc) {
187 StringDecoder = require('string_decoder').StringDecoder;
188 this._readableState.decoder = new StringDecoder(enc);
189 this._readableState.encoding = enc;
// computeNewHighWaterMark(n): called from howMuchToRead when a read request
// exceeds the current highWaterMark; its result becomes the new mark.
// MAX_HWM is 0x800000 (8 * 1024 * 1024).
// NOTE(review): the function body is missing from this excerpt; the comments
// suggest it caps at MAX_HWM and otherwise rounds up to a power of 2.
// Confirm against the full file.
193 // Don't raise the hwm > 8MB
194 const MAX_HWM = 0x800000;
195 function computeNewHighWaterMark(n) {
199 // Get the next highest power of 2
// howMuchToRead(n, state): work out how many bytes (or objects) a read(n)
// call should take from the buffer. May raise state.highWaterMark and set
// state.needReadable as side effects.
// NOTE(review): most return statements are missing from this excerpt; only
// the visible ones are described.
211 function howMuchToRead(n, state) {
212 if (state.length === 0 && state.ended)
// objectMode: read(0) takes nothing, anything else takes exactly one item.
215 if (state.objectMode)
216 return n === 0 ? 0 : 1;
// No usable size was given.
218 if (n === null || isNaN(n)) {
219 // only flow one buffer at a time
220 if (state.flowing && state.buffer.length)
221 return state.buffer[0].length;
229 // If we're asking for more than the target buffer level,
230 // then raise the water mark. Bump up to the next highest
231 // power of 2, to prevent increasing it excessively in tiny
233 if (n > state.highWaterMark)
234 state.highWaterMark = computeNewHighWaterMark(n);
236 // don't have that much. return null, unless we've ended.
237 if (n > state.length) {
239 state.needReadable = true;
// Readable.prototype.read(n): pull data out of the internal buffer,
// calling this._read() first when the buffer should be refilled.
// NOTE(review): many lines are missing from this excerpt (the declaration of
// nOrig and ret, several conditions and the return statements), so the
// comments below describe only the visible statements.
249 // you can override either this method, or the async _read(n) below.
250 Readable.prototype.read = function(n) {
252 var state = this._readableState;
// A non-numeric or positive n clears emittedReadable.
255 if (typeof n !== 'number' || n > 0)
256 state.emittedReadable = false;
258 // if we're doing read(0) to trigger a readable event, but we
259 // already have a bunch of data in the buffer, then just trigger
260 // the 'readable' event and move on.
262 state.needReadable &&
263 (state.length >= state.highWaterMark || state.ended)) {
264 debug('read: emitReadable', state.length, state.ended);
265 if (state.length === 0 && state.ended)
// From here on n is the amount howMuchToRead decided on.
272 n = howMuchToRead(n, state);
274 // if we've ended, and we're now clear, then finish it up.
275 if (n === 0 && state.ended) {
276 if (state.length === 0)
281 // All the actual chunk generation logic needs to be
282 // *below* the call to _read. The reason is that in certain
283 // synthetic stream cases, such as passthrough streams, _read
284 // may be a completely synchronous operation which may change
285 // the state of the read buffer, providing enough data when
286 // before there was *not* enough.
288 // So, the steps are:
289 // 1. Figure out what the state of things will be after we do
290 // a read from the buffer.
292 // 2. If that resulting state will trigger a _read, then call _read.
293 // Note that this may be asynchronous, or synchronous. Yes, it is
294 // deeply ugly to write APIs this way, but that still doesn't mean
295 // that the Readable class should behave improperly, as streams are
296 // designed to be sync/async agnostic.
297 // Take note if the _read call is sync or async (ie, if the read call
298 // has returned yet), so that we know whether or not it's safe to emit
301 // 3. Actually pull the requested chunks out of the buffer and return.
303 // if we need a readable event, then we need to do some reading.
304 var doRead = state.needReadable;
305 debug('need readable', doRead);
307 // if we currently have less than the highWaterMark, then also read some
308 if (state.length === 0 || state.length - n < state.highWaterMark) {
310 debug('length less than watermark', doRead);
313 // however, if we've ended, then there's no point, and if we're already
314 // reading, then it's unnecessary.
315 if (state.ended || state.reading) {
317 debug('reading or ended', doRead);
// Mark the stream as reading before invoking _read.
322 state.reading = true;
324 // if the length is currently zero, then we *need* a readable event.
325 if (state.length === 0)
326 state.needReadable = true;
327 // call internal read method
// _read is asked for highWaterMark, not for the caller's n.
328 this._read(state.highWaterMark);
332 // If _read pushed data synchronously, then `reading` will be false,
333 // and we need to re-evaluate how much data we can return to the user.
// nOrig is declared on a line not visible in this excerpt; presumably it is
// the caller's original n.
334 if (doRead && !state.reading)
335 n = howMuchToRead(nOrig, state);
// Take n bytes/objects out of the buffer.
339 ret = fromList(n, state);
344 state.needReadable = true;
350 // If we have nothing in the buffer, then we want to know
351 // as soon as we *do* get something into the buffer.
352 if (state.length === 0 && !state.ended)
353 state.needReadable = true;
355 // If we tried to read() past the EOF, then emit end on the next tick.
356 if (nOrig !== n && state.ended && state.length === 0)
// The condition guarding this 'data' emission is not visible here.
360 this.emit('data', ret);
// chunkInvalid(state, chunk): build a TypeError for a chunk that is not a
// Buffer, not a string and not undefined; readableAddChunk uses the result
// as the error to emit.
// NOTE(review): parts of the condition and the return statement are missing
// from this excerpt, so further exemptions may exist.
365 function chunkInvalid(state, chunk) {
367 if (!(chunk instanceof Buffer) &&
368 typeof chunk !== 'string' &&
370 chunk !== undefined &&
372 er = new TypeError('Invalid non-string/buffer chunk');
// onEofChunk(stream, state): handle the end-of-stream marker (called from
// readableAddChunk when chunk === null). Does nothing if the stream has
// already ended; otherwise it flushes the decoder's remaining output into
// the buffer and emits 'readable'.
// NOTE(review): the guard around state.decoder and the line that marks the
// state as ended are not visible in this excerpt.
378 function onEofChunk(stream, state) {
379 if (state.ended) return;
// decoder.end() returns whatever the decoder was still holding.
381 var chunk = state.decoder.end();
382 if (chunk && chunk.length) {
383 state.buffer.push(chunk);
384 state.length += state.objectMode ? 1 : chunk.length;
389 // emit 'readable' now to make sure it gets picked up.
390 emitReadable(stream);
// emitReadable(stream): clear needReadable and, unless a 'readable' event
// has already been flagged via emittedReadable, set that flag and trigger
// emitReadable_ either on the next tick or immediately.
// NOTE(review): the condition choosing between the deferred and the
// immediate call is not visible in this excerpt; the comment below suggests
// it depends on sync mode.
393 // Don't emit readable right away in sync mode, because this can trigger
394 // another read() call => stack overflow. This way, it might trigger
395 // a nextTick recursion warning, but that's not so bad.
396 function emitReadable(stream) {
397 var state = stream._readableState;
398 state.needReadable = false;
399 if (!state.emittedReadable) {
400 debug('emitReadable', state.flowing);
401 state.emittedReadable = true;
403 process.nextTick(emitReadable_, stream);
405 emitReadable_(stream);
// emitReadable_(stream): log and emit the 'readable' event on the stream.
// NOTE(review): any statements after the emit are not visible in this
// excerpt.
409 function emitReadable_(stream) {
410 debug('emit readable');
411 stream.emit('readable');
// maybeReadMore(stream, state): schedule maybeReadMore_ on the next tick.
// The readingMore flag ensures only one run is scheduled at a time;
// maybeReadMore_ resets it.
416 // at this point, the user has presumably seen the 'readable' event,
417 // and called read() to consume some data. that may have triggered
418 // in turn another _read(n) call, in which case reading = true if
420 // However, if we're not ended, or reading, and the length < hwm,
421 // then go ahead and try to read some more preemptively.
422 function maybeReadMore(stream, state) {
423 if (!state.readingMore) {
424 state.readingMore = true;
425 process.nextTick(maybeReadMore_, stream, state);
// maybeReadMore_(stream, state): loop while the stream is not reading, not
// flowing, not ended and below highWaterMark; stop as soon as a pass adds
// no data. Clears readingMore at the end.
// NOTE(review): the loop body's read call and the update of `len` are not
// visible in this excerpt.
429 function maybeReadMore_(stream, state) {
430 var len = state.length;
431 while (!state.reading && !state.flowing && !state.ended &&
432 state.length < state.highWaterMark) {
433 debug('maybeReadMore read 0');
// Unchanged length means the last pass produced nothing.
435 if (len === state.length)
436 // didn't get any data, stop spinning.
441 state.readingMore = false;
// Readable.prototype._read(n): default implementation. It emits an 'error'
// event with Error('not implemented'); implementations are expected to
// replace it (the constructor also accepts one via options.read).
444 // abstract method. to be overridden in specific implementation classes.
445 // call cb(er, data) where data is <= n in length.
446 // for virtual (non-string, non-buffer) streams, "length" is somewhat
447 // arbitrary, and perhaps not very meaningful.
448 Readable.prototype._read = function(n) {
449 this.emit('error', new Error('not implemented'));
// Readable.prototype.pipe(dest, pipeOpts): register `dest` as a pipe
// destination, wire up the event handlers that move data and propagate
// end/close/finish/error/unpipe, announce the pipe to `dest`, and start the
// flow if it is not already flowing.
// NOTE(review): many lines are missing from this excerpt (the declaration of
// `src`, the switch cases, most handler bodies and the return statement);
// the comments below describe only the visible statements. `src` is
// presumably the source stream (`this`); confirm.
452 Readable.prototype.pipe = function(dest, pipeOpts) {
454 var state = this._readableState;
// state.pipes holds a single destination or an array of destinations,
// depending on pipesCount.
456 switch (state.pipesCount) {
461 state.pipes = [state.pipes, dest];
464 state.pipes.push(dest);
467 state.pipesCount += 1;
468 debug('pipe count=%d opts=%j', state.pipesCount, pipeOpts);
// The destination is ended with the source unless pipeOpts.end === false or
// the destination is process.stdout / process.stderr.
470 var doEnd = (!pipeOpts || pipeOpts.end !== false) &&
471 dest !== process.stdout &&
472 dest !== process.stderr;
474 var endFn = doEnd ? onend : cleanup;
// If 'end' was already emitted, endFn runs on the next tick; the other
// visible branch registers it as a one-time 'end' listener.
475 if (state.endEmitted)
476 process.nextTick(endFn);
478 src.once('end', endFn);
480 dest.on('unpipe', onunpipe);
481 function onunpipe(readable) {
// Only react to unpipe events that concern this source.
483 if (readable === src) {
493 // when the dest drains, it reduces the awaitDrain counter
494 // on the source. This would be more elegant with a .once()
495 // handler in flow(), but adding and removing repeatedly is
497 var ondrain = pipeOnDrain(src);
498 dest.on('drain', ondrain);
500 var cleanedUp = false;
503 // cleanup event handlers once the pipe is broken
504 dest.removeListener('close', onclose);
505 dest.removeListener('finish', onfinish);
506 dest.removeListener('drain', ondrain);
507 dest.removeListener('error', onerror);
508 dest.removeListener('unpipe', onunpipe);
509 src.removeListener('end', onend);
510 src.removeListener('end', cleanup);
511 src.removeListener('data', ondata);
515 // if the reader is waiting for a drain event from this
516 // specific writer, then it would cause it to never start
518 // So, if this is awaiting a drain, then we just call it now.
519 // If we don't know, then assume that we are waiting for one.
520 if (state.awaitDrain &&
521 (!dest._writableState || dest._writableState.needDrain))
525 src.on('data', ondata);
526 function ondata(chunk) {
// The return value of dest.write() is kept in `ret`.
528 var ret = dest.write(chunk);
530 // If the user unpiped during `dest.write()`, it is possible
531 // to get stuck in a permanently paused state if that write
532 // also returned false.
533 if (state.pipesCount === 1 &&
534 state.pipes[0] === dest &&
535 src.listenerCount('data') === 1 &&
537 debug('false write response, pause', src._readableState.awaitDrain);
538 src._readableState.awaitDrain++;
544 // if the dest has an error, then stop piping into it.
545 // however, don't suppress the throwing behavior for this.
546 function onerror(er) {
547 debug('onerror', er);
549 dest.removeListener('error', onerror);
// Re-emit when no other 'error' listener is left on dest.
// NOTE(review): the static EE.listenerCount(emitter, name) form is
// deprecated in Node.js; ondata above already uses the instance method
// src.listenerCount(name).
550 if (EE.listenerCount(dest, 'error') === 0)
551 dest.emit('error', er);
553 // This is a brutally ugly hack to make sure that our error handler
554 // is attached before any userland ones. NEVER DO THIS.
// Reaches into dest._events to put onerror first in the 'error' listener
// list.
555 if (!dest._events || !dest._events.error)
556 dest.on('error', onerror);
557 else if (Array.isArray(dest._events.error))
558 dest._events.error.unshift(onerror);
560 dest._events.error = [onerror, dest._events.error];
563 // Both close and finish should trigger unpipe, but only once.
565 dest.removeListener('finish', onfinish);
568 dest.once('close', onclose);
569 function onfinish() {
571 dest.removeListener('close', onclose);
574 dest.once('finish', onfinish);
581 // tell the dest that it's being piped to
582 dest.emit('pipe', src);
584 // start the flow if it hasn't been started already.
585 if (!state.flowing) {
586 debug('pipe resume');
// pipeOnDrain(src): pipe() stores the result as `ondrain` and registers it
// as the destination's 'drain' listener. Once awaitDrain is 0 and the
// source still has 'data' listeners, flowing is switched back on.
// NOTE(review): the returned function wrapper, the decrement of awaitDrain
// and the call that restarts the flow are not visible in this excerpt.
593 function pipeOnDrain(src) {
595 var state = src._readableState;
596 debug('pipeOnDrain', state.awaitDrain);
597 if (state.awaitDrain)
599 if (state.awaitDrain === 0 && EE.listenerCount(src, 'data')) {
600 state.flowing = true;
// Readable.prototype.unpipe(dest): detach one destination, or every
// destination, from this stream and emit 'unpipe' on each detached
// destination.
// NOTE(review): the return statements and some branch conditions are missing
// from this excerpt.
607 Readable.prototype.unpipe = function(dest) {
608 var state = this._readableState;
610 // if we're not piping anywhere, then do nothing.
611 if (state.pipesCount === 0)
614 // just one destination. most common case.
615 if (state.pipesCount === 1) {
616 // passed in one, but it's not the right one.
// With a single pipe, state.pipes is the destination itself, not an array.
617 if (dest && dest !== state.pipes)
625 state.pipesCount = 0;
626 state.flowing = false;
628 dest.emit('unpipe', this);
632 // slow case. multiple pipe destinations.
// Visible below: every destination is detached and notified. The condition
// for this branch is not shown; presumably it is taken when no dest was
// given.
636 var dests = state.pipes;
637 var len = state.pipesCount;
639 state.pipesCount = 0;
640 state.flowing = false;
642 for (var i = 0; i < len; i++)
643 dests[i].emit('unpipe', this);
647 // try to find the right one.
648 var i = state.pipes.indexOf(dest);
652 state.pipes.splice(i, 1);
653 state.pipesCount -= 1;
// Collapse the array back to a bare destination when one pipe remains.
654 if (state.pipesCount === 1)
655 state.pipes = state.pipes[0];
657 dest.emit('unpipe', this);
// Readable.prototype.on(ev, fn): register the listener through
// Stream.prototype.on, then apply stream-specific side effects for 'data'
// and 'readable' listeners. addListener is an alias for this method.
// NOTE(review): the body of the 'data' branch and the return statement are
// not visible in this excerpt.
662 // set up data events if they are asked for
663 // Ensure readable listeners eventually get something
664 Readable.prototype.on = function(ev, fn) {
665 var res = Stream.prototype.on.call(this, ev, fn);
667 // If listening to data, and it has not explicitly been paused,
668 // then call resume to start the flow of data on the next tick.
// flowing is "explicitly paused" only when it is exactly false.
669 if (ev === 'data' && false !== this._readableState.flowing) {
673 if (ev === 'readable' && this.readable) {
674 var state = this._readableState;
// Only the first 'readable' listener triggers this setup.
675 if (!state.readableListening) {
676 state.readableListening = true;
677 state.emittedReadable = false;
678 state.needReadable = true;
// Not reading: defer to nReadingNextTick on the next tick. Already reading
// with buffered data: emit 'readable' through emitReadable.
679 if (!state.reading) {
680 process.nextTick(nReadingNextTick, this);
681 } else if (state.length) {
// NOTE(review): emitReadable is declared with a single parameter, so the
// second argument here is ignored.
682 emitReadable(this, state);
689 Readable.prototype.addListener = Readable.prototype.on;
// nReadingNextTick(self): next-tick callback scheduled by on('readable')
// when the stream is not currently reading.
// NOTE(review): only the debug line is visible in this excerpt; its message
// suggests the function goes on to call self.read(0). Confirm.
691 function nReadingNextTick(self) {
692 debug('readable nexttick read 0');
// Readable.prototype.resume(): if the stream is not already flowing, set
// state.flowing to true.
// NOTE(review): the remaining statements (presumably the call to the
// resume() helper and the return) are not visible in this excerpt.
696 // pause() and resume() are remnants of the legacy readable stream API
697 // If the user uses them, then switch into old mode.
698 Readable.prototype.resume = function() {
699 var state = this._readableState;
700 if (!state.flowing) {
702 state.flowing = true;
// resume(stream, state): schedule resume_ on the next tick. The
// resumeScheduled flag ensures only one run is scheduled at a time; resume_
// resets it.
708 function resume(stream, state) {
709 if (!state.resumeScheduled) {
710 state.resumeScheduled = true;
711 process.nextTick(resume_, stream, state);
// resume_(stream, state): next-tick half of resume(). Clears
// resumeScheduled and emits 'resume' on the stream.
// NOTE(review): the statements guarded by the two `if` conditions are not
// visible in this excerpt.
715 function resume_(stream, state) {
716 if (!state.reading) {
717 debug('resume read 0');
721 state.resumeScheduled = false;
722 stream.emit('resume');
724 if (state.flowing && !state.reading)
// Readable.prototype.pause(): unless flowing is already exactly false, set
// it to false.
// NOTE(review): any event emission and the return statement are not visible
// in this excerpt.
728 Readable.prototype.pause = function() {
729 debug('call pause flowing=%j', this._readableState.flowing);
730 if (false !== this._readableState.flowing) {
732 this._readableState.flowing = false;
// flow(stream): repeatedly call stream.read() until it returns null or the
// stream stops flowing.
// NOTE(review): the opening of the do/while loop (and any guard before it)
// is not visible in this excerpt.
738 function flow(stream) {
739 var state = stream._readableState;
740 debug('flow', state.flowing);
743 var chunk = stream.read();
744 } while (null !== chunk && state.flowing);
// Readable.prototype.wrap(stream): use `stream` as the data source for this
// Readable: forward its 'data' and 'end' events into push(), proxy its
// other methods and selected events, and replace _read with a wrapper.
// NOTE(review): many lines are missing from this excerpt (the declaration of
// `self`, the pause/resume handling and the return statement). `self` is
// presumably this Readable; confirm.
748 // wrap an old-style stream as the async data source.
749 // This is *not* part of the readable stream interface.
750 // It is an ugly unfortunate mess of history.
751 Readable.prototype.wrap = function(stream) {
752 var state = this._readableState;
// On 'end', flush whatever the decoder still holds.
756 stream.on('end', function() {
757 debug('wrapped end');
758 if (state.decoder && !state.ended) {
759 var chunk = state.decoder.end();
760 if (chunk && chunk.length)
767 stream.on('data', function(chunk) {
768 debug('wrapped data');
// The guard around this decode step is not visible here.
770 chunk = state.decoder.write(chunk);
772 // don't skip over falsy values in objectMode
// objectMode tests only for null/undefined; otherwise falsy or empty chunks
// are tested. The statement each test guards is not visible.
773 if (state.objectMode && (chunk === null || chunk === undefined))
775 else if (!state.objectMode && (!chunk || !chunk.length))
778 var ret = self.push(chunk);
785 // proxy all the other methods.
786 // important when wrapping filters and duplexes.
// Only functions not already defined on this Readable are proxied; the
// inner closure captures the method name for each proxy.
787 for (var i in stream) {
788 if (this[i] === undefined && typeof stream[i] === 'function') {
789 this[i] = function(method) { return function() {
790 return stream[method].apply(stream, arguments);
795 // proxy certain important events.
796 const events = ['error', 'close', 'destroy', 'pause', 'resume'];
797 events.forEach(function(ev) {
798 stream.on(ev, self.emit.bind(self, ev));
801 // when we try to consume some more bytes, simply unpause the
802 // underlying stream.
803 self._read = function(n) {
804 debug('wrapped _read', n);
// fromList(n, state): remove up to n bytes (or items) from state.buffer and
// produce them as the read result; also exported as Readable._fromList.
// NOTE(review): many lines are missing from this excerpt (the declarations
// of ret, buf and c, several assignments and the return statement); the
// comments below describe only the visible statements.
815 // exposed for testing purposes only.
816 Readable._fromList = fromList;
818 // Pluck off n bytes from an array of buffers.
819 // Length is the combined lengths of all the buffers in the list.
820 function fromList(n, state) {
821 var list = state.buffer;
822 var length = state.length;
// stringMode: a decoder is attached, so buffered chunks are presumably
// strings.
823 var stringMode = !!state.decoder;
824 var objectMode = !!state.objectMode;
827 // nothing in the list, definitely empty.
828 if (list.length === 0)
// A falsy n, or an n covering the whole buffered length, takes everything.
835 else if (!n || n >= length) {
836 // read it all, truncate the array.
839 else if (list.length === 1)
842 ret = Buffer.concat(list, length);
845 // read just some of it.
846 if (n < list[0].length) {
847 // just take a part of the first list item.
848 // slice is the same for buffers and strings.
850 ret = buf.slice(0, n);
851 list[0] = buf.slice(n);
852 } else if (n === list[0].length) {
853 // first list is a perfect match
857 // we have enough to cover it, but it spans past the first buffer.
// c appears to count what has been gathered so far; the loop stops once n
// is reached.
864 for (var i = 0, l = list.length; i < l && c < n; i++) {
866 var cpy = Math.min(n - c, buf.length);
// Two gathering forms are visible: string concatenation and Buffer copy.
// The condition selecting between them is not shown.
869 ret += buf.slice(0, cpy);
871 buf.copy(ret, c, 0, cpy);
// Keep the unread remainder of a partially consumed chunk at the head.
873 if (cpy < buf.length)
874 list[0] = buf.slice(cpy);
// endReadable(stream): schedule endReadableNT on the next tick, unless
// 'end' has already been emitted.
// Throws Error('endReadable called on non-empty stream') if data is still
// buffered.
// NOTE(review): one statement inside the final `if` is not visible in this
// excerpt.
886 function endReadable(stream) {
887 var state = stream._readableState;
889 // If we get here before consuming all the bytes, then that is a
890 // bug in node. Should never happen.
891 if (state.length > 0)
892 throw new Error('endReadable called on non-empty stream');
894 if (!state.endEmitted) {
896 process.nextTick(endReadableNT, state, stream);
// endReadableNT(state, stream): next-tick half of endReadable. If 'end' has
// still not been emitted and the buffer is still empty, mark endEmitted and
// set stream.readable to false.
// NOTE(review): the remaining lines (presumably the actual 'end' emission)
// are not visible in this excerpt.
900 function endReadableNT(state, stream) {
901 // Check that we didn't get one last unshift.
902 if (!state.endEmitted && state.length === 0) {
903 state.endEmitted = true;
904 stream.readable = false;