doc: improvements to console.markdown copy
[platform/upstream/nodejs.git] / lib / _stream_readable.js
1 'use strict';
2
3 module.exports = Readable;
4 Readable.ReadableState = ReadableState;
5
6 const EE = require('events');
7 const Stream = require('stream');
8 const Buffer = require('buffer').Buffer;
9 const util = require('util');
10 const debug = util.debuglog('stream');
11 var StringDecoder;
12
13 util.inherits(Readable, Stream);
14
15 function ReadableState(options, stream) {
16   options = options || {};
17
18   // object stream flag. Used to make read(n) ignore n and to
19   // make all the buffer merging and length checks go away
20   this.objectMode = !!options.objectMode;
21
22   if (stream instanceof Stream.Duplex)
23     this.objectMode = this.objectMode || !!options.readableObjectMode;
24
25   // the point at which it stops calling _read() to fill the buffer
26   // Note: 0 is a valid value, means "don't call _read preemptively ever"
27   var hwm = options.highWaterMark;
28   var defaultHwm = this.objectMode ? 16 : 16 * 1024;
29   this.highWaterMark = (hwm || hwm === 0) ? hwm : defaultHwm;
30
31   // cast to ints.
32   this.highWaterMark = ~~this.highWaterMark;
33
34   this.buffer = [];
35   this.length = 0;
36   this.pipes = null;
37   this.pipesCount = 0;
38   this.flowing = null;
39   this.ended = false;
40   this.endEmitted = false;
41   this.reading = false;
42
43   // a flag to be able to tell if the onwrite cb is called immediately,
44   // or on a later tick.  We set this to true at first, because any
45   // actions that shouldn't happen until "later" should generally also
46   // not happen before the first write call.
47   this.sync = true;
48
49   // whenever we return null, then we set a flag to say
50   // that we're awaiting a 'readable' event emission.
51   this.needReadable = false;
52   this.emittedReadable = false;
53   this.readableListening = false;
54
55   // Crypto is kind of old and crusty.  Historically, its default string
56   // encoding is 'binary' so we have to make this configurable.
57   // Everything else in the universe uses 'utf8', though.
58   this.defaultEncoding = options.defaultEncoding || 'utf8';
59
60   // when piping, we only care about 'readable' events that happen
61   // after read()ing all the bytes and not getting any pushback.
62   this.ranOut = false;
63
64   // the number of writers that are awaiting a drain event in .pipe()s
65   this.awaitDrain = 0;
66
67   // if true, a maybeReadMore has been scheduled
68   this.readingMore = false;
69
70   this.decoder = null;
71   this.encoding = null;
72   if (options.encoding) {
73     if (!StringDecoder)
74       StringDecoder = require('string_decoder').StringDecoder;
75     this.decoder = new StringDecoder(options.encoding);
76     this.encoding = options.encoding;
77   }
78 }
79
80 function Readable(options) {
81   if (!(this instanceof Readable))
82     return new Readable(options);
83
84   this._readableState = new ReadableState(options, this);
85
86   // legacy
87   this.readable = true;
88
89   if (options && typeof options.read === 'function')
90     this._read = options.read;
91
92   Stream.call(this);
93 }
94
95 // Manually shove something into the read() buffer.
96 // This returns true if the highWaterMark has not been hit yet,
97 // similar to how Writable.write() returns true if you should
98 // write() some more.
99 Readable.prototype.push = function(chunk, encoding) {
100   var state = this._readableState;
101
102   if (!state.objectMode && typeof chunk === 'string') {
103     encoding = encoding || state.defaultEncoding;
104     if (encoding !== state.encoding) {
105       chunk = new Buffer(chunk, encoding);
106       encoding = '';
107     }
108   }
109
110   return readableAddChunk(this, state, chunk, encoding, false);
111 };
112
113 // Unshift should *always* be something directly out of read()
114 Readable.prototype.unshift = function(chunk) {
115   var state = this._readableState;
116   return readableAddChunk(this, state, chunk, '', true);
117 };
118
119 Readable.prototype.isPaused = function() {
120   return this._readableState.flowing === false;
121 };
122
123 function readableAddChunk(stream, state, chunk, encoding, addToFront) {
124   var er = chunkInvalid(state, chunk);
125   if (er) {
126     stream.emit('error', er);
127   } else if (chunk === null) {
128     state.reading = false;
129     onEofChunk(stream, state);
130   } else if (state.objectMode || chunk && chunk.length > 0) {
131     if (state.ended && !addToFront) {
132       var e = new Error('stream.push() after EOF');
133       stream.emit('error', e);
134     } else if (state.endEmitted && addToFront) {
135       var e = new Error('stream.unshift() after end event');
136       stream.emit('error', e);
137     } else {
138       if (state.decoder && !addToFront && !encoding)
139         chunk = state.decoder.write(chunk);
140
141       if (!addToFront)
142         state.reading = false;
143
144       // if we want the data now, just emit it.
145       if (state.flowing && state.length === 0 && !state.sync) {
146         stream.emit('data', chunk);
147         stream.read(0);
148       } else {
149         // update the buffer info.
150         state.length += state.objectMode ? 1 : chunk.length;
151         if (addToFront)
152           state.buffer.unshift(chunk);
153         else
154           state.buffer.push(chunk);
155
156         if (state.needReadable)
157           emitReadable(stream);
158       }
159
160       maybeReadMore(stream, state);
161     }
162   } else if (!addToFront) {
163     state.reading = false;
164   }
165
166   return needMoreData(state);
167 }
168
169
170 // if it's past the high water mark, we can push in some more.
171 // Also, if we have no data yet, we can stand some
172 // more bytes.  This is to work around cases where hwm=0,
173 // such as the repl.  Also, if the push() triggered a
174 // readable event, and the user called read(largeNumber) such that
175 // needReadable was set, then we ought to push more, so that another
176 // 'readable' event will be triggered.
177 function needMoreData(state) {
178   return !state.ended &&
179          (state.needReadable ||
180           state.length < state.highWaterMark ||
181           state.length === 0);
182 }
183
184 // backwards compatibility.
185 Readable.prototype.setEncoding = function(enc) {
186   if (!StringDecoder)
187     StringDecoder = require('string_decoder').StringDecoder;
188   this._readableState.decoder = new StringDecoder(enc);
189   this._readableState.encoding = enc;
190   return this;
191 };
192
193 // Don't raise the hwm > 8MB
194 const MAX_HWM = 0x800000;
195 function computeNewHighWaterMark(n) {
196   if (n >= MAX_HWM) {
197     n = MAX_HWM;
198   } else {
199     // Get the next highest power of 2
200     n--;
201     n |= n >>> 1;
202     n |= n >>> 2;
203     n |= n >>> 4;
204     n |= n >>> 8;
205     n |= n >>> 16;
206     n++;
207   }
208   return n;
209 }
210
211 function howMuchToRead(n, state) {
212   if (state.length === 0 && state.ended)
213     return 0;
214
215   if (state.objectMode)
216     return n === 0 ? 0 : 1;
217
218   if (n === null || isNaN(n)) {
219     // only flow one buffer at a time
220     if (state.flowing && state.buffer.length)
221       return state.buffer[0].length;
222     else
223       return state.length;
224   }
225
226   if (n <= 0)
227     return 0;
228
229   // If we're asking for more than the target buffer level,
230   // then raise the water mark.  Bump up to the next highest
231   // power of 2, to prevent increasing it excessively in tiny
232   // amounts.
233   if (n > state.highWaterMark)
234     state.highWaterMark = computeNewHighWaterMark(n);
235
236   // don't have that much.  return null, unless we've ended.
237   if (n > state.length) {
238     if (!state.ended) {
239       state.needReadable = true;
240       return 0;
241     } else {
242       return state.length;
243     }
244   }
245
246   return n;
247 }
248
249 // you can override either this method, or the async _read(n) below.
250 Readable.prototype.read = function(n) {
251   debug('read', n);
252   var state = this._readableState;
253   var nOrig = n;
254
255   if (typeof n !== 'number' || n > 0)
256     state.emittedReadable = false;
257
258   // if we're doing read(0) to trigger a readable event, but we
259   // already have a bunch of data in the buffer, then just trigger
260   // the 'readable' event and move on.
261   if (n === 0 &&
262       state.needReadable &&
263       (state.length >= state.highWaterMark || state.ended)) {
264     debug('read: emitReadable', state.length, state.ended);
265     if (state.length === 0 && state.ended)
266       endReadable(this);
267     else
268       emitReadable(this);
269     return null;
270   }
271
272   n = howMuchToRead(n, state);
273
274   // if we've ended, and we're now clear, then finish it up.
275   if (n === 0 && state.ended) {
276     if (state.length === 0)
277       endReadable(this);
278     return null;
279   }
280
281   // All the actual chunk generation logic needs to be
282   // *below* the call to _read.  The reason is that in certain
283   // synthetic stream cases, such as passthrough streams, _read
284   // may be a completely synchronous operation which may change
285   // the state of the read buffer, providing enough data when
286   // before there was *not* enough.
287   //
288   // So, the steps are:
289   // 1. Figure out what the state of things will be after we do
290   // a read from the buffer.
291   //
292   // 2. If that resulting state will trigger a _read, then call _read.
293   // Note that this may be asynchronous, or synchronous.  Yes, it is
294   // deeply ugly to write APIs this way, but that still doesn't mean
295   // that the Readable class should behave improperly, as streams are
296   // designed to be sync/async agnostic.
297   // Take note if the _read call is sync or async (ie, if the read call
298   // has returned yet), so that we know whether or not it's safe to emit
299   // 'readable' etc.
300   //
301   // 3. Actually pull the requested chunks out of the buffer and return.
302
303   // if we need a readable event, then we need to do some reading.
304   var doRead = state.needReadable;
305   debug('need readable', doRead);
306
307   // if we currently have less than the highWaterMark, then also read some
308   if (state.length === 0 || state.length - n < state.highWaterMark) {
309     doRead = true;
310     debug('length less than watermark', doRead);
311   }
312
313   // however, if we've ended, then there's no point, and if we're already
314   // reading, then it's unnecessary.
315   if (state.ended || state.reading) {
316     doRead = false;
317     debug('reading or ended', doRead);
318   }
319
320   if (doRead) {
321     debug('do read');
322     state.reading = true;
323     state.sync = true;
324     // if the length is currently zero, then we *need* a readable event.
325     if (state.length === 0)
326       state.needReadable = true;
327     // call internal read method
328     this._read(state.highWaterMark);
329     state.sync = false;
330   }
331
332   // If _read pushed data synchronously, then `reading` will be false,
333   // and we need to re-evaluate how much data we can return to the user.
334   if (doRead && !state.reading)
335     n = howMuchToRead(nOrig, state);
336
337   var ret;
338   if (n > 0)
339     ret = fromList(n, state);
340   else
341     ret = null;
342
343   if (ret === null) {
344     state.needReadable = true;
345     n = 0;
346   }
347
348   state.length -= n;
349
350   // If we have nothing in the buffer, then we want to know
351   // as soon as we *do* get something into the buffer.
352   if (state.length === 0 && !state.ended)
353     state.needReadable = true;
354
355   // If we tried to read() past the EOF, then emit end on the next tick.
356   if (nOrig !== n && state.ended && state.length === 0)
357     endReadable(this);
358
359   if (ret !== null)
360     this.emit('data', ret);
361
362   return ret;
363 };
364
365 function chunkInvalid(state, chunk) {
366   var er = null;
367   if (!(chunk instanceof Buffer) &&
368       typeof chunk !== 'string' &&
369       chunk !== null &&
370       chunk !== undefined &&
371       !state.objectMode) {
372     er = new TypeError('Invalid non-string/buffer chunk');
373   }
374   return er;
375 }
376
377
378 function onEofChunk(stream, state) {
379   if (state.ended) return;
380   if (state.decoder) {
381     var chunk = state.decoder.end();
382     if (chunk && chunk.length) {
383       state.buffer.push(chunk);
384       state.length += state.objectMode ? 1 : chunk.length;
385     }
386   }
387   state.ended = true;
388
389   // emit 'readable' now to make sure it gets picked up.
390   emitReadable(stream);
391 }
392
393 // Don't emit readable right away in sync mode, because this can trigger
394 // another read() call => stack overflow.  This way, it might trigger
395 // a nextTick recursion warning, but that's not so bad.
396 function emitReadable(stream) {
397   var state = stream._readableState;
398   state.needReadable = false;
399   if (!state.emittedReadable) {
400     debug('emitReadable', state.flowing);
401     state.emittedReadable = true;
402     if (state.sync)
403       process.nextTick(emitReadable_, stream);
404     else
405       emitReadable_(stream);
406   }
407 }
408
409 function emitReadable_(stream) {
410   debug('emit readable');
411   stream.emit('readable');
412   flow(stream);
413 }
414
415
416 // at this point, the user has presumably seen the 'readable' event,
417 // and called read() to consume some data.  that may have triggered
418 // in turn another _read(n) call, in which case reading = true if
419 // it's in progress.
420 // However, if we're not ended, or reading, and the length < hwm,
421 // then go ahead and try to read some more preemptively.
422 function maybeReadMore(stream, state) {
423   if (!state.readingMore) {
424     state.readingMore = true;
425     process.nextTick(maybeReadMore_, stream, state);
426   }
427 }
428
429 function maybeReadMore_(stream, state) {
430   var len = state.length;
431   while (!state.reading && !state.flowing && !state.ended &&
432          state.length < state.highWaterMark) {
433     debug('maybeReadMore read 0');
434     stream.read(0);
435     if (len === state.length)
436       // didn't get any data, stop spinning.
437       break;
438     else
439       len = state.length;
440   }
441   state.readingMore = false;
442 }
443
444 // abstract method.  to be overridden in specific implementation classes.
445 // call cb(er, data) where data is <= n in length.
446 // for virtual (non-string, non-buffer) streams, "length" is somewhat
447 // arbitrary, and perhaps not very meaningful.
448 Readable.prototype._read = function(n) {
449   this.emit('error', new Error('not implemented'));
450 };
451
452 Readable.prototype.pipe = function(dest, pipeOpts) {
453   var src = this;
454   var state = this._readableState;
455
456   switch (state.pipesCount) {
457     case 0:
458       state.pipes = dest;
459       break;
460     case 1:
461       state.pipes = [state.pipes, dest];
462       break;
463     default:
464       state.pipes.push(dest);
465       break;
466   }
467   state.pipesCount += 1;
468   debug('pipe count=%d opts=%j', state.pipesCount, pipeOpts);
469
470   var doEnd = (!pipeOpts || pipeOpts.end !== false) &&
471               dest !== process.stdout &&
472               dest !== process.stderr;
473
474   var endFn = doEnd ? onend : cleanup;
475   if (state.endEmitted)
476     process.nextTick(endFn);
477   else
478     src.once('end', endFn);
479
480   dest.on('unpipe', onunpipe);
481   function onunpipe(readable) {
482     debug('onunpipe');
483     if (readable === src) {
484       cleanup();
485     }
486   }
487
488   function onend() {
489     debug('onend');
490     dest.end();
491   }
492
493   // when the dest drains, it reduces the awaitDrain counter
494   // on the source.  This would be more elegant with a .once()
495   // handler in flow(), but adding and removing repeatedly is
496   // too slow.
497   var ondrain = pipeOnDrain(src);
498   dest.on('drain', ondrain);
499
500   var cleanedUp = false;
501   function cleanup() {
502     debug('cleanup');
503     // cleanup event handlers once the pipe is broken
504     dest.removeListener('close', onclose);
505     dest.removeListener('finish', onfinish);
506     dest.removeListener('drain', ondrain);
507     dest.removeListener('error', onerror);
508     dest.removeListener('unpipe', onunpipe);
509     src.removeListener('end', onend);
510     src.removeListener('end', cleanup);
511     src.removeListener('data', ondata);
512
513     cleanedUp = true;
514
515     // if the reader is waiting for a drain event from this
516     // specific writer, then it would cause it to never start
517     // flowing again.
518     // So, if this is awaiting a drain, then we just call it now.
519     // If we don't know, then assume that we are waiting for one.
520     if (state.awaitDrain &&
521         (!dest._writableState || dest._writableState.needDrain))
522       ondrain();
523   }
524
525   src.on('data', ondata);
526   function ondata(chunk) {
527     debug('ondata');
528     var ret = dest.write(chunk);
529     if (false === ret) {
530       // If the user unpiped during `dest.write()`, it is possible
531       // to get stuck in a permanently paused state if that write
532       // also returned false.
533       if (state.pipesCount === 1 &&
534           state.pipes[0] === dest &&
535           src.listenerCount('data') === 1 &&
536           !cleanedUp) {
537         debug('false write response, pause', src._readableState.awaitDrain);
538         src._readableState.awaitDrain++;
539       }
540       src.pause();
541     }
542   }
543
544   // if the dest has an error, then stop piping into it.
545   // however, don't suppress the throwing behavior for this.
546   function onerror(er) {
547     debug('onerror', er);
548     unpipe();
549     dest.removeListener('error', onerror);
550     if (EE.listenerCount(dest, 'error') === 0)
551       dest.emit('error', er);
552   }
553   // This is a brutally ugly hack to make sure that our error handler
554   // is attached before any userland ones.  NEVER DO THIS.
555   if (!dest._events || !dest._events.error)
556     dest.on('error', onerror);
557   else if (Array.isArray(dest._events.error))
558     dest._events.error.unshift(onerror);
559   else
560     dest._events.error = [onerror, dest._events.error];
561
562
563   // Both close and finish should trigger unpipe, but only once.
564   function onclose() {
565     dest.removeListener('finish', onfinish);
566     unpipe();
567   }
568   dest.once('close', onclose);
569   function onfinish() {
570     debug('onfinish');
571     dest.removeListener('close', onclose);
572     unpipe();
573   }
574   dest.once('finish', onfinish);
575
576   function unpipe() {
577     debug('unpipe');
578     src.unpipe(dest);
579   }
580
581   // tell the dest that it's being piped to
582   dest.emit('pipe', src);
583
584   // start the flow if it hasn't been started already.
585   if (!state.flowing) {
586     debug('pipe resume');
587     src.resume();
588   }
589
590   return dest;
591 };
592
593 function pipeOnDrain(src) {
594   return function() {
595     var state = src._readableState;
596     debug('pipeOnDrain', state.awaitDrain);
597     if (state.awaitDrain)
598       state.awaitDrain--;
599     if (state.awaitDrain === 0 && EE.listenerCount(src, 'data')) {
600       state.flowing = true;
601       flow(src);
602     }
603   };
604 }
605
606
607 Readable.prototype.unpipe = function(dest) {
608   var state = this._readableState;
609
610   // if we're not piping anywhere, then do nothing.
611   if (state.pipesCount === 0)
612     return this;
613
614   // just one destination.  most common case.
615   if (state.pipesCount === 1) {
616     // passed in one, but it's not the right one.
617     if (dest && dest !== state.pipes)
618       return this;
619
620     if (!dest)
621       dest = state.pipes;
622
623     // got a match.
624     state.pipes = null;
625     state.pipesCount = 0;
626     state.flowing = false;
627     if (dest)
628       dest.emit('unpipe', this);
629     return this;
630   }
631
632   // slow case. multiple pipe destinations.
633
634   if (!dest) {
635     // remove all.
636     var dests = state.pipes;
637     var len = state.pipesCount;
638     state.pipes = null;
639     state.pipesCount = 0;
640     state.flowing = false;
641
642     for (var i = 0; i < len; i++)
643       dests[i].emit('unpipe', this);
644     return this;
645   }
646
647   // try to find the right one.
648   var i = state.pipes.indexOf(dest);
649   if (i === -1)
650     return this;
651
652   state.pipes.splice(i, 1);
653   state.pipesCount -= 1;
654   if (state.pipesCount === 1)
655     state.pipes = state.pipes[0];
656
657   dest.emit('unpipe', this);
658
659   return this;
660 };
661
662 // set up data events if they are asked for
663 // Ensure readable listeners eventually get something
664 Readable.prototype.on = function(ev, fn) {
665   var res = Stream.prototype.on.call(this, ev, fn);
666
667   // If listening to data, and it has not explicitly been paused,
668   // then call resume to start the flow of data on the next tick.
669   if (ev === 'data' && false !== this._readableState.flowing) {
670     this.resume();
671   }
672
673   if (ev === 'readable' && this.readable) {
674     var state = this._readableState;
675     if (!state.readableListening) {
676       state.readableListening = true;
677       state.emittedReadable = false;
678       state.needReadable = true;
679       if (!state.reading) {
680         process.nextTick(nReadingNextTick, this);
681       } else if (state.length) {
682         emitReadable(this, state);
683       }
684     }
685   }
686
687   return res;
688 };
689 Readable.prototype.addListener = Readable.prototype.on;
690
691 function nReadingNextTick(self) {
692   debug('readable nexttick read 0');
693   self.read(0);
694 }
695
696 // pause() and resume() are remnants of the legacy readable stream API
697 // If the user uses them, then switch into old mode.
698 Readable.prototype.resume = function() {
699   var state = this._readableState;
700   if (!state.flowing) {
701     debug('resume');
702     state.flowing = true;
703     resume(this, state);
704   }
705   return this;
706 };
707
708 function resume(stream, state) {
709   if (!state.resumeScheduled) {
710     state.resumeScheduled = true;
711     process.nextTick(resume_, stream, state);
712   }
713 }
714
715 function resume_(stream, state) {
716   if (!state.reading) {
717     debug('resume read 0');
718     stream.read(0);
719   }
720
721   state.resumeScheduled = false;
722   stream.emit('resume');
723   flow(stream);
724   if (state.flowing && !state.reading)
725     stream.read(0);
726 }
727
728 Readable.prototype.pause = function() {
729   debug('call pause flowing=%j', this._readableState.flowing);
730   if (false !== this._readableState.flowing) {
731     debug('pause');
732     this._readableState.flowing = false;
733     this.emit('pause');
734   }
735   return this;
736 };
737
738 function flow(stream) {
739   var state = stream._readableState;
740   debug('flow', state.flowing);
741   if (state.flowing) {
742     do {
743       var chunk = stream.read();
744     } while (null !== chunk && state.flowing);
745   }
746 }
747
748 // wrap an old-style stream as the async data source.
749 // This is *not* part of the readable stream interface.
750 // It is an ugly unfortunate mess of history.
751 Readable.prototype.wrap = function(stream) {
752   var state = this._readableState;
753   var paused = false;
754
755   var self = this;
756   stream.on('end', function() {
757     debug('wrapped end');
758     if (state.decoder && !state.ended) {
759       var chunk = state.decoder.end();
760       if (chunk && chunk.length)
761         self.push(chunk);
762     }
763
764     self.push(null);
765   });
766
767   stream.on('data', function(chunk) {
768     debug('wrapped data');
769     if (state.decoder)
770       chunk = state.decoder.write(chunk);
771
772     // don't skip over falsy values in objectMode
773     if (state.objectMode && (chunk === null || chunk === undefined))
774       return;
775     else if (!state.objectMode && (!chunk || !chunk.length))
776       return;
777
778     var ret = self.push(chunk);
779     if (!ret) {
780       paused = true;
781       stream.pause();
782     }
783   });
784
785   // proxy all the other methods.
786   // important when wrapping filters and duplexes.
787   for (var i in stream) {
788     if (this[i] === undefined && typeof stream[i] === 'function') {
789       this[i] = function(method) { return function() {
790         return stream[method].apply(stream, arguments);
791       }; }(i);
792     }
793   }
794
795   // proxy certain important events.
796   const events = ['error', 'close', 'destroy', 'pause', 'resume'];
797   events.forEach(function(ev) {
798     stream.on(ev, self.emit.bind(self, ev));
799   });
800
801   // when we try to consume some more bytes, simply unpause the
802   // underlying stream.
803   self._read = function(n) {
804     debug('wrapped _read', n);
805     if (paused) {
806       paused = false;
807       stream.resume();
808     }
809   };
810
811   return self;
812 };
813
814
815 // exposed for testing purposes only.
816 Readable._fromList = fromList;
817
818 // Pluck off n bytes from an array of buffers.
819 // Length is the combined lengths of all the buffers in the list.
820 function fromList(n, state) {
821   var list = state.buffer;
822   var length = state.length;
823   var stringMode = !!state.decoder;
824   var objectMode = !!state.objectMode;
825   var ret;
826
827   // nothing in the list, definitely empty.
828   if (list.length === 0)
829     return null;
830
831   if (length === 0)
832     ret = null;
833   else if (objectMode)
834     ret = list.shift();
835   else if (!n || n >= length) {
836     // read it all, truncate the array.
837     if (stringMode)
838       ret = list.join('');
839     else if (list.length === 1)
840       ret = list[0];
841     else
842       ret = Buffer.concat(list, length);
843     list.length = 0;
844   } else {
845     // read just some of it.
846     if (n < list[0].length) {
847       // just take a part of the first list item.
848       // slice is the same for buffers and strings.
849       var buf = list[0];
850       ret = buf.slice(0, n);
851       list[0] = buf.slice(n);
852     } else if (n === list[0].length) {
853       // first list is a perfect match
854       ret = list.shift();
855     } else {
856       // complex case.
857       // we have enough to cover it, but it spans past the first buffer.
858       if (stringMode)
859         ret = '';
860       else
861         ret = new Buffer(n);
862
863       var c = 0;
864       for (var i = 0, l = list.length; i < l && c < n; i++) {
865         var buf = list[0];
866         var cpy = Math.min(n - c, buf.length);
867
868         if (stringMode)
869           ret += buf.slice(0, cpy);
870         else
871           buf.copy(ret, c, 0, cpy);
872
873         if (cpy < buf.length)
874           list[0] = buf.slice(cpy);
875         else
876           list.shift();
877
878         c += cpy;
879       }
880     }
881   }
882
883   return ret;
884 }
885
886 function endReadable(stream) {
887   var state = stream._readableState;
888
889   // If we get here before consuming all the bytes, then that is a
890   // bug in node.  Should never happen.
891   if (state.length > 0)
892     throw new Error('endReadable called on non-empty stream');
893
894   if (!state.endEmitted) {
895     state.ended = true;
896     process.nextTick(endReadableNT, state, stream);
897   }
898 }
899
900 function endReadableNT(state, stream) {
901   // Check that we didn't get one last unshift.
902   if (!state.endEmitted && state.length === 0) {
903     state.endEmitted = true;
904     stream.readable = false;
905     stream.emit('end');
906   }
907 }