A stream is an abstract interface implemented by various objects in
Node.js. For example, a [request to an HTTP server][] is a stream, as is
[`stdout`][]. Streams are readable, writable, or both. All streams are
instances of [`EventEmitter`][].
You can load the Stream base classes by doing `require('stream')`.
There are base classes provided for [Readable][] streams, [Writable][]
streams, [Duplex][] streams, and [Transform][] streams.
This document is split up into three sections:

1. The first section explains the parts of the API that you need to be
   aware of to use streams in your programs.
2. The second section explains the parts of the API that you need to
   use if you implement your own custom streams yourself. The API is
   designed to make this easy for you to do.
3. The third section goes into more depth about how streams work,
   including some of the internal mechanisms and functions that you
   should probably not modify unless you definitely know what you are
   doing.
## API for Stream Consumers
Streams can be either [Readable][], [Writable][], or both ([Duplex][]).

All streams are EventEmitters, but they also have other custom methods
and properties depending on whether they are Readable, Writable, or
Duplex.

If a stream is both Readable and Writable, then it implements all of
the methods and events below. So, a [Duplex][] or [Transform][] stream is
fully described by this API, though their implementation may be
somewhat different.
It is not necessary to implement Stream interfaces in order to consume
streams in your programs. If you **are** implementing streaming
interfaces in your own program, please also refer to
[API for Stream Implementors][] below.
Almost all Node.js programs, no matter how simple, use Streams in some
way. Here is an example of using Streams in a Node.js program:

```js
const http = require('http');

var server = http.createServer( (req, res) => {
  // req is an http.IncomingMessage, which is a Readable Stream
  // res is an http.ServerResponse, which is a Writable Stream

  var body = '';
  // we want to get the data as utf8 strings
  // If you don't set an encoding, then you'll get Buffer objects
  req.setEncoding('utf8');

  // Readable streams emit 'data' events once a listener is added
  req.on('data', (chunk) => {
    body += chunk;
  });

  // the end event tells you that you have the entire body
  req.on('end', () => {
    try {
      var data = JSON.parse(body);
    } catch (er) {
      // uh oh! bad json!
      res.statusCode = 400;
      return res.end(`error: ${er.message}`);
    }

    // write back something interesting to the user:
    res.write(typeof data);
    res.end();
  });
});

server.listen(1337);

// $ curl localhost:1337 -d '{}'
// object
// $ curl localhost:1337 -d '"foo"'
// string
// $ curl localhost:1337 -d 'not json'
// error: Unexpected token o
```
### Class: stream.Duplex

Duplex streams are streams that implement both the [Readable][] and
[Writable][] interfaces. See above for usage.

Examples of Duplex streams include:

* [tcp sockets][]
* [zlib streams][]
* [crypto streams][]
### Class: stream.Readable

The Readable stream interface is the abstraction for a *source* of
data that you are reading from. In other words, data comes *out* of a
Readable stream.

A Readable stream will not start emitting data until you indicate that
you are ready to receive it.
Readable streams have two "modes": a **flowing mode** and a **paused
mode**. When in flowing mode, data is read from the underlying system
and provided to your program as fast as possible. In paused mode, you
must explicitly call `stream.read()` to get chunks of data out.
Streams start out in paused mode.

**Note**: If no data event handlers are attached, and there are no
[`pipe()`][] destinations, and the stream is switched into flowing
mode, then data will be lost.
You can switch to flowing mode by doing any of the following:

* Adding a [`'data'`][] event handler to listen for data.
* Calling the [`resume()`][] method to explicitly open the flow.
* Calling the [`pipe()`][] method to send the data to a [Writable][].

You can switch back to paused mode by doing either of the following:

* If there are no pipe destinations, by calling the [`pause()`][]
  method.
* If there are pipe destinations, by removing any [`'data'`][] event
  handlers, and removing all pipe destinations by calling the
  [`unpipe()`][] method.

Note that, for backwards compatibility reasons, removing `'data'`
event handlers will **not** automatically pause the stream. Also, if
there are piped destinations, then calling `pause()` will not
guarantee that the stream will *remain* paused once those
destinations drain and ask for more data.
Examples of readable streams include:

* [http responses, on the client][]
* [http requests, on the server][]
* [fs read streams][]
* [zlib streams][]
* [crypto streams][]
* [tcp sockets][]
* [child process stdout and stderr][]
* [`process.stdin`][]
#### Event: 'close'

Emitted when the stream and any of its underlying resources (a file
descriptor, for example) have been closed. The event indicates that
no more events will be emitted, and no further computation will occur.

Not all streams will emit the `'close'` event.
#### Event: 'data'

* `chunk` {Buffer | String} The chunk of data.

Attaching a `'data'` event listener to a stream that has not been
explicitly paused will switch the stream into flowing mode. Data will
then be passed as soon as it is available.

If you just want to get all the data out of the stream as fast as
possible, this is the best way to do so.

```js
var readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
  console.log('got %d bytes of data', chunk.length);
});
```
#### Event: 'end'

This event fires when there will be no more data to read.

Note that the `'end'` event **will not fire** unless the data is
completely consumed. This can be done by switching into flowing mode,
or by calling `read()` repeatedly until you get to the end.

```js
var readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
  console.log('got %d bytes of data', chunk.length);
});
readable.on('end', () => {
  console.log('there will be no more data.');
});
```
#### Event: 'error'

* {Error Object}

Emitted if there was an error receiving data.
#### Event: 'readable'

When a chunk of data can be read from the stream, it will emit a
`'readable'` event.

In some cases, listening for a `'readable'` event will cause some data
to be read into the internal buffer from the underlying system, if it
hadn't already been read.

```js
var readable = getReadableStreamSomehow();
readable.on('readable', () => {
  // there is some data to read now
});
```

Once the internal buffer is drained, a `'readable'` event will fire
again when more data is available.

The `'readable'` event is not emitted in flowing mode, with the
sole exception of the last one, at end-of-stream.

The `'readable'` event indicates that the stream has new information:
either new data is available or the end of the stream has been reached.
In the former case, `.read()` will return that data. In the latter case,
`.read()` will return null. For instance, in the following example, `foo.txt`
is an empty file:

