3 const { Duplex } = require('stream');
6 * Emits the `'close'` event on a stream.
8 * @param {stream.Duplex} stream The stream.
// Scheduled through process.nextTick(emitClose, duplex) from the _destroy
// implementation installed in createWebSocketStream, so the argument is the
// wrapping duplex stream.
// NOTE(review): the body of this function is not visible in this excerpt;
// the doc comment above states its purpose - confirm against the full file.
11 function emitClose(stream) {
16 * The listener of the `'end'` event.
// Attached with duplex.on for the end event in createWebSocketStream, so the
// receiver (this) is expected to be the duplex stream itself.
20 function duplexOnEnd() {
// Act only when the stream has not been destroyed yet and its writable side
// is already marked finished.
// NOTE(review): the guarded action is not visible in this excerpt.
21 if (!this.destroyed && this._writableState.finished) {
27 * The listener of the `'error'` event.
29 * @param {Error} err The error
// Attached with duplex.on for the error event in createWebSocketStream, so the
// receiver (this) is expected to be the duplex stream itself.
32 function duplexOnError(err) {
// Detach this handler first so that it is neither counted by the listener
// check below nor invoked again by the re-emit.
33 this.removeListener('error', duplexOnError);
// NOTE(review): at least one statement between the removal above and the
// check below is missing from this excerpt.
// When no other error listener remains, emit the same error again so that
// having installed this handler does not hide an otherwise unhandled error.
35 if (this.listenerCount('error') === 0) {
36 // Do not suppress the throwing behavior.
37 this.emit('error', err);
42 * Wraps a `WebSocket` in a duplex stream.
44 * @param {WebSocket} ws The `WebSocket` to wrap
45 * @param {Object} [options] The options for the `Duplex` constructor
46 * @return {stream.Duplex} The duplex stream
// NOTE(review): this excerpt omits many interior lines of the function
// (closing braces, some handler bodies, most Duplex options). The comments
// below describe only what the visible lines do.
49 function createWebSocketStream(ws, options) {
// Flow-control flag: while true, the drain handler of the receiver resumes the
// underlying socket. It is cleared when duplex.push reports backpressure and
// set again from duplex._read.
50 let resumeOnReceiverDrain = true;
// Drain listener for ws._receiver: resume the socket only if the readable side
// of the duplex has not asked the producer to stop.
52 function receiverOnDrain() {
53 if (resumeOnReceiverDrain) ws._socket.resume();
// Replace every existing drain listener on the receiver with receiverOnDrain.
// While the websocket is still connecting the swap is deferred to the open
// event (presumably because the receiver does not exist yet - TODO confirm).
56 if (ws.readyState === ws.CONNECTING) {
57 ws.once('open', function open() {
58 ws._receiver.removeAllListeners('drain');
59 ws._receiver.on('drain', receiverOnDrain);
// Websocket is past the connecting state: swap the listeners immediately.
62 ws._receiver.removeAllListeners('drain');
63 ws._receiver.on('drain', receiverOnDrain);
// NOTE(review): only the last option passed to the Duplex constructor is
// visible here; how the options argument is merged in cannot be seen.
66 const duplex = new Duplex({
71 writableObjectMode: false
// Forward every websocket message to the readable side of the duplex. A false
// return value from push signals backpressure, so automatic resuming on drain
// is switched off until _read is called again.
74 ws.on('message', function message(msg) {
75 if (!duplex.push(msg)) {
76 resumeOnReceiverDrain = false;
// Websocket error: nothing to do if the duplex is already destroyed.
// NOTE(review): the action taken otherwise is not visible in this excerpt.
81 ws.once('error', function error(err) {
82 if (duplex.destroyed) return;
// Websocket close: nothing to do if the duplex is already destroyed.
// NOTE(review): the action taken otherwise is not visible in this excerpt; the
// comment further down in _final says the EOF null chunk is pushed on close.
87 ws.once('close', function close() {
88 if (duplex.destroyed) return;
// Custom destroy hook. The close event of the duplex is emitted manually via
// emitClose on the next tick.
93 duplex._destroy = function (err, callback) {
// Websocket already closed: schedule the close event right away.
94 if (ws.readyState === ws.CLOSED) {
96 process.nextTick(emitClose, duplex);
// Otherwise wait for the websocket to report an error or to close.
// NOTE(review): the declaration and assignment of the called flag used below
// are not visible in this excerpt; presumably the error listener sets it after
// invoking callback - confirm.
102 ws.once('error', function error(err) {
// On close, complete the destroy with the original err unless the callback was
// already invoked, then schedule the close event of the duplex.
107 ws.once('close', function close() {
108 if (!called) callback(err);
109 process.nextTick(emitClose, duplex);
// Custom final hook, run when the writable side of the duplex is ended.
114 duplex._final = function (callback) {
// Still connecting: retry this hook once the websocket has opened.
115 if (ws.readyState === ws.CONNECTING) {
116 ws.once('open', function open() {
117 duplex._final(callback);
122 // If the value of the `_socket` property is `null` it means that `ws` is a
123 // client websocket and the handshake failed. In fact, when this happens, a
124 // socket is never assigned to the websocket. Wait for the `'error'` event
125 // that will be emitted by the websocket.
126 if (ws._socket === null) return;
// Underlying socket already finished writing: if the readable side has also
// emitted its end event, the duplex is destroyed here.
// NOTE(review): other statements of this branch are not visible.
128 if (ws._socket._writableState.finished) {
130 if (duplex._readableState.endEmitted) duplex.destroy();
// Socket not finished yet: wait for its finish event.
132 ws._socket.once('finish', function finish() {
133 // `duplex` is not destroyed here because the `'end'` event will be
134 // emitted on `duplex` after this `'finish'` event. The EOF signaling
135 // `null` chunk is, in fact, pushed when the websocket emits `'close'`.
// Custom read hook: the consumer wants more data. If the websocket is open and
// resuming had been switched off by backpressure, switch it back on.
142 duplex._read = function () {
143 if (ws.readyState === ws.OPEN && !resumeOnReceiverDrain) {
144 resumeOnReceiverDrain = true;
// Resume the socket immediately unless the receiver is itself waiting for a
// drain, in which case receiverOnDrain performs the resume later.
145 if (!ws._receiver._writableState.needDrain) ws._socket.resume();
// Custom write hook: send each chunk as a websocket message.
149 duplex._write = function (chunk, encoding, callback) {
// Still connecting: retry the same write once the websocket has opened.
150 if (ws.readyState === ws.CONNECTING) {
151 ws.once('open', function open() {
152 duplex._write(chunk, encoding, callback);
// The stream callback is handed to ws.send, so the write completes (or fails)
// when the send does. The encoding argument is not used.
157 ws.send(chunk, callback);
// Attach the module-level end and error handlers to the new duplex.
160 duplex.on('end', duplexOnEnd);
161 duplex.on('error', duplexOnError);
// The module export is the factory function itself, not an object wrapping it.
165 module.exports = createWebSocketStream;