lib: turn on strict mode
[platform/upstream/nodejs.git] / lib / _stream_readable.js
1 // Copyright Joyent, Inc. and other Node contributors.
2 //
3 // Permission is hereby granted, free of charge, to any person obtaining a
4 // copy of this software and associated documentation files (the
5 // "Software"), to deal in the Software without restriction, including
6 // without limitation the rights to use, copy, modify, merge, publish,
7 // distribute, sublicense, and/or sell copies of the Software, and to permit
8 // persons to whom the Software is furnished to do so, subject to the
9 // following conditions:
10 //
11 // The above copyright notice and this permission notice shall be included
12 // in all copies or substantial portions of the Software.
13 //
14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15 // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
16 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
17 // NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
18 // DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
19 // OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
20 // USE OR OTHER DEALINGS IN THE SOFTWARE.
21
22 'use strict';
23
24 module.exports = Readable;
25 Readable.ReadableState = ReadableState;
26
27 var EE = require('events').EventEmitter;
28 var Stream = require('stream');
29 var util = require('util');
30 var StringDecoder;
31 var debug = util.debuglog('stream');
32
33 util.inherits(Readable, Stream);
34
35 function ReadableState(options, stream) {
36   options = options || {};
37
38   // object stream flag. Used to make read(n) ignore n and to
39   // make all the buffer merging and length checks go away
40   this.objectMode = !!options.objectMode;
41
42   if (stream instanceof Stream.Duplex)
43     this.objectMode = this.objectMode || !!options.readableObjectMode;
44
45   // the point at which it stops calling _read() to fill the buffer
46   // Note: 0 is a valid value, means "don't call _read preemptively ever"
47   var hwm = options.highWaterMark;
48   var defaultHwm = this.objectMode ? 16 : 16 * 1024;
49   this.highWaterMark = (hwm || hwm === 0) ? hwm : defaultHwm;
50
51   // cast to ints.
52   this.highWaterMark = ~~this.highWaterMark;
53
54   this.buffer = [];
55   this.length = 0;
56   this.pipes = null;
57   this.pipesCount = 0;
58   this.flowing = null;
59   this.ended = false;
60   this.endEmitted = false;
61   this.reading = false;
62
63   // a flag to be able to tell if the onwrite cb is called immediately,
64   // or on a later tick.  We set this to true at first, because any
65   // actions that shouldn't happen until "later" should generally also
66   // not happen before the first write call.
67   this.sync = true;
68
69   // whenever we return null, then we set a flag to say
70   // that we're awaiting a 'readable' event emission.
71   this.needReadable = false;
72   this.emittedReadable = false;
73   this.readableListening = false;
74
75   // Crypto is kind of old and crusty.  Historically, its default string
76   // encoding is 'binary' so we have to make this configurable.
77   // Everything else in the universe uses 'utf8', though.
78   this.defaultEncoding = options.defaultEncoding || 'utf8';
79
80   // when piping, we only care about 'readable' events that happen
81   // after read()ing all the bytes and not getting any pushback.
82   this.ranOut = false;
83
84   // the number of writers that are awaiting a drain event in .pipe()s
85   this.awaitDrain = 0;
86
87   // if true, a maybeReadMore has been scheduled
88   this.readingMore = false;
89
90   this.decoder = null;
91   this.encoding = null;
92   if (options.encoding) {
93     if (!StringDecoder)
94       StringDecoder = require('string_decoder').StringDecoder;
95     this.decoder = new StringDecoder(options.encoding);
96     this.encoding = options.encoding;
97   }
98 }
99
100 function Readable(options) {
101   if (!(this instanceof Readable))
102     return new Readable(options);
103
104   this._readableState = new ReadableState(options, this);
105
106   // legacy
107   this.readable = true;
108
109   Stream.call(this);
110 }
111
112 // Manually shove something into the read() buffer.
113 // This returns true if the highWaterMark has not been hit yet,
114 // similar to how Writable.write() returns true if you should
115 // write() some more.
116 Readable.prototype.push = function(chunk, encoding) {
117   var state = this._readableState;
118
119   if (util.isString(chunk) && !state.objectMode) {
120     encoding = encoding || state.defaultEncoding;
121     if (encoding !== state.encoding) {
122       chunk = new Buffer(chunk, encoding);
123       encoding = '';
124     }
125   }
126
127   return readableAddChunk(this, state, chunk, encoding, false);
128 };
129
130 // Unshift should *always* be something directly out of read()
131 Readable.prototype.unshift = function(chunk) {
132   var state = this._readableState;
133   return readableAddChunk(this, state, chunk, '', true);
134 };
135
136 Readable.prototype.isPaused = function() {
137   return this._readableState.flowing === false;
138 };
139
140 function readableAddChunk(stream, state, chunk, encoding, addToFront) {
141   var er = chunkInvalid(state, chunk);
142   if (er) {
143     stream.emit('error', er);
144   } else if (chunk === null) {
145     state.reading = false;
146     if (!state.ended)
147       onEofChunk(stream, state);
148   } else if (state.objectMode || chunk && chunk.length > 0) {
149     if (state.ended && !addToFront) {
150       var e = new Error('stream.push() after EOF');
151       stream.emit('error', e);
152     } else if (state.endEmitted && addToFront) {
153       var e = new Error('stream.unshift() after end event');
154       stream.emit('error', e);
155     } else {
156       if (state.decoder && !addToFront && !encoding)
157         chunk = state.decoder.write(chunk);
158
159       if (!addToFront)
160         state.reading = false;
161
162       // if we want the data now, just emit it.
163       if (state.flowing && state.length === 0 && !state.sync) {
164         stream.emit('data', chunk);
165         stream.read(0);
166       } else {
167         // update the buffer info.
168         state.length += state.objectMode ? 1 : chunk.length;
169         if (addToFront)
170           state.buffer.unshift(chunk);
171         else
172           state.buffer.push(chunk);
173
174         if (state.needReadable)
175           emitReadable(stream);
176       }
177
178       maybeReadMore(stream, state);
179     }
180   } else if (!addToFront) {
181     state.reading = false;
182   }
183
184   return needMoreData(state);
185 }
186
187
188
189 // if it's past the high water mark, we can push in some more.
190 // Also, if we have no data yet, we can stand some
191 // more bytes.  This is to work around cases where hwm=0,
192 // such as the repl.  Also, if the push() triggered a
193 // readable event, and the user called read(largeNumber) such that
194 // needReadable was set, then we ought to push more, so that another
195 // 'readable' event will be triggered.
196 function needMoreData(state) {
197   return !state.ended &&
198          (state.needReadable ||
199           state.length < state.highWaterMark ||
200           state.length === 0);
201 }
202
203 // backwards compatibility.
204 Readable.prototype.setEncoding = function(enc) {
205   if (!StringDecoder)
206     StringDecoder = require('string_decoder').StringDecoder;
207   this._readableState.decoder = new StringDecoder(enc);
208   this._readableState.encoding = enc;
209   return this;
210 };
211
212 // Don't raise the hwm > 128MB
213 var MAX_HWM = 0x800000;
214 function roundUpToNextPowerOf2(n) {
215   if (n >= MAX_HWM) {
216     n = MAX_HWM;
217   } else {
218     // Get the next highest power of 2
219     n--;
220     for (var p = 1; p < 32; p <<= 1) n |= n >> p;
221     n++;
222   }
223   return n;
224 }
225
226 function howMuchToRead(n, state) {
227   if (state.length === 0 && state.ended)
228     return 0;
229
230   if (state.objectMode)
231     return n === 0 ? 0 : 1;
232
233   if (util.isNull(n) || isNaN(n)) {
234     // only flow one buffer at a time
235     if (state.flowing && state.buffer.length)
236       return state.buffer[0].length;
237     else
238       return state.length;
239   }
240
241   if (n <= 0)
242     return 0;
243
244   // If we're asking for more than the target buffer level,
245   // then raise the water mark.  Bump up to the next highest
246   // power of 2, to prevent increasing it excessively in tiny
247   // amounts.
248   if (n > state.highWaterMark)
249     state.highWaterMark = roundUpToNextPowerOf2(n);
250
251   // don't have that much.  return null, unless we've ended.
252   if (n > state.length) {
253     if (!state.ended) {
254       state.needReadable = true;
255       return 0;
256     } else
257       return state.length;
258   }
259
260   return n;
261 }
262
263 // you can override either this method, or the async _read(n) below.
264 Readable.prototype.read = function(n) {
265   debug('read', n);
266   var state = this._readableState;
267   var nOrig = n;
268
269   if (!util.isNumber(n) || n > 0)
270     state.emittedReadable = false;
271
272   // if we're doing read(0) to trigger a readable event, but we
273   // already have a bunch of data in the buffer, then just trigger
274   // the 'readable' event and move on.
275   if (n === 0 &&
276       state.needReadable &&
277       (state.length >= state.highWaterMark || state.ended)) {
278     debug('read: emitReadable', state.length, state.ended);
279     if (state.length === 0 && state.ended)
280       endReadable(this);
281     else
282       emitReadable(this);
283     return null;
284   }
285
286   n = howMuchToRead(n, state);
287
288   // if we've ended, and we're now clear, then finish it up.
289   if (n === 0 && state.ended) {
290     if (state.length === 0)
291       endReadable(this);
292     return null;
293   }
294
295   // All the actual chunk generation logic needs to be
296   // *below* the call to _read.  The reason is that in certain
297   // synthetic stream cases, such as passthrough streams, _read
298   // may be a completely synchronous operation which may change
299   // the state of the read buffer, providing enough data when
300   // before there was *not* enough.
301   //
302   // So, the steps are:
303   // 1. Figure out what the state of things will be after we do
304   // a read from the buffer.
305   //
306   // 2. If that resulting state will trigger a _read, then call _read.
307   // Note that this may be asynchronous, or synchronous.  Yes, it is
308   // deeply ugly to write APIs this way, but that still doesn't mean
309   // that the Readable class should behave improperly, as streams are
310   // designed to be sync/async agnostic.
311   // Take note if the _read call is sync or async (ie, if the read call
312   // has returned yet), so that we know whether or not it's safe to emit
313   // 'readable' etc.
314   //
315   // 3. Actually pull the requested chunks out of the buffer and return.
316
317   // if we need a readable event, then we need to do some reading.
318   var doRead = state.needReadable;
319   debug('need readable', doRead);
320
321   // if we currently have less than the highWaterMark, then also read some
322   if (state.length === 0 || state.length - n < state.highWaterMark) {
323     doRead = true;
324     debug('length less than watermark', doRead);
325   }
326
327   // however, if we've ended, then there's no point, and if we're already
328   // reading, then it's unnecessary.
329   if (state.ended || state.reading) {
330     doRead = false;
331     debug('reading or ended', doRead);
332   }
333
334   if (doRead) {
335     debug('do read');
336     state.reading = true;
337     state.sync = true;
338     // if the length is currently zero, then we *need* a readable event.
339     if (state.length === 0)
340       state.needReadable = true;
341     // call internal read method
342     this._read(state.highWaterMark);
343     state.sync = false;
344   }
345
346   // If _read pushed data synchronously, then `reading` will be false,
347   // and we need to re-evaluate how much data we can return to the user.
348   if (doRead && !state.reading)
349     n = howMuchToRead(nOrig, state);
350
351   var ret;
352   if (n > 0)
353     ret = fromList(n, state);
354   else
355     ret = null;
356
357   if (util.isNull(ret)) {
358     state.needReadable = true;
359     n = 0;
360   }
361
362   state.length -= n;
363
364   // If we have nothing in the buffer, then we want to know
365   // as soon as we *do* get something into the buffer.
366   if (state.length === 0 && !state.ended)
367     state.needReadable = true;
368
369   // If we tried to read() past the EOF, then emit end on the next tick.
370   if (nOrig !== n && state.ended && state.length === 0)
371     endReadable(this);
372
373   if (!util.isNull(ret))
374     this.emit('data', ret);
375
376   return ret;
377 };
378
379 function chunkInvalid(state, chunk) {
380   var er = null;
381   if (!util.isBuffer(chunk) &&
382       !util.isString(chunk) &&
383       !util.isNullOrUndefined(chunk) &&
384       !state.objectMode) {
385     er = new TypeError('Invalid non-string/buffer chunk');
386   }
387   return er;
388 }
389
390
391 function onEofChunk(stream, state) {
392   if (state.decoder && !state.ended) {
393     var chunk = state.decoder.end();
394     if (chunk && chunk.length) {
395       state.buffer.push(chunk);
396       state.length += state.objectMode ? 1 : chunk.length;
397     }
398   }
399   state.ended = true;
400
401   // emit 'readable' now to make sure it gets picked up.
402   emitReadable(stream);
403 }
404
405 // Don't emit readable right away in sync mode, because this can trigger
406 // another read() call => stack overflow.  This way, it might trigger
407 // a nextTick recursion warning, but that's not so bad.
408 function emitReadable(stream) {
409   var state = stream._readableState;
410   state.needReadable = false;
411   if (!state.emittedReadable) {
412     debug('emitReadable', state.flowing);
413     state.emittedReadable = true;
414     if (state.sync)
415       process.nextTick(function() {
416         emitReadable_(stream);
417       });
418     else
419       emitReadable_(stream);
420   }
421 }
422
423 function emitReadable_(stream) {
424   debug('emit readable');
425   stream.emit('readable');
426   flow(stream);
427 }
428
429
430 // at this point, the user has presumably seen the 'readable' event,
431 // and called read() to consume some data.  that may have triggered
432 // in turn another _read(n) call, in which case reading = true if
433 // it's in progress.
434 // However, if we're not ended, or reading, and the length < hwm,
435 // then go ahead and try to read some more preemptively.
436 function maybeReadMore(stream, state) {
437   if (!state.readingMore) {
438     state.readingMore = true;
439     process.nextTick(function() {
440       maybeReadMore_(stream, state);
441     });
442   }
443 }
444
445 function maybeReadMore_(stream, state) {
446   var len = state.length;
447   while (!state.reading && !state.flowing && !state.ended &&
448          state.length < state.highWaterMark) {
449     debug('maybeReadMore read 0');
450     stream.read(0);
451     if (len === state.length)
452       // didn't get any data, stop spinning.
453       break;
454     else
455       len = state.length;
456   }
457   state.readingMore = false;
458 }
459
460 // abstract method.  to be overridden in specific implementation classes.
461 // call cb(er, data) where data is <= n in length.
462 // for virtual (non-string, non-buffer) streams, "length" is somewhat
463 // arbitrary, and perhaps not very meaningful.
464 Readable.prototype._read = function(n) {
465   this.emit('error', new Error('not implemented'));
466 };
467
468 Readable.prototype.pipe = function(dest, pipeOpts) {
469   var src = this;
470   var state = this._readableState;
471
472   switch (state.pipesCount) {
473     case 0:
474       state.pipes = dest;
475       break;
476     case 1:
477       state.pipes = [state.pipes, dest];
478       break;
479     default:
480       state.pipes.push(dest);
481       break;
482   }
483   state.pipesCount += 1;
484   debug('pipe count=%d opts=%j', state.pipesCount, pipeOpts);
485
486   var doEnd = (!pipeOpts || pipeOpts.end !== false) &&
487               dest !== process.stdout &&
488               dest !== process.stderr;
489
490   var endFn = doEnd ? onend : cleanup;
491   if (state.endEmitted)
492     process.nextTick(endFn);
493   else
494     src.once('end', endFn);
495
496   dest.on('unpipe', onunpipe);
497   function onunpipe(readable) {
498     debug('onunpipe');
499     if (readable === src) {
500       cleanup();
501     }
502   }
503
504   function onend() {
505     debug('onend');
506     dest.end();
507   }
508
509   // when the dest drains, it reduces the awaitDrain counter
510   // on the source.  This would be more elegant with a .once()
511   // handler in flow(), but adding and removing repeatedly is
512   // too slow.
513   var ondrain = pipeOnDrain(src);
514   dest.on('drain', ondrain);
515
516   function cleanup() {
517     debug('cleanup');
518     // cleanup event handlers once the pipe is broken
519     dest.removeListener('close', onclose);
520     dest.removeListener('finish', onfinish);
521     dest.removeListener('drain', ondrain);
522     dest.removeListener('error', onerror);
523     dest.removeListener('unpipe', onunpipe);
524     src.removeListener('end', onend);
525     src.removeListener('end', cleanup);
526     src.removeListener('data', ondata);
527
528     // if the reader is waiting for a drain event from this
529     // specific writer, then it would cause it to never start
530     // flowing again.
531     // So, if this is awaiting a drain, then we just call it now.
532     // If we don't know, then assume that we are waiting for one.
533     if (state.awaitDrain &&
534         (!dest._writableState || dest._writableState.needDrain))
535       ondrain();
536   }
537
538   src.on('data', ondata);
539   function ondata(chunk) {
540     debug('ondata');
541     var ret = dest.write(chunk);
542     if (false === ret) {
543       debug('false write response, pause',
544             src._readableState.awaitDrain);
545       src._readableState.awaitDrain++;
546       src.pause();
547     }
548   }
549
550   // if the dest has an error, then stop piping into it.
551   // however, don't suppress the throwing behavior for this.
552   function onerror(er) {
553     debug('onerror', er);
554     unpipe();
555     dest.removeListener('error', onerror);
556     if (EE.listenerCount(dest, 'error') === 0)
557       dest.emit('error', er);
558   }
559   // This is a brutally ugly hack to make sure that our error handler
560   // is attached before any userland ones.  NEVER DO THIS.
561   if (!dest._events || !dest._events.error)
562     dest.on('error', onerror);
563   else if (Array.isArray(dest._events.error))
564     dest._events.error.unshift(onerror);
565   else
566     dest._events.error = [onerror, dest._events.error];
567
568
569
570   // Both close and finish should trigger unpipe, but only once.
571   function onclose() {
572     dest.removeListener('finish', onfinish);
573     unpipe();
574   }
575   dest.once('close', onclose);
576   function onfinish() {
577     debug('onfinish');
578     dest.removeListener('close', onclose);
579     unpipe();
580   }
581   dest.once('finish', onfinish);
582
583   function unpipe() {
584     debug('unpipe');
585     src.unpipe(dest);
586   }
587
588   // tell the dest that it's being piped to
589   dest.emit('pipe', src);
590
591   // start the flow if it hasn't been started already.
592   if (!state.flowing) {
593     debug('pipe resume');
594     src.resume();
595   }
596
597   return dest;
598 };
599
600 function pipeOnDrain(src) {
601   return function() {
602     var state = src._readableState;
603     debug('pipeOnDrain', state.awaitDrain);
604     if (state.awaitDrain)
605       state.awaitDrain--;
606     if (state.awaitDrain === 0 && EE.listenerCount(src, 'data')) {
607       state.flowing = true;
608       flow(src);
609     }
610   };
611 }
612
613
614 Readable.prototype.unpipe = function(dest) {
615   var state = this._readableState;
616
617   // if we're not piping anywhere, then do nothing.
618   if (state.pipesCount === 0)
619     return this;
620
621   // just one destination.  most common case.
622   if (state.pipesCount === 1) {
623     // passed in one, but it's not the right one.
624     if (dest && dest !== state.pipes)
625       return this;
626
627     if (!dest)
628       dest = state.pipes;
629
630     // got a match.
631     state.pipes = null;
632     state.pipesCount = 0;
633     state.flowing = false;
634     if (dest)
635       dest.emit('unpipe', this);
636     return this;
637   }
638
639   // slow case. multiple pipe destinations.
640
641   if (!dest) {
642     // remove all.
643     var dests = state.pipes;
644     var len = state.pipesCount;
645     state.pipes = null;
646     state.pipesCount = 0;
647     state.flowing = false;
648
649     for (var i = 0; i < len; i++)
650       dests[i].emit('unpipe', this);
651     return this;
652   }
653
654   // try to find the right one.
655   var i = state.pipes.indexOf(dest);
656   if (i === -1)
657     return this;
658
659   state.pipes.splice(i, 1);
660   state.pipesCount -= 1;
661   if (state.pipesCount === 1)
662     state.pipes = state.pipes[0];
663
664   dest.emit('unpipe', this);
665
666   return this;
667 };
668
669 // set up data events if they are asked for
670 // Ensure readable listeners eventually get something
671 Readable.prototype.on = function(ev, fn) {
672   var res = Stream.prototype.on.call(this, ev, fn);
673
674   // If listening to data, and it has not explicitly been paused,
675   // then call resume to start the flow of data on the next tick.
676   if (ev === 'data' && false !== this._readableState.flowing) {
677     this.resume();
678   }
679
680   if (ev === 'readable' && this.readable) {
681     var state = this._readableState;
682     if (!state.readableListening) {
683       state.readableListening = true;
684       state.emittedReadable = false;
685       state.needReadable = true;
686       if (!state.reading) {
687         var self = this;
688         process.nextTick(function() {
689           debug('readable nexttick read 0');
690           self.read(0);
691         });
692       } else if (state.length) {
693         emitReadable(this, state);
694       }
695     }
696   }
697
698   return res;
699 };
700 Readable.prototype.addListener = Readable.prototype.on;
701
702 // pause() and resume() are remnants of the legacy readable stream API
703 // If the user uses them, then switch into old mode.
704 Readable.prototype.resume = function() {
705   var state = this._readableState;
706   if (!state.flowing) {
707     debug('resume');
708     state.flowing = true;
709     resume(this, state);
710   }
711   return this;
712 };
713
714 function resume(stream, state) {
715   if (!state.resumeScheduled) {
716     state.resumeScheduled = true;
717     process.nextTick(function() {
718       resume_(stream, state);
719     });
720   }
721 }
722
723 function resume_(stream, state) {
724   if (!state.reading) {
725     debug('resume read 0');
726     stream.read(0);
727   }
728
729   state.resumeScheduled = false;
730   stream.emit('resume');
731   flow(stream);
732   if (state.flowing && !state.reading)
733     stream.read(0);
734 }
735
736 Readable.prototype.pause = function() {
737   debug('call pause flowing=%j', this._readableState.flowing);
738   if (false !== this._readableState.flowing) {
739     debug('pause');
740     this._readableState.flowing = false;
741     this.emit('pause');
742   }
743   return this;
744 };
745
746 function flow(stream) {
747   var state = stream._readableState;
748   debug('flow', state.flowing);
749   if (state.flowing) {
750     do {
751       var chunk = stream.read();
752     } while (null !== chunk && state.flowing);
753   }
754 }
755
756 // wrap an old-style stream as the async data source.
757 // This is *not* part of the readable stream interface.
758 // It is an ugly unfortunate mess of history.
759 Readable.prototype.wrap = function(stream) {
760   var state = this._readableState;
761   var paused = false;
762
763   var self = this;
764   stream.on('end', function() {
765     debug('wrapped end');
766     if (state.decoder && !state.ended) {
767       var chunk = state.decoder.end();
768       if (chunk && chunk.length)
769         self.push(chunk);
770     }
771
772     self.push(null);
773   });
774
775   stream.on('data', function(chunk) {
776     debug('wrapped data');
777     if (state.decoder)
778       chunk = state.decoder.write(chunk);
779
780     // don't skip over falsy values in objectMode
781     //if (state.objectMode && util.isNullOrUndefined(chunk))
782     if (state.objectMode && (chunk === null || chunk === undefined))
783       return;
784     else if (!state.objectMode && (!chunk || !chunk.length))
785       return;
786
787     var ret = self.push(chunk);
788     if (!ret) {
789       paused = true;
790       stream.pause();
791     }
792   });
793
794   // proxy all the other methods.
795   // important when wrapping filters and duplexes.
796   for (var i in stream) {
797     if (util.isFunction(stream[i]) && util.isUndefined(this[i])) {
798       this[i] = function(method) { return function() {
799         return stream[method].apply(stream, arguments);
800       }}(i);
801     }
802   }
803
804   // proxy certain important events.
805   var events = ['error', 'close', 'destroy', 'pause', 'resume'];
806   events.forEach(function(ev) {
807     stream.on(ev, self.emit.bind(self, ev));
808   });
809
810   // when we try to consume some more bytes, simply unpause the
811   // underlying stream.
812   self._read = function(n) {
813     debug('wrapped _read', n);
814     if (paused) {
815       paused = false;
816       stream.resume();
817     }
818   };
819
820   return self;
821 };
822
823
824
825 // exposed for testing purposes only.
826 Readable._fromList = fromList;
827
828 // Pluck off n bytes from an array of buffers.
829 // Length is the combined lengths of all the buffers in the list.
830 function fromList(n, state) {
831   var list = state.buffer;
832   var length = state.length;
833   var stringMode = !!state.decoder;
834   var objectMode = !!state.objectMode;
835   var ret;
836
837   // nothing in the list, definitely empty.
838   if (list.length === 0)
839     return null;
840
841   if (length === 0)
842     ret = null;
843   else if (objectMode)
844     ret = list.shift();
845   else if (!n || n >= length) {
846     // read it all, truncate the array.
847     if (stringMode)
848       ret = list.join('');
849     else
850       ret = Buffer.concat(list, length);
851     list.length = 0;
852   } else {
853     // read just some of it.
854     if (n < list[0].length) {
855       // just take a part of the first list item.
856       // slice is the same for buffers and strings.
857       var buf = list[0];
858       ret = buf.slice(0, n);
859       list[0] = buf.slice(n);
860     } else if (n === list[0].length) {
861       // first list is a perfect match
862       ret = list.shift();
863     } else {
864       // complex case.
865       // we have enough to cover it, but it spans past the first buffer.
866       if (stringMode)
867         ret = '';
868       else
869         ret = new Buffer(n);
870
871       var c = 0;
872       for (var i = 0, l = list.length; i < l && c < n; i++) {
873         var buf = list[0];
874         var cpy = Math.min(n - c, buf.length);
875
876         if (stringMode)
877           ret += buf.slice(0, cpy);
878         else
879           buf.copy(ret, c, 0, cpy);
880
881         if (cpy < buf.length)
882           list[0] = buf.slice(cpy);
883         else
884           list.shift();
885
886         c += cpy;
887       }
888     }
889   }
890
891   return ret;
892 }
893
894 function endReadable(stream) {
895   var state = stream._readableState;
896
897   // If we get here before consuming all the bytes, then that is a
898   // bug in node.  Should never happen.
899   if (state.length > 0)
900     throw new Error('endReadable called on non-empty stream');
901
902   if (!state.endEmitted) {
903     state.ended = true;
904     process.nextTick(function() {
905       // Check that we didn't get one last unshift.
906       if (!state.endEmitted && state.length === 0) {
907         state.endEmitted = true;
908         stream.readable = false;
909         stream.emit('end');
910       }
911     });
912   }
913 }