```js
const fs = require('fs');
var rr = fs.createReadStream('foo.txt');
rr.on('readable', () => {
  console.log('readable:', rr.read());
});
rr.on('end', () => {
  console.log('end');
});
```

The output of running this script is:

```
bash-3.2$ node test.js
readable: null
end
```
#### readable.isPaused()

* Return: {Boolean}

This method returns whether or not the `readable` has been **explicitly**
paused by client code (using `readable.pause()` without a corresponding
`readable.resume()`).

```js
var readable = new stream.Readable();

readable.isPaused(); // === false
readable.pause();
readable.isPaused(); // === true
readable.resume();
readable.isPaused(); // === false
```
#### readable.pause()

* Return: `this`

This method will cause a stream in flowing mode to stop emitting
`'data'` events, switching out of flowing mode. Any data that becomes
available will remain in the internal buffer.

```js
var readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
  console.log('got %d bytes of data', chunk.length);
  readable.pause();
  console.log('there will be no more data for 1 second');
  setTimeout(() => {
    console.log('now data will start flowing again');
    readable.resume();
  }, 1000);
});
```
#### readable.pipe(destination[, options])

* `destination` {[Writable][] Stream} The destination for writing data
* `options` {Object} Pipe options
  * `end` {Boolean} End the writer when the reader ends. Default = `true`

This method pulls all the data out of a readable stream, and writes it
to the supplied destination, automatically managing the flow so that
the destination is not overwhelmed by a fast readable stream.

Multiple destinations can be piped to safely.

```js
var readable = getReadableStreamSomehow();
var writable = fs.createWriteStream('file.txt');
// All the data from readable goes into 'file.txt'
readable.pipe(writable);
```

This function returns the destination stream, so you can set up pipe
chains like so:

```js
var r = fs.createReadStream('file.txt');
var z = zlib.createGzip();
var w = fs.createWriteStream('file.txt.gz');
r.pipe(z).pipe(w);
```

For example, emulating the Unix `cat` command:

```js
process.stdin.pipe(process.stdout);
```

By default [`end()`][] is called on the destination when the source stream
emits `end`, so that `destination` is no longer writable. Pass `{ end:
false }` as `options` to keep the destination stream open.

This keeps `writer` open so that "Goodbye" can be written at the
end:

```js
reader.pipe(writer, { end: false });
reader.on('end', () => {
  writer.end('Goodbye\n');
});
```

Note that `process.stderr` and `process.stdout` are never closed until
the process exits, regardless of the specified options.
#### readable.read([size])

* `size` {Number} Optional argument to specify how much data to read.
* Return {String | Buffer | null}

The `read()` method pulls some data out of the internal buffer and
returns it. If there is no data available, then it will return
`null`.

If you pass in a `size` argument, then it will return that many
bytes. If `size` bytes are not available, then it will return `null`,
unless we've ended, in which case it will return the data remaining
in the buffer.

If you do not specify a `size` argument, then it will return all the
data in the internal buffer.

This method should only be called in paused mode. In flowing mode,
this method is called automatically until the internal buffer is
drained.

```js
var readable = getReadableStreamSomehow();
readable.on('readable', () => {
  var chunk;
  while (null !== (chunk = readable.read())) {
    console.log('got %d bytes of data', chunk.length);
  }
});
```

If this method returns a data chunk, then it will also trigger the
emission of a [`'data'`][] event.

Note that calling `readable.read([size])` after the `'end'` event has been
triggered will return `null`. No runtime error will be raised.
#### readable.resume()

* Return: `this`

This method will cause the readable stream to resume emitting `'data'`
events.

This method will switch the stream into flowing mode. If you do *not*
want to consume the data from a stream, but you *do* want to get to
its `'end'` event, you can call [`readable.resume()`][] to open the flow of
data.

```js
var readable = getReadableStreamSomehow();
readable.resume();
readable.on('end', () => {
  console.log('got to the end, but did not read anything');
});
```
#### readable.setEncoding(encoding)

* `encoding` {String} The encoding to use.
* Return: `this`

Call this function to cause the stream to return strings of the
specified encoding instead of Buffer objects. For example, if you do
`readable.setEncoding('utf8')`, then the output data will be
interpreted as UTF-8 data, and returned as strings. If you do
`readable.setEncoding('hex')`, then the data will be encoded in
hexadecimal string format.

This properly handles multi-byte characters that would otherwise be
potentially mangled if you simply pulled the Buffers directly and
called `buf.toString(encoding)` on them. If you want to read the data
as strings, always use this method.

```js
var readable = getReadableStreamSomehow();
readable.setEncoding('utf8');
readable.on('data', (chunk) => {
  assert.equal(typeof chunk, 'string');
  console.log('got %d characters of string data', chunk.length);
});
```
#### readable.unpipe([destination])

* `destination` {[Writable][] Stream} Optional specific stream to unpipe

This method will remove the hooks set up for a previous `pipe()` call.

If the destination is not specified, then all pipes are removed.

If the destination is specified, but no pipe is set up for it, then
this is a no-op.

```js
var readable = getReadableStreamSomehow();
var writable = fs.createWriteStream('file.txt');
// All the data from readable goes into 'file.txt',
// but only for the first second
readable.pipe(writable);
setTimeout(() => {
  console.log('stop writing to file.txt');
  readable.unpipe(writable);
  console.log('manually close the file stream');
  writable.end();
}, 1000);
```
#### readable.unshift(chunk)

* `chunk` {Buffer | String} Chunk of data to unshift onto the read queue

This is useful in certain cases where a stream is being consumed by a
parser, which needs to "un-consume" some data that it has
optimistically pulled out of the source, so that the stream can be
passed on to some other party.

Note that `stream.unshift(chunk)` cannot be called after the `'end'` event
has been triggered; a runtime error will be raised.

If you find that you must often call `stream.unshift(chunk)` in your
programs, consider implementing a [Transform][] stream instead. (See API
for Stream Implementors, below.)

