// Module wiring: export Stream, make it inherit from the 'events' export,
// and attach the exports of the _stream_* modules to it as properties.
// NOTE(review): Stream is not defined in the lines visible here; presumably
// it is a hoisted function declaration further down the file -- confirm.
3 module.exports = Stream;
5 const EE = require('events');
6 const util = require('util');
// Make Stream inherit from EE (the 'events' module export).
// NOTE(review): the export above and this call both run before the
// _stream_* requires below; presumably those modules require this module
// back, in which case the order matters -- confirm before reordering.
8 util.inherits(Stream, EE);
// Expose each _stream_* module's export under a property of Stream.
9 Stream.Readable = require('_stream_readable');
10 Stream.Writable = require('_stream_writable');
11 Stream.Duplex = require('_stream_duplex');
12 Stream.Transform = require('_stream_transform');
13 Stream.PassThrough = require('_stream_passthrough');
// Self-reference: the exported object's own `Stream` property points back
// at the export itself.
15 // Backwards-compat with node 0.4.x
16 Stream.Stream = Stream;
19 // old-style streams. Note that the pipe method (the only relevant
20 // part of this class) is overridden in the Readable class.
26 Stream.prototype.pipe = function(dest, options) {
29 function ondata(chunk) {
31 if (false === dest.write(chunk) && source.pause) {
37 source.on('data', ondata);
40 if (source.readable && source.resume) {
45 dest.on('drain', ondrain);
47 // If the 'end' option is not supplied, dest.end() will be called when
48 // source gets the 'end' or 'close' events. Only call dest.end() once.
49 if (!dest._isStdio && (!options || options.end !== false)) {
50 source.on('end', onend);
51 source.on('close', onclose);
67 if (typeof dest.destroy === 'function') dest.destroy();
70 // don't leave dangling pipes when there are errors.
71 function onerror(er) {
73 if (EE.listenerCount(this, 'error') === 0) {
74 throw er; // Unhandled stream error in pipe.
78 source.on('error', onerror);
79 dest.on('error', onerror);
81 // remove all the event listeners that were added.
83 source.removeListener('data', ondata);
84 dest.removeListener('drain', ondrain);
86 source.removeListener('end', onend);
87 source.removeListener('close', onclose);
89 source.removeListener('error', onerror);
90 dest.removeListener('error', onerror);
92 source.removeListener('end', cleanup);
93 source.removeListener('close', cleanup);
95 dest.removeListener('close', cleanup);
98 source.on('end', cleanup);
99 source.on('close', cleanup);
101 dest.on('close', cleanup);
103 dest.emit('pipe', source);
105 // Allow for unix-like usage: A.pipe(B).pipe(C)