revise installing a license file
[platform/upstream/nodejs.git] / lib / _stream_readable.js
1 'use strict';
2
3 module.exports = Readable;
4 Readable.ReadableState = ReadableState;
5
6 const EE = require('events');
7 const Stream = require('stream');
8 const Buffer = require('buffer').Buffer;
9 const util = require('util');
10 const debug = util.debuglog('stream');
11 var StringDecoder;
12
13 util.inherits(Readable, Stream);
14
15 function ReadableState(options, stream) {
16   options = options || {};
17
18   // object stream flag. Used to make read(n) ignore n and to
19   // make all the buffer merging and length checks go away
20   this.objectMode = !!options.objectMode;
21
22   if (stream instanceof Stream.Duplex)
23     this.objectMode = this.objectMode || !!options.readableObjectMode;
24
25   // the point at which it stops calling _read() to fill the buffer
26   // Note: 0 is a valid value, means "don't call _read preemptively ever"
27   var hwm = options.highWaterMark;
28   var defaultHwm = this.objectMode ? 16 : 16 * 1024;
29   this.highWaterMark = (hwm || hwm === 0) ? hwm : defaultHwm;
30
31   // cast to ints.
32   this.highWaterMark = ~~this.highWaterMark;
33
34   this.buffer = [];
35   this.length = 0;
36   this.pipes = null;
37   this.pipesCount = 0;
38   this.flowing = null;
39   this.ended = false;
40   this.endEmitted = false;
41   this.reading = false;
42
43   // a flag to be able to tell if the onwrite cb is called immediately,
44   // or on a later tick.  We set this to true at first, because any
45   // actions that shouldn't happen until "later" should generally also
46   // not happen before the first write call.
47   this.sync = true;
48
49   // whenever we return null, then we set a flag to say
50   // that we're awaiting a 'readable' event emission.
51   this.needReadable = false;
52   this.emittedReadable = false;
53   this.readableListening = false;
54   this.resumeScheduled = false;
55
56   // Crypto is kind of old and crusty.  Historically, its default string
57   // encoding is 'binary' so we have to make this configurable.
58   // Everything else in the universe uses 'utf8', though.
59   this.defaultEncoding = options.defaultEncoding || 'utf8';
60
61   // when piping, we only care about 'readable' events that happen
62   // after read()ing all the bytes and not getting any pushback.
63   this.ranOut = false;
64
65   // the number of writers that are awaiting a drain event in .pipe()s
66   this.awaitDrain = 0;
67
68   // if true, a maybeReadMore has been scheduled
69   this.readingMore = false;
70
71   this.decoder = null;
72   this.encoding = null;
73   if (options.encoding) {
74     if (!StringDecoder)
75       StringDecoder = require('string_decoder').StringDecoder;
76     this.decoder = new StringDecoder(options.encoding);
77     this.encoding = options.encoding;
78   }
79 }
80
81 function Readable(options) {
82   if (!(this instanceof Readable))
83     return new Readable(options);
84
85   this._readableState = new ReadableState(options, this);
86
87   // legacy
88   this.readable = true;
89
90   if (options && typeof options.read === 'function')
91     this._read = options.read;
92
93   Stream.call(this);
94 }
95
96 // Manually shove something into the read() buffer.
97 // This returns true if the highWaterMark has not been hit yet,
98 // similar to how Writable.write() returns true if you should
99 // write() some more.
100 Readable.prototype.push = function(chunk, encoding) {
101   var state = this._readableState;
102
103   if (!state.objectMode && typeof chunk === 'string') {
104     encoding = encoding || state.defaultEncoding;
105     if (encoding !== state.encoding) {
106       chunk = new Buffer(chunk, encoding);
107       encoding = '';
108     }
109   }
110
111   return readableAddChunk(this, state, chunk, encoding, false);
112 };
113
114 // Unshift should *always* be something directly out of read()
115 Readable.prototype.unshift = function(chunk) {
116   var state = this._readableState;
117   return readableAddChunk(this, state, chunk, '', true);
118 };
119
120 Readable.prototype.isPaused = function() {
121   return this._readableState.flowing === false;
122 };
123
124 function readableAddChunk(stream, state, chunk, encoding, addToFront) {
125   var er = chunkInvalid(state, chunk);
126   if (er) {
127     stream.emit('error', er);
128   } else if (chunk === null) {
129     state.reading = false;
130     onEofChunk(stream, state);
131   } else if (state.objectMode || chunk && chunk.length > 0) {
132     if (state.ended && !addToFront) {
133       const e = new Error('stream.push() after EOF');
134       stream.emit('error', e);
135     } else if (state.endEmitted && addToFront) {
136       const e = new Error('stream.unshift() after end event');
137       stream.emit('error', e);
138     } else {
139       var skipAdd;
140       if (state.decoder && !addToFront && !encoding) {
141         chunk = state.decoder.write(chunk);
142         skipAdd = (!state.objectMode && chunk.length === 0);
143       }
144
145       if (!addToFront)
146         state.reading = false;
147
148       // Don't add to the buffer if we've decoded to an empty string chunk and
149       // we're not in object mode
150       if (!skipAdd) {
151         // if we want the data now, just emit it.
152         if (state.flowing && state.length === 0 && !state.sync) {
153           stream.emit('data', chunk);
154           stream.read(0);
155         } else {
156           // update the buffer info.
157           state.length += state.objectMode ? 1 : chunk.length;
158           if (addToFront)
159             state.buffer.unshift(chunk);
160           else
161             state.buffer.push(chunk);
162
163           if (state.needReadable)
164             emitReadable(stream);
165         }
166       }
167
168       maybeReadMore(stream, state);
169     }
170   } else if (!addToFront) {
171     state.reading = false;
172   }
173
174   return needMoreData(state);
175 }
176
177
178 // if it's past the high water mark, we can push in some more.
179 // Also, if we have no data yet, we can stand some
180 // more bytes.  This is to work around cases where hwm=0,
181 // such as the repl.  Also, if the push() triggered a
182 // readable event, and the user called read(largeNumber) such that
183 // needReadable was set, then we ought to push more, so that another
184 // 'readable' event will be triggered.
185 function needMoreData(state) {
186   return !state.ended &&
187          (state.needReadable ||
188           state.length < state.highWaterMark ||
189           state.length === 0);
190 }
191
192 // backwards compatibility.
193 Readable.prototype.setEncoding = function(enc) {
194   if (!StringDecoder)
195     StringDecoder = require('string_decoder').StringDecoder;
196   this._readableState.decoder = new StringDecoder(enc);
197   this._readableState.encoding = enc;
198   return this;
199 };
200
201 // Don't raise the hwm > 8MB
202 const MAX_HWM = 0x800000;
203 function computeNewHighWaterMark(n) {
204   if (n >= MAX_HWM) {
205     n = MAX_HWM;
206   } else {
207     // Get the next highest power of 2
208     n--;
209     n |= n >>> 1;
210     n |= n >>> 2;
211     n |= n >>> 4;
212     n |= n >>> 8;
213     n |= n >>> 16;
214     n++;
215   }
216   return n;
217 }
218
219 function howMuchToRead(n, state) {
220   if (state.length === 0 && state.ended)
221     return 0;
222
223   if (state.objectMode)
224     return n === 0 ? 0 : 1;
225
226   if (n === null || isNaN(n)) {
227     // only flow one buffer at a time
228     if (state.flowing && state.buffer.length)
229       return state.buffer[0].length;
230     else
231       return state.length;
232   }
233
234   if (n <= 0)
235     return 0;
236
237   // If we're asking for more than the target buffer level,
238   // then raise the water mark.  Bump up to the next highest
239   // power of 2, to prevent increasing it excessively in tiny
240   // amounts.
241   if (n > state.highWaterMark)
242     state.highWaterMark = computeNewHighWaterMark(n);
243
244   // don't have that much.  return null, unless we've ended.
245   if (n > state.length) {
246     if (!state.ended) {
247       state.needReadable = true;
248       return 0;
249     } else {
250       return state.length;
251     }
252   }
253
254   return n;
255 }
256
257 // you can override either this method, or the async _read(n) below.
258 Readable.prototype.read = function(n) {
259   debug('read', n);
260   var state = this._readableState;
261   var nOrig = n;
262
263   if (typeof n !== 'number' || n > 0)
264     state.emittedReadable = false;
265
266   // if we're doing read(0) to trigger a readable event, but we
267   // already have a bunch of data in the buffer, then just trigger
268   // the 'readable' event and move on.
269   if (n === 0 &&
270       state.needReadable &&
271       (state.length >= state.highWaterMark || state.ended)) {
272     debug('read: emitReadable', state.length, state.ended);
273     if (state.length === 0 && state.ended)
274       endReadable(this);
275     else
276       emitReadable(this);
277     return null;
278   }
279
280   n = howMuchToRead(n, state);
281
282   // if we've ended, and we're now clear, then finish it up.
283   if (n === 0 && state.ended) {
284     if (state.length === 0)
285       endReadable(this);
286     return null;
287   }
288
289   // All the actual chunk generation logic needs to be
290   // *below* the call to _read.  The reason is that in certain
291   // synthetic stream cases, such as passthrough streams, _read
292   // may be a completely synchronous operation which may change
293   // the state of the read buffer, providing enough data when
294   // before there was *not* enough.
295   //
296   // So, the steps are:
297   // 1. Figure out what the state of things will be after we do
298   // a read from the buffer.
299   //
300   // 2. If that resulting state will trigger a _read, then call _read.
301   // Note that this may be asynchronous, or synchronous.  Yes, it is
302   // deeply ugly to write APIs this way, but that still doesn't mean
303   // that the Readable class should behave improperly, as streams are
304   // designed to be sync/async agnostic.
305   // Take note if the _read call is sync or async (ie, if the read call
306   // has returned yet), so that we know whether or not it's safe to emit
307   // 'readable' etc.
308   //
309   // 3. Actually pull the requested chunks out of the buffer and return.
310
311   // if we need a readable event, then we need to do some reading.
312   var doRead = state.needReadable;
313   debug('need readable', doRead);
314
315   // if we currently have less than the highWaterMark, then also read some
316   if (state.length === 0 || state.length - n < state.highWaterMark) {
317     doRead = true;
318     debug('length less than watermark', doRead);
319   }
320
321   // however, if we've ended, then there's no point, and if we're already
322   // reading, then it's unnecessary.
323   if (state.ended || state.reading) {
324     doRead = false;
325     debug('reading or ended', doRead);
326   }
327
328   if (doRead) {
329     debug('do read');
330     state.reading = true;
331     state.sync = true;
332     // if the length is currently zero, then we *need* a readable event.
333     if (state.length === 0)
334       state.needReadable = true;
335     // call internal read method
336     this._read(state.highWaterMark);
337     state.sync = false;
338   }
339
340   // If _read pushed data synchronously, then `reading` will be false,
341   // and we need to re-evaluate how much data we can return to the user.
342   if (doRead && !state.reading)
343     n = howMuchToRead(nOrig, state);
344
345   var ret;
346   if (n > 0)
347     ret = fromList(n, state);
348   else
349     ret = null;
350
351   if (ret === null) {
352     state.needReadable = true;
353     n = 0;
354   }
355
356   state.length -= n;
357
358   // If we have nothing in the buffer, then we want to know
359   // as soon as we *do* get something into the buffer.
360   if (state.length === 0 && !state.ended)
361     state.needReadable = true;
362
363   // If we tried to read() past the EOF, then emit end on the next tick.
364   if (nOrig !== n && state.ended && state.length === 0)
365     endReadable(this);
366
367   if (ret !== null)
368     this.emit('data', ret);
369
370   return ret;
371 };
372
373 function chunkInvalid(state, chunk) {
374   var er = null;
375   if (!(chunk instanceof Buffer) &&
376       typeof chunk !== 'string' &&
377       chunk !== null &&
378       chunk !== undefined &&
379       !state.objectMode) {
380     er = new TypeError('Invalid non-string/buffer chunk');
381   }
382   return er;
383 }
384
385
386 function onEofChunk(stream, state) {
387   if (state.ended) return;
388   if (state.decoder) {
389     var chunk = state.decoder.end();
390     if (chunk && chunk.length) {
391       state.buffer.push(chunk);
392       state.length += state.objectMode ? 1 : chunk.length;
393     }
394   }
395   state.ended = true;
396
397   // emit 'readable' now to make sure it gets picked up.
398   emitReadable(stream);
399 }
400
401 // Don't emit readable right away in sync mode, because this can trigger
402 // another read() call => stack overflow.  This way, it might trigger
403 // a nextTick recursion warning, but that's not so bad.
404 function emitReadable(stream) {
405   var state = stream._readableState;
406   state.needReadable = false;
407   if (!state.emittedReadable) {
408     debug('emitReadable', state.flowing);
409     state.emittedReadable = true;
410     if (state.sync)
411       process.nextTick(emitReadable_, stream);
412     else
413       emitReadable_(stream);
414   }
415 }
416
417 function emitReadable_(stream) {
418   debug('emit readable');
419   stream.emit('readable');
420   flow(stream);
421 }
422
423
424 // at this point, the user has presumably seen the 'readable' event,
425 // and called read() to consume some data.  that may have triggered
426 // in turn another _read(n) call, in which case reading = true if
427 // it's in progress.
428 // However, if we're not ended, or reading, and the length < hwm,
429 // then go ahead and try to read some more preemptively.
430 function maybeReadMore(stream, state) {
431   if (!state.readingMore) {
432     state.readingMore = true;
433     process.nextTick(maybeReadMore_, stream, state);
434   }
435 }
436
437 function maybeReadMore_(stream, state) {
438   var len = state.length;
439   while (!state.reading && !state.flowing && !state.ended &&
440          state.length < state.highWaterMark) {
441     debug('maybeReadMore read 0');
442     stream.read(0);
443     if (len === state.length)
444       // didn't get any data, stop spinning.
445       break;
446     else
447       len = state.length;
448   }
449   state.readingMore = false;
450 }
451
452 // abstract method.  to be overridden in specific implementation classes.
453 // call cb(er, data) where data is <= n in length.
454 // for virtual (non-string, non-buffer) streams, "length" is somewhat
455 // arbitrary, and perhaps not very meaningful.
456 Readable.prototype._read = function(n) {
457   this.emit('error', new Error('not implemented'));
458 };
459
460 Readable.prototype.pipe = function(dest, pipeOpts) {
461   var src = this;
462   var state = this._readableState;
463
464   switch (state.pipesCount) {
465     case 0:
466       state.pipes = dest;
467       break;
468     case 1:
469       state.pipes = [state.pipes, dest];
470       break;
471     default:
472       state.pipes.push(dest);
473       break;
474   }
475   state.pipesCount += 1;
476   debug('pipe count=%d opts=%j', state.pipesCount, pipeOpts);
477
478   var doEnd = (!pipeOpts || pipeOpts.end !== false) &&
479               dest !== process.stdout &&
480               dest !== process.stderr;
481
482   var endFn = doEnd ? onend : cleanup;
483   if (state.endEmitted)
484     process.nextTick(endFn);
485   else
486     src.once('end', endFn);
487
488   dest.on('unpipe', onunpipe);
489   function onunpipe(readable) {
490     debug('onunpipe');
491     if (readable === src) {
492       cleanup();
493     }
494   }
495
496   function onend() {
497     debug('onend');
498     dest.end();
499   }
500
501   // when the dest drains, it reduces the awaitDrain counter
502   // on the source.  This would be more elegant with a .once()
503   // handler in flow(), but adding and removing repeatedly is
504   // too slow.
505   var ondrain = pipeOnDrain(src);
506   dest.on('drain', ondrain);
507
508   var cleanedUp = false;
509   function cleanup() {
510     debug('cleanup');
511     // cleanup event handlers once the pipe is broken
512     dest.removeListener('close', onclose);
513     dest.removeListener('finish', onfinish);
514     dest.removeListener('drain', ondrain);
515     dest.removeListener('error', onerror);
516     dest.removeListener('unpipe', onunpipe);
517     src.removeListener('end', onend);
518     src.removeListener('end', cleanup);
519     src.removeListener('data', ondata);
520
521     cleanedUp = true;
522
523     // if the reader is waiting for a drain event from this
524     // specific writer, then it would cause it to never start
525     // flowing again.
526     // So, if this is awaiting a drain, then we just call it now.
527     // If we don't know, then assume that we are waiting for one.
528     if (state.awaitDrain &&
529         (!dest._writableState || dest._writableState.needDrain))
530       ondrain();
531   }
532
533   src.on('data', ondata);
534   function ondata(chunk) {
535     debug('ondata');
536     var ret = dest.write(chunk);
537     if (false === ret) {
538       // If the user unpiped during `dest.write()`, it is possible
539       // to get stuck in a permanently paused state if that write
540       // also returned false.
541       if (state.pipesCount === 1 &&
542           state.pipes[0] === dest &&
543           src.listenerCount('data') === 1 &&
544           !cleanedUp) {
545         debug('false write response, pause', src._readableState.awaitDrain);
546         src._readableState.awaitDrain++;
547       }
548       src.pause();
549     }
550   }
551
552   // if the dest has an error, then stop piping into it.
553   // however, don't suppress the throwing behavior for this.
554   function onerror(er) {
555     debug('onerror', er);
556     unpipe();
557     dest.removeListener('error', onerror);
558     if (EE.listenerCount(dest, 'error') === 0)
559       dest.emit('error', er);
560   }
561   // This is a brutally ugly hack to make sure that our error handler
562   // is attached before any userland ones.  NEVER DO THIS.
563   if (!dest._events || !dest._events.error)
564     dest.on('error', onerror);
565   else if (Array.isArray(dest._events.error))
566     dest._events.error.unshift(onerror);
567   else
568     dest._events.error = [onerror, dest._events.error];
569
570
571   // Both close and finish should trigger unpipe, but only once.
572   function onclose() {
573     dest.removeListener('finish', onfinish);
574     unpipe();
575   }
576   dest.once('close', onclose);
577   function onfinish() {
578     debug('onfinish');
579     dest.removeListener('close', onclose);
580     unpipe();
581   }
582   dest.once('finish', onfinish);
583
584   function unpipe() {
585     debug('unpipe');
586     src.unpipe(dest);
587   }
588
589   // tell the dest that it's being piped to
590   dest.emit('pipe', src);
591
592   // start the flow if it hasn't been started already.
593   if (!state.flowing) {
594     debug('pipe resume');
595     src.resume();
596   }
597
598   return dest;
599 };
600
601 function pipeOnDrain(src) {
602   return function() {
603     var state = src._readableState;
604     debug('pipeOnDrain', state.awaitDrain);
605     if (state.awaitDrain)
606       state.awaitDrain--;
607     if (state.awaitDrain === 0 && EE.listenerCount(src, 'data')) {
608       state.flowing = true;
609       flow(src);
610     }
611   };
612 }
613
614
615 Readable.prototype.unpipe = function(dest) {
616   var state = this._readableState;
617
618   // if we're not piping anywhere, then do nothing.
619   if (state.pipesCount === 0)
620     return this;
621
622   // just one destination.  most common case.
623   if (state.pipesCount === 1) {
624     // passed in one, but it's not the right one.
625     if (dest && dest !== state.pipes)
626       return this;
627
628     if (!dest)
629       dest = state.pipes;
630
631     // got a match.
632     state.pipes = null;
633     state.pipesCount = 0;
634     state.flowing = false;
635     if (dest)
636       dest.emit('unpipe', this);
637     return this;
638   }
639
640   // slow case. multiple pipe destinations.
641
642   if (!dest) {
643     // remove all.
644     var dests = state.pipes;
645     var len = state.pipesCount;
646     state.pipes = null;
647     state.pipesCount = 0;
648     state.flowing = false;
649
650     for (let i = 0; i < len; i++)
651       dests[i].emit('unpipe', this);
652     return this;
653   }
654
655   // try to find the right one.
656   const i = state.pipes.indexOf(dest);
657   if (i === -1)
658     return this;
659
660   state.pipes.splice(i, 1);
661   state.pipesCount -= 1;
662   if (state.pipesCount === 1)
663     state.pipes = state.pipes[0];
664
665   dest.emit('unpipe', this);
666
667   return this;
668 };
669
670 // set up data events if they are asked for
671 // Ensure readable listeners eventually get something
672 Readable.prototype.on = function(ev, fn) {
673   var res = Stream.prototype.on.call(this, ev, fn);
674
675   // If listening to data, and it has not explicitly been paused,
676   // then call resume to start the flow of data on the next tick.
677   if (ev === 'data' && false !== this._readableState.flowing) {
678     this.resume();
679   }
680
681   if (ev === 'readable' && this.readable) {
682     var state = this._readableState;
683     if (!state.readableListening) {
684       state.readableListening = true;
685       state.emittedReadable = false;
686       state.needReadable = true;
687       if (!state.reading) {
688         process.nextTick(nReadingNextTick, this);
689       } else if (state.length) {
690         emitReadable(this, state);
691       }
692     }
693   }
694
695   return res;
696 };
697 Readable.prototype.addListener = Readable.prototype.on;
698
699 function nReadingNextTick(self) {
700   debug('readable nexttick read 0');
701   self.read(0);
702 }
703
704 // pause() and resume() are remnants of the legacy readable stream API
705 // If the user uses them, then switch into old mode.
706 Readable.prototype.resume = function() {
707   var state = this._readableState;
708   if (!state.flowing) {
709     debug('resume');
710     state.flowing = true;
711     resume(this, state);
712   }
713   return this;
714 };
715
716 function resume(stream, state) {
717   if (!state.resumeScheduled) {
718     state.resumeScheduled = true;
719     process.nextTick(resume_, stream, state);
720   }
721 }
722
723 function resume_(stream, state) {
724   if (!state.reading) {
725     debug('resume read 0');
726     stream.read(0);
727   }
728
729   state.resumeScheduled = false;
730   stream.emit('resume');
731   flow(stream);
732   if (state.flowing && !state.reading)
733     stream.read(0);
734 }
735
736 Readable.prototype.pause = function() {
737   debug('call pause flowing=%j', this._readableState.flowing);
738   if (false !== this._readableState.flowing) {
739     debug('pause');
740     this._readableState.flowing = false;
741     this.emit('pause');
742   }
743   return this;
744 };
745
746 function flow(stream) {
747   var state = stream._readableState;
748   debug('flow', state.flowing);
749   if (state.flowing) {
750     do {
751       var chunk = stream.read();
752     } while (null !== chunk && state.flowing);
753   }
754 }
755
756 // wrap an old-style stream as the async data source.
757 // This is *not* part of the readable stream interface.
758 // It is an ugly unfortunate mess of history.
759 Readable.prototype.wrap = function(stream) {
760   var state = this._readableState;
761   var paused = false;
762
763   var self = this;
764   stream.on('end', function() {
765     debug('wrapped end');
766     if (state.decoder && !state.ended) {
767       var chunk = state.decoder.end();
768       if (chunk && chunk.length)
769         self.push(chunk);
770     }
771
772     self.push(null);
773   });
774
775   stream.on('data', function(chunk) {
776     debug('wrapped data');
777     if (state.decoder)
778       chunk = state.decoder.write(chunk);
779
780     // don't skip over falsy values in objectMode
781     if (state.objectMode && (chunk === null || chunk === undefined))
782       return;
783     else if (!state.objectMode && (!chunk || !chunk.length))
784       return;
785
786     var ret = self.push(chunk);
787     if (!ret) {
788       paused = true;
789       stream.pause();
790     }
791   });
792
793   // proxy all the other methods.
794   // important when wrapping filters and duplexes.
795   for (var i in stream) {
796     if (this[i] === undefined && typeof stream[i] === 'function') {
797       this[i] = function(method) { return function() {
798         return stream[method].apply(stream, arguments);
799       }; }(i);
800     }
801   }
802
803   // proxy certain important events.
804   const events = ['error', 'close', 'destroy', 'pause', 'resume'];
805   events.forEach(function(ev) {
806     stream.on(ev, self.emit.bind(self, ev));
807   });
808
809   // when we try to consume some more bytes, simply unpause the
810   // underlying stream.
811   self._read = function(n) {
812     debug('wrapped _read', n);
813     if (paused) {
814       paused = false;
815       stream.resume();
816     }
817   };
818
819   return self;
820 };
821
822
823 // exposed for testing purposes only.
824 Readable._fromList = fromList;
825
826 // Pluck off n bytes from an array of buffers.
827 // Length is the combined lengths of all the buffers in the list.
828 function fromList(n, state) {
829   var list = state.buffer;
830   var length = state.length;
831   var stringMode = !!state.decoder;
832   var objectMode = !!state.objectMode;
833   var ret;
834
835   // nothing in the list, definitely empty.
836   if (list.length === 0)
837     return null;
838
839   if (length === 0)
840     ret = null;
841   else if (objectMode)
842     ret = list.shift();
843   else if (!n || n >= length) {
844     // read it all, truncate the array.
845     if (stringMode)
846       ret = list.join('');
847     else if (list.length === 1)
848       ret = list[0];
849     else
850       ret = Buffer.concat(list, length);
851     list.length = 0;
852   } else {
853     // read just some of it.
854     if (n < list[0].length) {
855       // just take a part of the first list item.
856       // slice is the same for buffers and strings.
857       const buf = list[0];
858       ret = buf.slice(0, n);
859       list[0] = buf.slice(n);
860     } else if (n === list[0].length) {
861       // first list is a perfect match
862       ret = list.shift();
863     } else {
864       // complex case.
865       // we have enough to cover it, but it spans past the first buffer.
866       if (stringMode)
867         ret = '';
868       else
869         ret = new Buffer(n);
870
871       var c = 0;
872       for (var i = 0, l = list.length; i < l && c < n; i++) {
873         const buf = list[0];
874         var cpy = Math.min(n - c, buf.length);
875
876         if (stringMode)
877           ret += buf.slice(0, cpy);
878         else
879           buf.copy(ret, c, 0, cpy);
880
881         if (cpy < buf.length)
882           list[0] = buf.slice(cpy);
883         else
884           list.shift();
885
886         c += cpy;
887       }
888     }
889   }
890
891   return ret;
892 }
893
894 function endReadable(stream) {
895   var state = stream._readableState;
896
897   // If we get here before consuming all the bytes, then that is a
898   // bug in node.  Should never happen.
899   if (state.length > 0)
900     throw new Error('endReadable called on non-empty stream');
901
902   if (!state.endEmitted) {
903     state.ended = true;
904     process.nextTick(endReadableNT, state, stream);
905   }
906 }
907
908 function endReadableNT(state, stream) {
909   // Check that we didn't get one last unshift.
910   if (!state.endEmitted && state.length === 0) {
911     state.endEmitted = true;
912     stream.readable = false;
913     stream.emit('end');
914   }
915 }