```js
// Pull off a header delimited by \n\n
// use unshift() if we get too much
// Call the callback with (error, header, stream)
const StringDecoder = require('string_decoder').StringDecoder;
function parseHeader(stream, callback) {
  stream.on('error', callback);
  stream.on('readable', onReadable);
  var decoder = new StringDecoder('utf8');
  var header = '';
  function onReadable() {
    var chunk;
    while (null !== (chunk = stream.read())) {
      var str = decoder.write(chunk);
      if (str.match(/\n\n/)) {
        // found the header boundary
        var split = str.split(/\n\n/);
        header += split.shift();
        var remaining = split.join('\n\n');
        var buf = new Buffer(remaining, 'utf8');
        if (buf.length)
          stream.unshift(buf);
        stream.removeListener('error', callback);
        stream.removeListener('readable', onReadable);
        // now the body of the message can be read from the stream.
        callback(null, header, stream);
      } else {
        // still reading the header.
        header += str;
      }
    }
  }
}
```

Note that, unlike `stream.push(chunk)`, `stream.unshift(chunk)` will not
end the reading process by resetting the internal reading state of the
stream. This can cause unexpected results if `unshift` is called during a
read (i.e. from within a `_read` implementation on a custom stream). Following
the call to `unshift` with an immediate `stream.push('')` will reset the
reading state appropriately, however it is best to simply avoid calling
`unshift` while in the process of performing a read.
#### readable.wrap(stream)

* `stream` {Stream} An "old style" readable stream

Versions of Node.js prior to v0.10 had streams that did not implement the
entire Streams API as it is today. (See "Compatibility" below for
more information.)

If you are using an older Node.js library that emits `'data'` events and
has a [`pause()`][] method that is advisory only, then you can use the
`wrap()` method to create a [Readable][] stream that uses the old stream
as its data source.

You will very rarely ever need to call this function, but it exists
as a convenience for interacting with old Node.js programs and libraries.

For example:

```js
const OldReader = require('./old-api-module.js').OldReader;
const Readable = require('stream').Readable;
const oreader = new OldReader;
const myReader = new Readable().wrap(oreader);

myReader.on('readable', () => {
  myReader.read(); // etc.
});
```
### Class: stream.Transform

Transform streams are [Duplex][] streams where the output is in some way
computed from the input. They implement both the [Readable][] and
[Writable][] interfaces. See above for usage.

Examples of Transform streams include:

* [zlib streams][]
* [crypto streams][]
### Class: stream.Writable

The Writable stream interface is an abstraction for a *destination*
that you are writing data *to*.

Examples of writable streams include:

* [http requests, on the client][]
* [http responses, on the server][]
* [fs write streams][]
* [zlib streams][]
* [crypto streams][]
* [tcp sockets][]
* [child process stdin][]
* [`process.stdout`][], [`process.stderr`][]
#### Event: 'drain'

If a [`writable.write(chunk)`][] call returns false, then the `'drain'`
event will indicate when it is appropriate to begin writing more data
to the stream.

```js
// Write the data to the supplied writable stream one million times.
// Be attentive to back-pressure.
function writeOneMillionTimes(writer, data, encoding, callback) {
  var i = 1000000;
  write();
  function write() {
    var ok = true;
    do {
      i -= 1;
      if (i === 0) {
        // last time!
        writer.write(data, encoding, callback);
      } else {
        // see if we should continue, or wait
        // don't pass the callback, because we're not done yet.
        ok = writer.write(data, encoding);
      }
    } while (i > 0 && ok);
    if (i > 0) {
      // had to stop early!
      // write some more once it drains
      writer.once('drain', write);
    }
  }
}
```
#### Event: 'error'

* {Error object}

Emitted if there was an error when writing or piping data.
#### Event: 'finish'

When the [`end()`][] method has been called, and all data has been flushed
to the underlying system, this event is emitted.

