stream: check _events before _events.error
[platform/upstream/nodejs.git] / lib / _stream_readable.js
1 // Copyright Joyent, Inc. and other Node contributors.
2 //
3 // Permission is hereby granted, free of charge, to any person obtaining a
4 // copy of this software and associated documentation files (the
5 // "Software"), to deal in the Software without restriction, including
6 // without limitation the rights to use, copy, modify, merge, publish,
7 // distribute, sublicense, and/or sell copies of the Software, and to permit
8 // persons to whom the Software is furnished to do so, subject to the
9 // following conditions:
10 //
11 // The above copyright notice and this permission notice shall be included
12 // in all copies or substantial portions of the Software.
13 //
14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15 // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
16 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
17 // NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
18 // DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
19 // OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
20 // USE OR OTHER DEALINGS IN THE SOFTWARE.
21
22 module.exports = Readable;
23 Readable.ReadableState = ReadableState;
24
25 var EE = require('events').EventEmitter;
26 var Stream = require('stream');
27 var util = require('util');
28 var StringDecoder;
29
30 util.inherits(Readable, Stream);
31
32 function ReadableState(options, stream) {
33   options = options || {};
34
35   // the point at which it stops calling _read() to fill the buffer
36   // Note: 0 is a valid value, means "don't call _read preemptively ever"
37   var hwm = options.highWaterMark;
38   this.highWaterMark = (hwm || hwm === 0) ? hwm : 16 * 1024;
39
40   // cast to ints.
41   this.highWaterMark = ~~this.highWaterMark;
42
43   this.buffer = [];
44   this.length = 0;
45   this.pipes = null;
46   this.pipesCount = 0;
47   this.flowing = false;
48   this.ended = false;
49   this.endEmitted = false;
50   this.reading = false;
51
52   // In streams that never have any data, and do push(null) right away,
53   // the consumer can miss the 'end' event if they do some I/O before
54   // consuming the stream.  So, we don't emit('end') until some reading
55   // happens.
56   this.calledRead = false;
57
58   // a flag to be able to tell if the onwrite cb is called immediately,
59   // or on a later tick.  We set this to true at first, becuase any
60   // actions that shouldn't happen until "later" should generally also
61   // not happen before the first write call.
62   this.sync = true;
63
64   // whenever we return null, then we set a flag to say
65   // that we're awaiting a 'readable' event emission.
66   this.needReadable = false;
67   this.emittedReadable = false;
68   this.readableListening = false;
69
70
71   // object stream flag. Used to make read(n) ignore n and to
72   // make all the buffer merging and length checks go away
73   this.objectMode = !!options.objectMode;
74
75   // Crypto is kind of old and crusty.  Historically, its default string
76   // encoding is 'binary' so we have to make this configurable.
77   // Everything else in the universe uses 'utf8', though.
78   this.defaultEncoding = options.defaultEncoding || 'utf8';
79
80   // when piping, we only care about 'readable' events that happen
81   // after read()ing all the bytes and not getting any pushback.
82   this.ranOut = false;
83
84   // the number of writers that are awaiting a drain event in .pipe()s
85   this.awaitDrain = 0;
86
87   // if true, a maybeReadMore has been scheduled
88   this.readingMore = false;
89
90   this.decoder = null;
91   this.encoding = null;
92   if (options.encoding) {
93     if (!StringDecoder)
94       StringDecoder = require('string_decoder').StringDecoder;
95     this.decoder = new StringDecoder(options.encoding);
96     this.encoding = options.encoding;
97   }
98 }
99
100 function Readable(options) {
101   if (!(this instanceof Readable))
102     return new Readable(options);
103
104   this._readableState = new ReadableState(options, this);
105
106   // legacy
107   this.readable = true;
108
109   Stream.call(this);
110 }
111
112 // Manually shove something into the read() buffer.
113 // This returns true if the highWaterMark has not been hit yet,
114 // similar to how Writable.write() returns true if you should
115 // write() some more.
116 Readable.prototype.push = function(chunk, encoding) {
117   var state = this._readableState;
118
119   if (typeof chunk === 'string' && !state.objectMode) {
120     encoding = encoding || state.defaultEncoding;
121     if (encoding !== state.encoding) {
122       chunk = new Buffer(chunk, encoding);
123       encoding = '';
124     }
125   }
126
127   return readableAddChunk(this, state, chunk, encoding, false);
128 };
129
130 // Unshift should *always* be something directly out of read()
131 Readable.prototype.unshift = function(chunk) {
132   var state = this._readableState;
133   return readableAddChunk(this, state, chunk, '', true);
134 };
135
136 function readableAddChunk(stream, state, chunk, encoding, addToFront) {
137   var er = chunkInvalid(state, chunk);
138   if (er) {
139     stream.emit('error', er);
140   } else if (chunk === null || chunk === undefined) {
141     state.reading = false;
142     if (!state.ended)
143       onEofChunk(stream, state);
144   } else if (state.objectMode || chunk && chunk.length > 0) {
145     if (state.ended && !addToFront) {
146       var e = new Error('stream.push() after EOF');
147       stream.emit('error', e);
148     } else if (state.endEmitted && addToFront) {
149       var e = new Error('stream.unshift() after end event');
150       stream.emit('error', e);
151     } else {
152       if (state.decoder && !addToFront && !encoding)
153         chunk = state.decoder.write(chunk);
154
155       // update the buffer info.
156       state.length += state.objectMode ? 1 : chunk.length;
157       if (addToFront) {
158         state.buffer.unshift(chunk);
159       } else {
160         state.reading = false;
161         state.buffer.push(chunk);
162       }
163
164       if (state.needReadable)
165         emitReadable(stream);
166
167       maybeReadMore(stream, state);
168     }
169   } else if (!addToFront) {
170     state.reading = false;
171   }
172
173   return needMoreData(state);
174 }
175
176
177
178 // if it's past the high water mark, we can push in some more.
179 // Also, if we have no data yet, we can stand some
180 // more bytes.  This is to work around cases where hwm=0,
181 // such as the repl.  Also, if the push() triggered a
182 // readable event, and the user called read(largeNumber) such that
183 // needReadable was set, then we ought to push more, so that another
184 // 'readable' event will be triggered.
185 function needMoreData(state) {
186   return !state.ended &&
187          (state.needReadable ||
188           state.length < state.highWaterMark ||
189           state.length === 0);
190 }
191
192 // backwards compatibility.
193 Readable.prototype.setEncoding = function(enc) {
194   if (!StringDecoder)
195     StringDecoder = require('string_decoder').StringDecoder;
196   this._readableState.decoder = new StringDecoder(enc);
197   this._readableState.encoding = enc;
198 };
199
200 // Don't raise the hwm > 128MB
201 var MAX_HWM = 0x800000;
202 function roundUpToNextPowerOf2(n) {
203   if (n >= MAX_HWM) {
204     n = MAX_HWM;
205   } else {
206     // Get the next highest power of 2
207     n--;
208     for (var p = 1; p < 32; p <<= 1) n |= n >> p;
209     n++;
210   }
211   return n;
212 }
213
214 function howMuchToRead(n, state) {
215   if (state.length === 0 && state.ended)
216     return 0;
217
218   if (state.objectMode)
219     return n === 0 ? 0 : 1;
220
221   if (isNaN(n) || n === null) {
222     // only flow one buffer at a time
223     if (state.flowing && state.buffer.length)
224       return state.buffer[0].length;
225     else
226       return state.length;
227   }
228
229   if (n <= 0)
230     return 0;
231
232   // If we're asking for more than the target buffer level,
233   // then raise the water mark.  Bump up to the next highest
234   // power of 2, to prevent increasing it excessively in tiny
235   // amounts.
236   if (n > state.highWaterMark)
237     state.highWaterMark = roundUpToNextPowerOf2(n);
238
239   // don't have that much.  return null, unless we've ended.
240   if (n > state.length) {
241     if (!state.ended) {
242       state.needReadable = true;
243       return 0;
244     } else
245       return state.length;
246   }
247
248   return n;
249 }
250
251 // you can override either this method, or the async _read(n) below.
252 Readable.prototype.read = function(n) {
253   var state = this._readableState;
254   state.calledRead = true;
255   var nOrig = n;
256
257   if (typeof n !== 'number' || n > 0)
258     state.emittedReadable = false;
259
260   // if we're doing read(0) to trigger a readable event, but we
261   // already have a bunch of data in the buffer, then just trigger
262   // the 'readable' event and move on.
263   if (n === 0 &&
264       state.needReadable &&
265       (state.length >= state.highWaterMark || state.ended)) {
266     emitReadable(this);
267     return null;
268   }
269
270   n = howMuchToRead(n, state);
271
272   // if we've ended, and we're now clear, then finish it up.
273   if (n === 0 && state.ended) {
274     if (state.length === 0)
275       endReadable(this);
276     return null;
277   }
278
279   // All the actual chunk generation logic needs to be
280   // *below* the call to _read.  The reason is that in certain
281   // synthetic stream cases, such as passthrough streams, _read
282   // may be a completely synchronous operation which may change
283   // the state of the read buffer, providing enough data when
284   // before there was *not* enough.
285   //
286   // So, the steps are:
287   // 1. Figure out what the state of things will be after we do
288   // a read from the buffer.
289   //
290   // 2. If that resulting state will trigger a _read, then call _read.
291   // Note that this may be asynchronous, or synchronous.  Yes, it is
292   // deeply ugly to write APIs this way, but that still doesn't mean
293   // that the Readable class should behave improperly, as streams are
294   // designed to be sync/async agnostic.
295   // Take note if the _read call is sync or async (ie, if the read call
296   // has returned yet), so that we know whether or not it's safe to emit
297   // 'readable' etc.
298   //
299   // 3. Actually pull the requested chunks out of the buffer and return.
300
301   // if we need a readable event, then we need to do some reading.
302   var doRead = state.needReadable;
303
304   // if we currently have less than the highWaterMark, then also read some
305   if (state.length - n <= state.highWaterMark)
306     doRead = true;
307
308   // however, if we've ended, then there's no point, and if we're already
309   // reading, then it's unnecessary.
310   if (state.ended || state.reading)
311     doRead = false;
312
313   if (doRead) {
314     state.reading = true;
315     state.sync = true;
316     // if the length is currently zero, then we *need* a readable event.
317     if (state.length === 0)
318       state.needReadable = true;
319     // call internal read method
320     this._read(state.highWaterMark);
321     state.sync = false;
322   }
323
324   // If _read called its callback synchronously, then `reading`
325   // will be false, and we need to re-evaluate how much data we
326   // can return to the user.
327   if (doRead && !state.reading)
328     n = howMuchToRead(nOrig, state);
329
330   var ret;
331   if (n > 0)
332     ret = fromList(n, state);
333   else
334     ret = null;
335
336   if (ret === null) {
337     state.needReadable = true;
338     n = 0;
339   }
340
341   state.length -= n;
342
343   // If we have nothing in the buffer, then we want to know
344   // as soon as we *do* get something into the buffer.
345   if (state.length === 0 && !state.ended)
346     state.needReadable = true;
347
348   // If we happened to read() exactly the remaining amount in the
349   // buffer, and the EOF has been seen at this point, then make sure
350   // that we emit 'end' on the very next tick.
351   if (state.ended && !state.endEmitted && state.length === 0)
352     endReadable(this);
353
354   return ret;
355 };
356
357 function chunkInvalid(state, chunk) {
358   var er = null;
359   if (!Buffer.isBuffer(chunk) &&
360       'string' !== typeof chunk &&
361       chunk !== null &&
362       chunk !== undefined &&
363       !state.objectMode &&
364       !er) {
365     er = new TypeError('Invalid non-string/buffer chunk');
366   }
367   return er;
368 }
369
370
371 function onEofChunk(stream, state) {
372   if (state.decoder && !state.ended) {
373     var chunk = state.decoder.end();
374     if (chunk && chunk.length) {
375       state.buffer.push(chunk);
376       state.length += state.objectMode ? 1 : chunk.length;
377     }
378   }
379   state.ended = true;
380
381   // if we've ended and we have some data left, then emit
382   // 'readable' now to make sure it gets picked up.
383   if (state.length > 0)
384     emitReadable(stream);
385   else
386     endReadable(stream);
387 }
388
389 // Don't emit readable right away in sync mode, because this can trigger
390 // another read() call => stack overflow.  This way, it might trigger
391 // a nextTick recursion warning, but that's not so bad.
392 function emitReadable(stream) {
393   var state = stream._readableState;
394   state.needReadable = false;
395   if (state.emittedReadable)
396     return;
397
398   state.emittedReadable = true;
399   if (state.sync)
400     process.nextTick(function() {
401       emitReadable_(stream);
402     });
403   else
404     emitReadable_(stream);
405 }
406
407 function emitReadable_(stream) {
408   stream.emit('readable');
409 }
410
411
412 // at this point, the user has presumably seen the 'readable' event,
413 // and called read() to consume some data.  that may have triggered
414 // in turn another _read(n) call, in which case reading = true if
415 // it's in progress.
416 // However, if we're not ended, or reading, and the length < hwm,
417 // then go ahead and try to read some more preemptively.
418 function maybeReadMore(stream, state) {
419   if (!state.readingMore) {
420     state.readingMore = true;
421     process.nextTick(function() {
422       maybeReadMore_(stream, state);
423     });
424   }
425 }
426
427 function maybeReadMore_(stream, state) {
428   var len = state.length;
429   while (!state.reading && !state.flowing && !state.ended &&
430          state.length < state.highWaterMark) {
431     stream.read(0);
432     if (len === state.length)
433       // didn't get any data, stop spinning.
434       break;
435     else
436       len = state.length;
437   }
438   state.readingMore = false;
439 }
440
441 // abstract method.  to be overridden in specific implementation classes.
442 // call cb(er, data) where data is <= n in length.
443 // for virtual (non-string, non-buffer) streams, "length" is somewhat
444 // arbitrary, and perhaps not very meaningful.
445 Readable.prototype._read = function(n) {
446   this.emit('error', new Error('not implemented'));
447 };
448
449 Readable.prototype.pipe = function(dest, pipeOpts) {
450   var src = this;
451   var state = this._readableState;
452
453   switch (state.pipesCount) {
454     case 0:
455       state.pipes = dest;
456       break;
457     case 1:
458       state.pipes = [state.pipes, dest];
459       break;
460     default:
461       state.pipes.push(dest);
462       break;
463   }
464   state.pipesCount += 1;
465
466   var doEnd = (!pipeOpts || pipeOpts.end !== false) &&
467               dest !== process.stdout &&
468               dest !== process.stderr;
469
470   var endFn = doEnd ? onend : cleanup;
471   if (state.endEmitted)
472     process.nextTick(endFn);
473   else
474     src.once('end', endFn);
475
476   dest.on('unpipe', onunpipe);
477   function onunpipe(readable) {
478     if (readable !== src) return;
479     cleanup();
480   }
481
482   function onend() {
483     dest.end();
484   }
485
486   // when the dest drains, it reduces the awaitDrain counter
487   // on the source.  This would be more elegant with a .once()
488   // handler in flow(), but adding and removing repeatedly is
489   // too slow.
490   var ondrain = pipeOnDrain(src);
491   dest.on('drain', ondrain);
492
493   function cleanup() {
494     // cleanup event handlers once the pipe is broken
495     dest.removeListener('close', onclose);
496     dest.removeListener('finish', onfinish);
497     dest.removeListener('drain', ondrain);
498     dest.removeListener('error', onerror);
499     dest.removeListener('unpipe', onunpipe);
500     src.removeListener('end', onend);
501     src.removeListener('end', cleanup);
502
503     // if the reader is waiting for a drain event from this
504     // specific writer, then it would cause it to never start
505     // flowing again.
506     // So, if this is awaiting a drain, then we just call it now.
507     // If we don't know, then assume that we are waiting for one.
508     if (!dest._writableState || dest._writableState.needDrain)
509       ondrain();
510   }
511
512   // if the dest has an error, then stop piping into it.
513   // however, don't suppress the throwing behavior for this.
514   function onerror(er) {
515     unpipe();
516     dest.removeListener('error', onerror);
517     if (EE.listenerCount(dest, 'error') === 0)
518       dest.emit('error', er);
519   }
520   // This is a brutally ugly hack to make sure that our error handler
521   // is attached before any userland ones.  NEVER DO THIS.
522   if (!dest._events || !dest._events.error)
523     dest.on('error', onerror);
524   else if (Array.isArray(dest._events.error))
525     dest._events.error.unshift(onerror);
526   else
527     dest._events.error = [onerror, dest._events.error];
528
529
530
531   // Both close and finish should trigger unpipe, but only once.
532   function onclose() {
533     dest.removeListener('finish', onfinish);
534     unpipe();
535   }
536   dest.once('close', onclose);
537   function onfinish() {
538     dest.removeListener('close', onclose);
539     unpipe();
540   }
541   dest.once('finish', onfinish);
542
543   function unpipe() {
544     src.unpipe(dest);
545   }
546
547   // tell the dest that it's being piped to
548   dest.emit('pipe', src);
549
550   // start the flow if it hasn't been started already.
551   if (!state.flowing) {
552     // the handler that waits for readable events after all
553     // the data gets sucked out in flow.
554     // This would be easier to follow with a .once() handler
555     // in flow(), but that is too slow.
556     this.on('readable', pipeOnReadable);
557
558     state.flowing = true;
559     process.nextTick(function() {
560       flow(src);
561     });
562   }
563
564   return dest;
565 };
566
567 function pipeOnDrain(src) {
568   return function() {
569     var dest = this;
570     var state = src._readableState;
571     state.awaitDrain--;
572     if (state.awaitDrain === 0)
573       flow(src);
574   };
575 }
576
577 function flow(src) {
578   var state = src._readableState;
579   var chunk;
580   state.awaitDrain = 0;
581
582   function write(dest, i, list) {
583     var written = dest.write(chunk);
584     if (false === written) {
585       state.awaitDrain++;
586     }
587   }
588
589   while (state.pipesCount && null !== (chunk = src.read())) {
590
591     if (state.pipesCount === 1)
592       write(state.pipes, 0, null);
593     else
594       state.pipes.forEach(write);
595
596     src.emit('data', chunk);
597
598     // if anyone needs a drain, then we have to wait for that.
599     if (state.awaitDrain > 0)
600       return;
601   }
602
603   // if every destination was unpiped, either before entering this
604   // function, or in the while loop, then stop flowing.
605   //
606   // NB: This is a pretty rare edge case.
607   if (state.pipesCount === 0) {
608     state.flowing = false;
609
610     // if there were data event listeners added, then switch to old mode.
611     if (EE.listenerCount(src, 'data') > 0)
612       emitDataEvents(src);
613     return;
614   }
615
616   // at this point, no one needed a drain, so we just ran out of data
617   // on the next readable event, start it over again.
618   state.ranOut = true;
619 }
620
621 function pipeOnReadable() {
622   if (this._readableState.ranOut) {
623     this._readableState.ranOut = false;
624     flow(this);
625   }
626 }
627
628
629 Readable.prototype.unpipe = function(dest) {
630   var state = this._readableState;
631
632   // if we're not piping anywhere, then do nothing.
633   if (state.pipesCount === 0)
634     return this;
635
636   // just one destination.  most common case.
637   if (state.pipesCount === 1) {
638     // passed in one, but it's not the right one.
639     if (dest && dest !== state.pipes)
640       return this;
641
642     if (!dest)
643       dest = state.pipes;
644
645     // got a match.
646     state.pipes = null;
647     state.pipesCount = 0;
648     this.removeListener('readable', pipeOnReadable);
649     state.flowing = false;
650     if (dest)
651       dest.emit('unpipe', this);
652     return this;
653   }
654
655   // slow case. multiple pipe destinations.
656
657   if (!dest) {
658     // remove all.
659     var dests = state.pipes;
660     var len = state.pipesCount;
661     state.pipes = null;
662     state.pipesCount = 0;
663     this.removeListener('readable', pipeOnReadable);
664     state.flowing = false;
665
666     for (var i = 0; i < len; i++)
667       dests[i].emit('unpipe', this);
668     return this;
669   }
670
671   // try to find the right one.
672   var i = state.pipes.indexOf(dest);
673   if (i === -1)
674     return this;
675
676   state.pipes.splice(i, 1);
677   state.pipesCount -= 1;
678   if (state.pipesCount === 1)
679     state.pipes = state.pipes[0];
680
681   dest.emit('unpipe', this);
682
683   return this;
684 };
685
686 // set up data events if they are asked for
687 // Ensure readable listeners eventually get something
688 Readable.prototype.on = function(ev, fn) {
689   var res = Stream.prototype.on.call(this, ev, fn);
690
691   if (ev === 'data' && !this._readableState.flowing)
692     emitDataEvents(this);
693
694   if (ev === 'readable' && this.readable) {
695     var state = this._readableState;
696     if (!state.readableListening) {
697       state.readableListening = true;
698       state.emittedReadable = false;
699       state.needReadable = true;
700       if (!state.reading) {
701         this.read(0);
702       } else if (state.length) {
703         emitReadable(this, state);
704       }
705     }
706   }
707
708   return res;
709 };
710 Readable.prototype.addListener = Readable.prototype.on;
711
712 // pause() and resume() are remnants of the legacy readable stream API
713 // If the user uses them, then switch into old mode.
714 Readable.prototype.resume = function() {
715   emitDataEvents(this);
716   this.read(0);
717   this.emit('resume');
718 };
719
720 Readable.prototype.pause = function() {
721   emitDataEvents(this, true);
722   this.emit('pause');
723 };
724
725 function emitDataEvents(stream, startPaused) {
726   var state = stream._readableState;
727
728   if (state.flowing) {
729     // https://github.com/isaacs/readable-stream/issues/16
730     throw new Error('Cannot switch to old mode now.');
731   }
732
733   var paused = startPaused || false;
734   var readable = false;
735
736   // convert to an old-style stream.
737   stream.readable = true;
738   stream.pipe = Stream.prototype.pipe;
739   stream.on = stream.addListener = Stream.prototype.on;
740
741   stream.on('readable', function() {
742     readable = true;
743
744     var c;
745     while (!paused && (null !== (c = stream.read())))
746       stream.emit('data', c);
747
748     if (c === null) {
749       readable = false;
750       stream._readableState.needReadable = true;
751     }
752   });
753
754   stream.pause = function() {
755     paused = true;
756     this.emit('pause');
757   };
758
759   stream.resume = function() {
760     paused = false;
761     if (readable)
762       process.nextTick(function() {
763         stream.emit('readable');
764       });
765     else
766       this.read(0);
767     this.emit('resume');
768   };
769
770   // now make it start, just in case it hadn't already.
771   stream.emit('readable');
772 }
773
774 // wrap an old-style stream as the async data source.
775 // This is *not* part of the readable stream interface.
776 // It is an ugly unfortunate mess of history.
777 Readable.prototype.wrap = function(stream) {
778   var state = this._readableState;
779   var paused = false;
780
781   var self = this;
782   stream.on('end', function() {
783     if (state.decoder && !state.ended) {
784       var chunk = state.decoder.end();
785       if (chunk && chunk.length)
786         self.push(chunk);
787     }
788
789     self.push(null);
790   });
791
792   stream.on('data', function(chunk) {
793     if (state.decoder)
794       chunk = state.decoder.write(chunk);
795     if (!chunk || !state.objectMode && !chunk.length)
796       return;
797
798     var ret = self.push(chunk);
799     if (!ret) {
800       paused = true;
801       stream.pause();
802     }
803   });
804
805   // proxy all the other methods.
806   // important when wrapping filters and duplexes.
807   for (var i in stream) {
808     if (typeof stream[i] === 'function' &&
809         typeof this[i] === 'undefined') {
810       this[i] = function(method) { return function() {
811         return stream[method].apply(stream, arguments);
812       }}(i);
813     }
814   }
815
816   // proxy certain important events.
817   var events = ['error', 'close', 'destroy', 'pause', 'resume'];
818   events.forEach(function(ev) {
819     stream.on(ev, self.emit.bind(self, ev));
820   });
821
822   // when we try to consume some more bytes, simply unpause the
823   // underlying stream.
824   self._read = function(n) {
825     if (paused) {
826       paused = false;
827       stream.resume();
828     }
829   };
830
831   return self;
832 };
833
834
835
836 // exposed for testing purposes only.
837 Readable._fromList = fromList;
838
839 // Pluck off n bytes from an array of buffers.
840 // Length is the combined lengths of all the buffers in the list.
841 function fromList(n, state) {
842   var list = state.buffer;
843   var length = state.length;
844   var stringMode = !!state.decoder;
845   var objectMode = !!state.objectMode;
846   var ret;
847
848   // nothing in the list, definitely empty.
849   if (list.length === 0)
850     return null;
851
852   if (length === 0)
853     ret = null;
854   else if (objectMode)
855     ret = list.shift();
856   else if (!n || n >= length) {
857     // read it all, truncate the array.
858     if (stringMode)
859       ret = list.join('');
860     else
861       ret = Buffer.concat(list, length);
862     list.length = 0;
863   } else {
864     // read just some of it.
865     if (n < list[0].length) {
866       // just take a part of the first list item.
867       // slice is the same for buffers and strings.
868       var buf = list[0];
869       ret = buf.slice(0, n);
870       list[0] = buf.slice(n);
871     } else if (n === list[0].length) {
872       // first list is a perfect match
873       ret = list.shift();
874     } else {
875       // complex case.
876       // we have enough to cover it, but it spans past the first buffer.
877       if (stringMode)
878         ret = '';
879       else
880         ret = new Buffer(n);
881
882       var c = 0;
883       for (var i = 0, l = list.length; i < l && c < n; i++) {
884         var buf = list[0];
885         var cpy = Math.min(n - c, buf.length);
886
887         if (stringMode)
888           ret += buf.slice(0, cpy);
889         else
890           buf.copy(ret, c, 0, cpy);
891
892         if (cpy < buf.length)
893           list[0] = buf.slice(cpy);
894         else
895           list.shift();
896
897         c += cpy;
898       }
899     }
900   }
901
902   return ret;
903 }
904
905 function endReadable(stream) {
906   var state = stream._readableState;
907
908   // If we get here before consuming all the bytes, then that is a
909   // bug in node.  Should never happen.
910   if (state.length > 0)
911     throw new Error('endReadable called on non-empty stream');
912
913   if (!state.endEmitted && state.calledRead) {
914     state.ended = true;
915     process.nextTick(function() {
916       // Check that we didn't get one last unshift.
917       if (!state.endEmitted && state.length === 0) {
918         state.endEmitted = true;
919         stream.readable = false;
920         stream.emit('end');
921       }
922     });
923   }
924 }