```js
var writer = getWritableStreamSomehow();
for (var i = 0; i < 100; i++) {
  writer.write(`hello, #${i}!\n`);
}
writer.end('this is the end\n');
writer.on('finish', () => {
  console.error('all writes are now complete.');
});
```
#### Event: 'pipe'

* `src` {[Readable][] Stream} source stream that is piping to this writable

This is emitted whenever the `pipe()` method is called on a readable
stream, adding this writable to its set of destinations.

```js
var writer = getWritableStreamSomehow();
var reader = getReadableStreamSomehow();
writer.on('pipe', (src) => {
  console.error('something is piping into the writer');
  assert.equal(src, reader);
});
reader.pipe(writer);
```
#### Event: 'unpipe'

* `src` {[Readable][] Stream} The source stream that [unpiped][] this writable

This is emitted whenever the [`unpipe()`][] method is called on a
readable stream, removing this writable from its set of destinations.

```js
var writer = getWritableStreamSomehow();
var reader = getReadableStreamSomehow();
writer.on('unpipe', (src) => {
  console.error('something has stopped piping into the writer');
  assert.equal(src, reader);
});
reader.pipe(writer);
reader.unpipe(writer);
```
#### writable.cork()

Forces buffering of all writes.

Buffered data will be flushed either when `.uncork()` or `.end()` is called.
#### writable.end([chunk][, encoding][, callback])

* `chunk` {String | Buffer} Optional data to write
* `encoding` {String} The encoding, if `chunk` is a String
* `callback` {Function} Optional callback for when the stream is finished

Call this method when no more data will be written to the stream. If
supplied, the callback is attached as a listener on the `'finish'` event.

Calling [`write()`][] after calling [`end()`][] will raise an error.

```js
// write 'hello, ' and then end with 'world!'
var file = fs.createWriteStream('example.txt');
file.write('hello, ');
file.end('world!');
// writing more now is not allowed!
```
#### writable.setDefaultEncoding(encoding)

* `encoding` {String} The new default encoding

Sets the default encoding for a writable stream.
#### writable.uncork()

Flushes all data buffered since the `.cork()` call.
#### writable.write(chunk[, encoding][, callback])

* `chunk` {String | Buffer} The data to write
* `encoding` {String} The encoding, if `chunk` is a String
* `callback` {Function} Callback for when this chunk of data is flushed
* Returns: {Boolean} True if the data was handled completely.

This method writes some data to the underlying system, and calls the
supplied callback once the data has been fully handled.

The return value indicates if you should continue writing right now.
If the data had to be buffered internally, then it will return
`false`. Otherwise, it will return `true`.

This return value is strictly advisory. You MAY continue to write,
even if it returns `false`. However, writes will be buffered in
memory, so it is best not to do this excessively. Instead, wait for
the `'drain'` event before writing more data.
## API for Stream Implementors

To implement any sort of stream, the pattern is the same:

1. Extend the appropriate parent class in your own subclass. (The
   [`util.inherits`][] method is particularly helpful for this.)
2. Call the appropriate parent class constructor in your constructor,
   to be sure that the internal mechanisms are set up properly.
3. Implement one or more specific methods, as detailed below.

The class to extend and the method(s) to implement depend on the sort
of stream class you are writing:
| Use-case | Class | Method(s) to implement |
| -------- | ----- | ---------------------- |
| Reading only | [Readable](#stream_class_stream_readable_1) | [`_read`][] |
| Writing only | [Writable](#stream_class_stream_writable_1) | [`_write`][], [`_writev`][] |
| Reading and writing | [Duplex](#stream_class_stream_duplex_1) | [`_read`][], [`_write`][], [`_writev`][] |
| Operate on written data, then read the result | [Transform](#stream_class_stream_transform_1) | [`_transform`][], [`_flush`][] |
In your implementation code, it is very important to never call the
methods described in [API for Stream Consumers][] above. Otherwise, you
can potentially cause adverse side effects in programs that consume
your streaming interfaces.
### Class: stream.Duplex

A "duplex" stream is one that is both Readable and Writable, such as a
TCP socket connection.

Note that `stream.Duplex` is an abstract class designed to be extended
with an underlying implementation of the `_read(size)` and
[`_write(chunk, encoding, callback)`][] methods as you would with a
Readable or Writable stream class.

Since JavaScript doesn't have multiple prototypal inheritance, this
class prototypally inherits from Readable, and then parasitically from
Writable. It is thus up to the user to implement both the low-level
`_read(n)` method as well as the low-level
[`_write(chunk, encoding, callback)`][] method on extension duplex classes.
#### new stream.Duplex(options)

* `options` {Object} Passed to both Writable and Readable
  constructors. Also has the following fields:
  * `allowHalfOpen` {Boolean} Default=true. If set to `false`, then
    the stream will automatically end the readable side when the
    writable side ends and vice versa.
  * `readableObjectMode` {Boolean} Default=false. Sets `objectMode`
    for the readable side of the stream. Has no effect if `objectMode`
    is `true`.
  * `writableObjectMode` {Boolean} Default=false. Sets `objectMode`
    for the writable side of the stream. Has no effect if `objectMode`
    is `true`.

In classes that extend the Duplex class, make sure to call the
constructor so that the buffering settings can be properly
initialized.
### Class: stream.PassThrough

This is a trivial implementation of a [Transform][] stream that simply
passes the input bytes across to the output. Its purpose is mainly
for examples and testing, but there are occasionally use cases where
it can come in handy as a building block for novel sorts of streams.
### Class: stream.Readable

`stream.Readable` is an abstract class designed to be extended with an
underlying implementation of the [`_read(size)`][] method.

Please see above under [API for Stream Consumers][] for how to consume
streams in your programs. What follows is an explanation of how to
implement Readable streams in your programs.
#### new stream.Readable([options])

* `options` {Object}
  * `highWaterMark` {Number} The maximum number of bytes to store in
    the internal buffer before ceasing to read from the underlying
    resource. Default=16kb, or 16 for `objectMode` streams
  * `encoding` {String} If specified, then buffers will be decoded to
    strings using the specified encoding. Default=null
  * `objectMode` {Boolean} Whether this stream should behave
    as a stream of objects. Meaning that stream.read(n) returns
    a single value instead of a Buffer of size n. Default=false
  * `read` {Function} Implementation for the [`_read()`][] method.

In classes that extend the Readable class, make sure to call the
Readable constructor so that the buffering settings can be properly
initialized.
#### readable.\_read(size)

* `size` {Number} Number of bytes to read asynchronously

Note: **Implement this method, but do NOT call it directly.**

This method is prefixed with an underscore because it is internal to the
class that defines it and should only be called by the internal Readable
class methods. All Readable stream implementations must provide a \_read
method to fetch data from the underlying resource.

When \_read is called, if data is available from the resource, `_read` should
start pushing that data into the read queue by calling `this.push(dataChunk)`.
`_read` should continue reading from the resource and pushing data until push
returns false, at which point it should stop reading from the resource. Only
when \_read is called again after it has stopped should it start reading
more data from the resource and pushing that data onto the queue.

Note: once the `_read()` method is called, it will not be called again until
the `push` method is called.

The `size` argument is advisory. Implementations where a "read" is a
single call that returns data can use this to know how much data to
fetch. Implementations where that is not relevant, such as TCP or
TLS, may ignore this argument, and simply provide data whenever it
becomes available. There is no need, for example, to "wait" until
`size` bytes are available before calling [`stream.push(chunk)`][].
#### readable.push(chunk[, encoding])

* `chunk` {Buffer | null | String} Chunk of data to push into the read queue
* `encoding` {String} Encoding of String chunks. Must be a valid
  Buffer encoding, such as `'utf8'` or `'ascii'`
* return {Boolean} Whether or not more pushes should be performed

Note: **This method should be called by Readable implementors, NOT
by consumers of Readable streams.**

If a value other than null is passed, the `push()` method adds a chunk of data
into the queue for subsequent stream processors to consume. If `null` is
passed, it signals the end of the stream (EOF), after which no more data
can be written.

The data added with `push` can be pulled out by calling the `read()` method
when the `'readable'` event fires.

This API is designed to be as flexible as possible. For example,
you may be wrapping a lower-level source which has some sort of
pause/resume mechanism, and a data callback. In those cases, you
could wrap the low-level source object by doing something like this:
```js
// source is an object with readStop() and readStart() methods,
// and an `ondata` member that gets called when it has data, and
// an `onend` member that gets called when the data is over.

util.inherits(SourceWrapper, Readable);

function SourceWrapper(options) {
  Readable.call(this, options);

  this._source = getLowlevelSourceObject();
  var self = this;

  // Every time there's data, we push it into the internal buffer.
  this._source.ondata = function(chunk) {
    // if push() returns false, then we need to stop reading from source
    if (!self.push(chunk))
      self._source.readStop();
  };

  // When the source ends, we push the EOF-signaling `null` chunk
  this._source.onend = function() {
    self.push(null);
  };
}

// _read will be called when the stream wants to pull more data in
// the advisory size argument is ignored in this case.
SourceWrapper.prototype._read = function(size) {
  this._source.readStart();
};
```
#### Example: A Counting Stream

This is a basic example of a Readable stream. It emits the numerals
from 1 to 1,000,000 in ascending order, and then ends.

```js
const Readable = require('stream').Readable;
const util = require('util');
util.inherits(Counter, Readable);

function Counter(opt) {
  Readable.call(this, opt);
  this._max = 1000000;
  this._index = 1;
}

Counter.prototype._read = function() {
  var i = this._index++;
  if (i > this._max)
    this.push(null);
  else {
    var str = '' + i;
    var buf = new Buffer(str, 'ascii');
    this.push(buf);
  }
};
```
#### Example: SimpleProtocol v1 (Sub-optimal)

This is similar to the `parseHeader` function described above, but
implemented as a custom stream. Also, note that this implementation
does not convert the incoming data to a string.

However, this would be better implemented as a [Transform][] stream. See
below for a better implementation.
```js
// A parser for a simple data protocol.
// The "header" is a JSON object, followed by 2 \n characters, and
// then a message body.
//
// NOTE: This can be done more simply as a Transform stream!
// Using Readable directly for this is sub-optimal. See the
// alternative example below under the Transform section.

const Readable = require('stream').Readable;
const util = require('util');

util.inherits(SimpleProtocol, Readable);

function SimpleProtocol(source, options) {
  if (!(this instanceof SimpleProtocol))
    return new SimpleProtocol(source, options);

  Readable.call(this, options);
  this._inBody = false;
  this._sawFirstCr = false;

  // source is a readable stream, such as a socket or file
  this._source = source;

  var self = this;
  source.on('end', () => {
    self.push(null);
  });

  // give it a kick whenever the source is readable
  // read(0) will not consume any bytes
  source.on('readable', () => {
    self.read(0);
  });

  this._rawHeader = [];
  this.header = null;
}

SimpleProtocol.prototype._read = function(n) {
  if (!this._inBody) {
    var chunk = this._source.read();

    // if the source doesn't have data, we don't have data yet.
    if (chunk === null)
      return this.push('');

    // check if the chunk has a \n\n
    var split = -1;
    for (var i = 0; i < chunk.length; i++) {
      if (chunk[i] === 10) { // '\n'
        if (this._sawFirstCr) {
          split = i;
          break;
        } else {
          this._sawFirstCr = true;
        }
      } else {
        this._sawFirstCr = false;
      }
    }

    if (split === -1) {
      // still waiting for the \n\n
      // stash the chunk, and try again.
      this._rawHeader.push(chunk);
      this.push('');
    } else {
      this._inBody = true;
      var h = chunk.slice(0, split);
      this._rawHeader.push(h);
      var header = Buffer.concat(this._rawHeader).toString();
      try {
        this.header = JSON.parse(header);
      } catch (er) {
        this.emit('error', new Error('invalid simple protocol data'));
        return;
      }
      // now, because we got some extra data, unshift the rest
      // back into the read queue so that our consumer will see it.
      var b = chunk.slice(split);
      this.unshift(b);
      // calling unshift by itself does not reset the reading state
      // of the stream; since we're inside _read, doing an additional
      // push('') will reset the state appropriately.
      this.push('');

      // and let them know that we are done parsing the header.
      this.emit('header', this.header);
    }
  } else {
    // from there on, just provide the data to our consumer.
    // careful not to push(null), since that would indicate EOF.
    var chunk = this._source.read();
    if (chunk) this.push(chunk);
  }
};

// Usage:
// var parser = new SimpleProtocol(source);
// Now parser is a readable stream that will emit 'header'
// with the parsed header data.
```
### Class: stream.Transform

A "transform" stream is a duplex stream where the output is causally
connected in some way to the input, such as a [zlib][] stream or a
[crypto][] stream.

There is no requirement that the output be the same size as the input,
the same number of chunks, or arrive at the same time. For example, a
Hash stream will only ever have a single chunk of output which is
provided when the input is ended. A zlib stream will produce output
that is either much smaller or much larger than its input.

Rather than implement the [`_read()`][] and [`_write()`][] methods, Transform
classes must implement the `_transform()` method, and may optionally
also implement the `_flush()` method. (See below.)
#### new stream.Transform([options])

* `options` {Object} Passed to both Writable and Readable
  constructors. Also has the following fields:
  * `transform` {Function} Implementation for the [`_transform()`][] method.
  * `flush` {Function} Implementation for the [`_flush()`][] method.

In classes that extend the Transform class, make sure to call the
constructor so that the buffering settings can be properly
initialized.

#### Events: 'finish' and 'end'

The [`'finish'`][] and [`'end'`][] events are from the parent Writable
and Readable classes, respectively. The `'finish'` event is fired after
`.end()` is called and all chunks have been processed by `_transform`;
`'end'` is fired after all data has been output, which is after the
callback in `_flush` has been called.

#### transform.\_flush(callback)

* `callback` {Function} Call this function (optionally with an error
  argument) when you are done flushing any remaining data.

Note: **This function MUST NOT be called directly.** It MAY be implemented
by child classes, and if so, will be called by the internal Transform
class methods only.

In some cases, your transform operation may need to emit a bit more
data at the end of the stream. For example, a `Zlib` compression
stream will store up some internal state so that it can optimally
compress the output. At the end, however, it needs to do the best it
can with what is left, so that the data will be complete.

In those cases, you can implement a `_flush` method, which will be
called at the very end, after all the written data is consumed, but
before emitting `'end'` to signal the end of the readable side. Just
like with `_transform`, call `transform.push(chunk)` zero or more
times, as appropriate, and call `callback` when the flush operation is
complete.

This method is prefixed with an underscore because it is internal to
the class that defines it, and should not be called directly by user
programs. However, you **are** expected to override this method in
your own extension classes.

#### transform.\_transform(chunk, encoding, callback)

* `chunk` {Buffer | String} The chunk to be transformed. Will **always**
  be a buffer unless the `decodeStrings` option was set to `false`.
* `encoding` {String} If the chunk is a string, then this is the
  encoding type. If the chunk is a buffer, then this is the special
  value `'buffer'`; ignore it in this case.
* `callback` {Function} Call this function (optionally with an error
  argument and data) when you are done processing the supplied chunk.

Note: **This function MUST NOT be called directly.** It should be
implemented by child classes, and called by the internal Transform
class methods only.

All Transform stream implementations must provide a `_transform`
method to accept input and produce output.

`_transform` should do whatever has to be done in this specific
Transform class, to handle the bytes being written, and pass them off
to the readable portion of the interface. Do asynchronous I/O,
process things, and so on.

Call `transform.push(outputChunk)` 0 or more times to generate output
from this input chunk, depending on how much data you want to output
as a result of this chunk.

Call the callback function only when the current chunk is completely
consumed. Note that there may or may not be output as a result of any
particular input chunk. If you supply a second argument to the callback
it will be passed to the push method. In other words, the following are
equivalent:

```js
transform.prototype._transform = function (data, encoding, callback) {
  this.push(data);
  callback();
};

transform.prototype._transform = function (data, encoding, callback) {
  callback(null, data);
};
```

This method is prefixed with an underscore because it is internal to
the class that defines it, and should not be called directly by user
programs. However, you **are** expected to override this method in
your own extension classes.

#### Example: `SimpleProtocol` parser v2

The example above of a simple protocol parser can be implemented
simply by using the higher level [Transform][] stream class, similar to
the `parseHeader` and `SimpleProtocol v1` examples above.

In this example, rather than providing the input as an argument, it
would be piped into the parser, which is a more idiomatic Node.js stream
approach.

```js
const util = require('util');
const Transform = require('stream').Transform;
util.inherits(SimpleProtocol, Transform);

function SimpleProtocol(options) {
  if (!(this instanceof SimpleProtocol))
    return new SimpleProtocol(options);

  Transform.call(this, options);
  this._inBody = false;
  this._sawFirstCr = false;
  this._rawHeader = [];
  this.header = null;
}

SimpleProtocol.prototype._transform = function(chunk, encoding, done) {
  if (!this._inBody) {
    // check if the chunk has a \n\n
    var split = -1;
    for (var i = 0; i < chunk.length; i++) {
      if (chunk[i] === 10) { // '\n'
        if (this._sawFirstCr) {
          split = i;
          break;
        } else {
          this._sawFirstCr = true;
        }
      } else {
        this._sawFirstCr = false;
      }
    }

    if (split === -1) {
      // still waiting for the \n\n
      // stash the chunk, and try again.
      this._rawHeader.push(chunk);
    } else {
      this._inBody = true;
      var h = chunk.slice(0, split);
      this._rawHeader.push(h);
      var header = Buffer.concat(this._rawHeader).toString();
      try {
        this.header = JSON.parse(header);
      } catch (er) {
        this.emit('error', new Error('invalid simple protocol data'));
        return;
      }
      // and let them know that we are done parsing the header.
      this.emit('header', this.header);

      // now, because we got some extra data, emit this first.
      this.push(chunk.slice(split));
    }
  } else {
    // from there on, just provide the data to our consumer as-is.
    this.push(chunk);
  }
  done();
};

// Usage:
// var parser = new SimpleProtocol();
// source.pipe(parser)
// Now parser is a readable stream that will emit 'header'
// with the parsed header data.
```

### Class: stream.Writable

`stream.Writable` is an abstract class designed to be extended with an
underlying implementation of the [`_write(chunk, encoding, callback)`][] method.

Please see above under [API for Stream Consumers][] for how to consume
writable streams in your programs. What follows is an explanation of
how to implement Writable streams in your programs.

#### new stream.Writable([options])

* `options` {Object}
  * `highWaterMark` {Number} Buffer level when [`write()`][] starts
    returning false. Default=16kb, or 16 for `objectMode` streams
  * `decodeStrings` {Boolean} Whether or not to decode strings into
    Buffers before passing them to [`_write()`][]. Default=true
  * `objectMode` {Boolean} Whether or not the `write(anyObj)` is
    a valid operation. If set you can write arbitrary data instead
    of only `Buffer` / `String` data. Default=false
  * `write` {Function} Implementation for the [`_write()`][] method.
  * `writev` {Function} Implementation for the [`_writev()`][] method.

In classes that extend the Writable class, make sure to call the
constructor so that the buffering settings can be properly
initialized.

#### writable.\_write(chunk, encoding, callback)

* `chunk` {Buffer | String} The chunk to be written. Will **always**
  be a buffer unless the `decodeStrings` option was set to `false`.
* `encoding` {String} If the chunk is a string, then this is the
  encoding type. If the chunk is a buffer, then this is the special
  value `'buffer'`; ignore it in this case.
* `callback` {Function} Call this function (optionally with an error
  argument) when you are done processing the supplied chunk.

All Writable stream implementations must provide a [`_write()`][]
method to send data to the underlying resource.

Note: **This function MUST NOT be called directly.** It should be
implemented by child classes, and called by the internal Writable
class methods only.

Call the callback using the standard `callback(error)` pattern to
signal that the write completed successfully or with an error.

If the `decodeStrings` flag is set to `false` in the constructor options,
then `chunk` may be a string rather than a Buffer, and `encoding` will
indicate the sort of string that it is. This is to support
implementations that have an optimized handling for certain string
data encodings. If you do not explicitly set the `decodeStrings`
option to `false`, then you can safely ignore the `encoding` argument,
and assume that `chunk` will always be a Buffer.

This method is prefixed with an underscore because it is internal to
the class that defines it, and should not be called directly by user
programs. However, you **are** expected to override this method in
your own extension classes.

#### writable.\_writev(chunks, callback)

* `chunks` {Array} The chunks to be written. Each chunk has the following
  format: `{ chunk: ..., encoding: ... }`.
* `callback` {Function} Call this function (optionally with an error
  argument) when you are done processing the supplied chunks.

Note: **This function MUST NOT be called directly.** It may be
implemented by child classes, and called by the internal Writable
class methods only.

This function is completely optional to implement. In most cases it is
unnecessary. If implemented, it will be called with all the chunks
that are buffered in the write queue.

## Simplified Constructor API

In simple cases there is now the added benefit of being able to
construct a stream without inheritance.

This can be done by passing the appropriate methods as constructor
options:

Examples:

### Duplex

```js
var duplex = new stream.Duplex({
  read: function(n) {
    // sets this._read under the hood

    // push data onto the read queue, passing null
    // will signal the end of the stream (EOF)
    this.push(chunk);
  },
  write: function(chunk, encoding, next) {
    // sets this._write under the hood

    // An optional error can be passed as the first argument
    next();
  }
});

// or

var duplex = new stream.Duplex({
  read: function(n) {
    // sets this._read under the hood

    // push data onto the read queue, passing null
    // will signal the end of the stream (EOF)
    this.push(chunk);
  },
  writev: function(chunks, next) {
    // sets this._writev under the hood

    // An optional error can be passed as the first argument
    next();
  }
});
```

### Readable

```js
var readable = new stream.Readable({
  read: function(n) {
    // sets this._read under the hood

    // push data onto the read queue, passing null
    // will signal the end of the stream (EOF)
    this.push(chunk);
  }
});
```

### Transform

```js
var transform = new stream.Transform({
  transform: function(chunk, encoding, next) {
    // sets this._transform under the hood

    // generate output as many times as needed
    // this.push(chunk);

    // call when the current chunk is consumed
    next();
  },
  flush: function(done) {
    // sets this._flush under the hood

    // generate output as many times as needed
    // this.push(chunk);

    done();
  }
});
```

### Writable

```js
var writable = new stream.Writable({
  write: function(chunk, encoding, next) {
    // sets this._write under the hood

    // An optional error can be passed as the first argument
    next();
  }
});

// or

var writable = new stream.Writable({
  writev: function(chunks, next) {
    // sets this._writev under the hood

    // An optional error can be passed as the first argument
    next();
  }
});
```

## Streams: Under the Hood

### Buffering

Both Writable and Readable streams will buffer data on an internal
object which can be retrieved from `_writableState.getBuffer()` or
`_readableState.buffer`, respectively.

The amount of data that will potentially be buffered depends on the
`highWaterMark` option which is passed into the constructor.

Buffering in Readable streams happens when the implementation calls
[`stream.push(chunk)`][]. If the consumer of the Stream does not call
`stream.read()`, then the data will sit in the internal queue until it
is consumed.

Buffering in Writable streams happens when the user calls
[`stream.write(chunk)`][] repeatedly, even when `write()` returns `false`.

The purpose of streams, especially with the `pipe()` method, is to
limit the buffering of data to acceptable levels, so that sources and
destinations of varying speed will not overwhelm the available memory.

### Compatibility with Older Node.js Versions

In versions of Node.js prior to v0.10, the Readable stream interface was
simpler, but also less powerful and less useful.

* Rather than waiting for you to call the `read()` method, `'data'`
  events would start emitting immediately. If you needed to do some
  I/O to decide how to handle data, then you had to store the chunks
  in some kind of buffer so that they would not be lost.
* The [`pause()`][] method was advisory, rather than guaranteed. This
  meant that you still had to be prepared to receive `'data'` events
  even when the stream was in a paused state.

In Node.js v0.10, the Readable class described above was added.
For backwards compatibility with older Node.js programs, Readable streams
switch into "flowing mode" when a `'data'` event handler is added, or
when the [`resume()`][] method is called. The effect is that, even if
you are not using the new `read()` method and `'readable'` event, you
no longer have to worry about losing `'data'` chunks.

Most programs will continue to function normally. However, this
introduces an edge case in the following conditions:

* No [`'data'`][] event handler is added.
* The [`resume()`][] method is never called.
* The stream is not piped to any writable destination.

For example, consider the following code:

```js
// WARNING!  BROKEN!
net.createServer((socket) => {

  // we add an 'end' listener, but never consume the data
  socket.on('end', () => {
    // It will never get here.
    socket.end('I got your message (but didnt read it)\n');
  });

}).listen(1337);
```

In versions of Node.js prior to v0.10, the incoming message data would
simply be discarded. However, in Node.js v0.10 and beyond,
the socket will remain paused forever.

The workaround in this situation is to call the `resume()` method to
start the flow of data:

```js
net.createServer((socket) => {

  socket.on('end', () => {
    socket.end('I got your message (but didnt read it)\n');
  });

  // start the flow of data, discarding it.
  socket.resume();

}).listen(1337);
```

In addition to new Readable streams switching into flowing mode,
pre-v0.10 style streams can be wrapped in a Readable class using the
`wrap()` method.

### Object Mode

Normally, Streams operate on Strings and Buffers exclusively.

Streams that are in **object mode** can emit generic JavaScript values
other than Buffers and Strings.

A Readable stream in object mode will always return a single item from
a call to `stream.read(size)`, regardless of what the size argument
is.

A Writable stream in object mode will always ignore the `encoding`
argument to `stream.write(data, encoding)`.

The special value `null` still retains its special value for object
mode streams. That is, for object mode readable streams, `null` as a
return value from `stream.read()` indicates that there is no more
data, and [`stream.push(null)`][] will signal the end of stream data
(EOF).

No streams in Node.js core are object mode streams. This pattern is only
used by userland streaming libraries.

You should set `objectMode` in your stream child class constructor on
the options object. Setting `objectMode` mid-stream is not safe.

For Duplex streams, `objectMode` can be set exclusively for the readable
or writable side with `readableObjectMode` and `writableObjectMode`,
respectively. These options can be used to implement parsers and
serializers with Transform streams.

For example:

```js
const util = require('util');
const StringDecoder = require('string_decoder').StringDecoder;
const Transform = require('stream').Transform;
util.inherits(JSONParseStream, Transform);

// Gets \n-delimited JSON string data, and emits the parsed objects
function JSONParseStream() {
  if (!(this instanceof JSONParseStream))
    return new JSONParseStream();

  Transform.call(this, { readableObjectMode : true });

  this._buffer = '';
  this._decoder = new StringDecoder('utf8');
}

JSONParseStream.prototype._transform = function(chunk, encoding, cb) {
  this._buffer += this._decoder.write(chunk);
  // split on newlines
  var lines = this._buffer.split(/\r?\n/);
  // keep the last partial line buffered
  this._buffer = lines.pop();
  for (var l = 0; l < lines.length; l++) {
    var line = lines[l];
    try {
      var obj = JSON.parse(line);
    } catch (er) {
      this.emit('error', er);
      return;
    }
    // push the parsed object out to the readable consumer
    this.push(obj);
  }
  cb();
};

JSONParseStream.prototype._flush = function(cb) {
  // Just handle any leftover
  var rem = this._buffer.trim();
  if (rem) {
    try {
      var obj = JSON.parse(rem);
    } catch (er) {
      this.emit('error', er);
      return;
    }
    // push the parsed object out to the readable consumer
    this.push(obj);
  }
  cb();
};
```

### `stream.read(0)`

There are some cases where you want to trigger a refresh of the
underlying readable stream mechanisms, without actually consuming any
data. In that case, you can call `stream.read(0)`, which will always
return null.

If the internal read buffer is below the `highWaterMark`, and the
stream is not currently reading, then calling `read(0)` will trigger
a low-level `_read` call.

There is almost never a need to do this. However, you will see some
cases in Node.js's internals where this is done, particularly in the
Readable stream class internals.

### `stream.push('')`

Pushing a zero-byte string or Buffer (when not in [Object mode][]) has an
interesting side effect. Because it *is* a call to
[`stream.push()`][], it will end the `reading` process. However, it
does *not* add any data to the readable buffer, so there's nothing for
a user to consume.

Very rarely, there are cases where you have no data to provide now,
but the consumer of your stream (or, perhaps, another bit of your own
code) will know when to check again, by calling `stream.read(0)`. In
those cases, you *may* call `stream.push('')`.

So far, the only use case for this functionality is in the
[`tls.CryptoStream`][] class, which is deprecated in Node.js/io.js v1.0. If you
find that you have to use `stream.push('')`, please consider another
approach, because it almost certainly indicates that something is
horribly wrong.

[_read]: #stream_readable_read_size_1
[_write]: #stream_writable_write_chunk_encoding_callback_1
[`'data'`]: #stream_event_data
[`'end'`]: #stream_event_end
[`'finish'`]: #stream_event_finish
[`_read()`]: #stream_readable_read_size_1
[`_read(size)`]: #stream_readable_read_size_1
[`_write()`]: #stream_writable_write_chunk_encoding_callback_1
[`_write(chunk, encoding, callback)`]: #stream_writable_write_chunk_encoding_callback_1
[`end()`]: #stream_writable_end_chunk_encoding_callback
[`EventEmitter`]: events.html#events_class_events_eventemitter
[`pause()`]: #stream_readable_pause
[`pipe()`]: #stream_readable_pipe_destination_options
[`process.stderr`]: process.html#process_process_stderr
[`process.stdin`]: process.html#process_process_stdin
[`process.stdout`]: process.html#process_process_stdout
[`readable.resume()`]: #stream_readable_resume
[`resume()`]: #stream_readable_resume
[`stdout`]: process.html#process_process_stdout
[`stream.push()`]: #stream_readable_push_chunk_encoding
[`stream.push(chunk)`]: #stream_readable_push_chunk_encoding
[`stream.push(null)`]: #stream_readable_push_chunk_encoding
[`stream.write(chunk)`]: #stream_writable_write_chunk_encoding_callback
[`tls.CryptoStream`]: tls.html#tls_class_cryptostream
[`unpipe()`]: #stream_readable_unpipe_destination
[`util.inherits`]: util.html#util_util_inherits_constructor_superconstructor
[`writable.write(chunk)`]: #stream_writable_write_chunk_encoding_callback
[`write()`]: #stream_writable_write_chunk_encoding_callback
[`write(chunk, encoding, callback)`]: #stream_writable_write_chunk_encoding_callback
[API for Stream Consumers]: #stream_api_for_stream_consumers
[API for Stream Implementors]: #stream_api_for_stream_implementors
[child process stdin]: child_process.html#child_process_child_stdin
[child process stdout and stderr]: child_process.html#child_process_child_stdout
[crypto streams]: crypto.html
[crypto]: crypto.html
[Duplex]: #stream_class_stream_duplex
[fs read streams]: fs.html#fs_class_fs_readstream
[fs write streams]: fs.html#fs_class_fs_writestream
[http requests, on the client]: http.html#http_class_http_clientrequest
[http requests, on the server]: http.html#http_http_incomingmessage
[http responses, on the client]: http.html#http_http_incomingmessage
[http responses, on the server]: http.html#http_class_http_serverresponse
[Object mode]: #stream_object_mode
[Readable]: #stream_class_stream_readable
[request to an HTTP server]: http.html#http_http_incomingmessage
[tcp sockets]: net.html#net_class_net_socket
[Transform]: #stream_class_stream_transform
[unpiped]: #stream_readable_unpipe_destination
[Writable]: #stream_class_stream_writable
[zlib]: zlib.html
[zlib streams]: zlib.html
[_transform]: #stream_transform_transform_chunk_encoding_callback
[`_transform()`]: #stream_transform_transform_chunk_encoding_callback
[`_transform(chunk, encoding, callback)`]: #stream_transform_transform_chunk_encoding_callback
[_flush]: #stream_transform_flush_callback
[`_flush()`]: #stream_transform_flush_callback
[`_flush(callback)`]: #stream_transform_flush_callback
[_writev]: #stream_writable_writev_chunks_callback
[`_writev()`]: #stream_writable_writev_chunks_callback
[`_writev(chunks, callback)`]: #stream_writable_writev_chunks_callback