3 const EventEmitter = require('events');
4 const crypto = require('crypto');
5 const https = require('https');
6 const http = require('http');
7 const net = require('net');
8 const tls = require('tls');
9 const url = require('url');
11 const PerMessageDeflate = require('./permessage-deflate');
12 const EventTarget = require('./event-target');
13 const extension = require('./extension');
14 const constants = require('./constants');
15 const Receiver = require('./receiver');
16 const Sender = require('./sender');
18 const readyStates = ['CONNECTING', 'OPEN', 'CLOSING', 'CLOSED'];
19 const kWebSocket = constants.kWebSocket;
20 const protocolVersions = [8, 13];
21 const closeTimeout = 30 * 1000; // Allow 30 seconds to terminate the connection cleanly.
24 * Class representing a WebSocket.
26 * @extends EventEmitter
28 class WebSocket extends EventEmitter {
30 * Create a new `WebSocket`.
32 * @param {(String|url.Url|url.URL)} address The URL to which to connect
33 * @param {(String|String[])} protocols The subprotocols
34 * @param {Object} options Connection options
36 constructor (address, protocols, options) {
39 this.readyState = WebSocket.CONNECTING;
42 this._binaryType = constants.BINARY_TYPES[0];
43 this._closeFrameReceived = false;
44 this._closeFrameSent = false;
45 this._closeMessage = '';
46 this._closeTimer = null;
47 this._closeCode = 1006;
48 this._extensions = {};
49 this._isServer = true;
50 this._receiver = null;
54 if (address !== null) {
55 if (Array.isArray(protocols)) {
56 protocols = protocols.join(', ');
57 } else if (typeof protocols === 'object' && protocols !== null) {
59 protocols = undefined;
62 initAsClient.call(this, address, protocols, options);
66 get CONNECTING () { return WebSocket.CONNECTING; }
67 get CLOSING () { return WebSocket.CLOSING; }
68 get CLOSED () { return WebSocket.CLOSED; }
69 get OPEN () { return WebSocket.OPEN; }
72 * This deviates from the WHATWG interface since ws doesn't support the required
73 * default "blob" type (instead we define a custom "nodebuffer" type).
78 return this._binaryType;
81 set binaryType (type) {
82 if (constants.BINARY_TYPES.indexOf(type) < 0) return;
84 this._binaryType = type;
87 // Allow to change `binaryType` on the fly.
89 if (this._receiver) this._receiver._binaryType = type;
95 get bufferedAmount () {
96 if (!this._socket) return 0;
99 // `socket.bufferSize` is `undefined` if the socket is closed.
101 return (this._socket.bufferSize || 0) + this._sender._bufferedBytes;
108 return Object.keys(this._extensions).join();
112 * Set up the socket and the internal resources.
114 * @param {net.Socket} socket The network socket between the server and client
115 * @param {Buffer} head The first packet of the upgraded stream
116 * @param {Number} maxPayload The maximum allowed message size
119 setSocket (socket, head, maxPayload) {
120 const receiver = new Receiver(
126 this._sender = new Sender(socket, this._extensions);
127 this._receiver = receiver;
128 this._socket = socket;
130 receiver[kWebSocket] = this;
131 socket[kWebSocket] = this;
133 receiver.on('conclude', receiverOnConclude);
134 receiver.on('drain', receiverOnDrain);
135 receiver.on('error', receiverOnError);
136 receiver.on('message', receiverOnMessage);
137 receiver.on('ping', receiverOnPing);
138 receiver.on('pong', receiverOnPong);
140 socket.setTimeout(0);
143 if (head.length > 0) socket.unshift(head);
145 socket.on('close', socketOnClose);
146 socket.on('data', socketOnData);
147 socket.on('end', socketOnEnd);
148 socket.on('error', socketOnError);
150 this.readyState = WebSocket.OPEN;
155 * Emit the `'close'` event.
160 this.readyState = WebSocket.CLOSED;
163 this.emit('close', this._closeCode, this._closeMessage);
167 if (this._extensions[PerMessageDeflate.extensionName]) {
168 this._extensions[PerMessageDeflate.extensionName].cleanup();
171 this._receiver.removeAllListeners();
172 this.emit('close', this._closeCode, this._closeMessage);
176 * Start a closing handshake.
178 * +----------+ +-----------+ +----------+
179 * - - -|ws.close()|-->|close frame|-->|ws.close()|- - -
180 * | +----------+ +-----------+ +----------+ |
181 * +----------+ +-----------+ |
182 * CLOSING |ws.close()|<--|close frame|<--+-----+ CLOSING
183 * +----------+ +-----------+ |
185 * +------------------------+-->|fin| - - - -
187 * - - - - -|fin|<---------------------+
190 * @param {Number} code Status code explaining why the connection is closing
191 * @param {String} data A string explaining why the connection is closing
195 if (this.readyState === WebSocket.CLOSED) return;
196 if (this.readyState === WebSocket.CONNECTING) {
197 const msg = 'WebSocket was closed before the connection was established';
198 return abortHandshake(this, this._req, msg);
201 if (this.readyState === WebSocket.CLOSING) {
202 if (this._closeFrameSent && this._closeFrameReceived) this._socket.end();
206 this.readyState = WebSocket.CLOSING;
207 this._sender.close(code, data, !this._isServer, (err) => {
209 // This error is handled by the `'error'` listener on the socket. We only
210 // want to know if the close frame has been sent here.
214 this._closeFrameSent = true;
216 if (this._socket.writable) {
217 if (this._closeFrameReceived) this._socket.end();
220 // Ensure that the connection is closed even if the closing handshake
223 this._closeTimer = setTimeout(
224 this._socket.destroy.bind(this._socket),
234 * @param {*} data The data to send
235 * @param {Boolean} mask Indicates whether or not to mask `data`
236 * @param {Function} cb Callback which is executed when the ping is sent
239 ping (data, mask, cb) {
240 if (typeof data === 'function') {
242 data = mask = undefined;
243 } else if (typeof mask === 'function') {
248 if (this.readyState !== WebSocket.OPEN) {
249 const err = new Error(
250 `WebSocket is not open: readyState ${this.readyState} ` +
251 `(${readyStates[this.readyState]})`
254 if (cb) return cb(err);
258 if (typeof data === 'number') data = data.toString();
259 if (mask === undefined) mask = !this._isServer;
260 this._sender.ping(data || constants.EMPTY_BUFFER, mask, cb);
266 * @param {*} data The data to send
267 * @param {Boolean} mask Indicates whether or not to mask `data`
268 * @param {Function} cb Callback which is executed when the pong is sent
271 pong (data, mask, cb) {
272 if (typeof data === 'function') {
274 data = mask = undefined;
275 } else if (typeof mask === 'function') {
280 if (this.readyState !== WebSocket.OPEN) {
281 const err = new Error(
282 `WebSocket is not open: readyState ${this.readyState} ` +
283 `(${readyStates[this.readyState]})`
286 if (cb) return cb(err);
290 if (typeof data === 'number') data = data.toString();
291 if (mask === undefined) mask = !this._isServer;
292 this._sender.pong(data || constants.EMPTY_BUFFER, mask, cb);
296 * Send a data message.
298 * @param {*} data The message to send
299 * @param {Object} options Options object
300 * @param {Boolean} options.compress Specifies whether or not to compress `data`
301 * @param {Boolean} options.binary Specifies whether `data` is binary or text
302 * @param {Boolean} options.fin Specifies whether the fragment is the last one
303 * @param {Boolean} options.mask Specifies whether or not to mask `data`
304 * @param {Function} cb Callback which is executed when data is written out
307 send (data, options, cb) {
308 if (typeof options === 'function') {
313 if (this.readyState !== WebSocket.OPEN) {
314 const err = new Error(
315 `WebSocket is not open: readyState ${this.readyState} ` +
316 `(${readyStates[this.readyState]})`
319 if (cb) return cb(err);
323 if (typeof data === 'number') data = data.toString();
325 const opts = Object.assign({
326 binary: typeof data !== 'string',
327 mask: !this._isServer,
332 if (!this._extensions[PerMessageDeflate.extensionName]) {
333 opts.compress = false;
336 this._sender.send(data || constants.EMPTY_BUFFER, opts, cb);
340 * Forcibly close the connection.
345 if (this.readyState === WebSocket.CLOSED) return;
346 if (this.readyState === WebSocket.CONNECTING) {
347 const msg = 'WebSocket was closed before the connection was established';
348 return abortHandshake(this, this._req, msg);
352 this.readyState = WebSocket.CLOSING;
353 this._socket.destroy();
358 readyStates.forEach((readyState, i) => {
359 WebSocket[readyStates[i]] = i;
363 // Add the `onopen`, `onerror`, `onclose`, and `onmessage` attributes.
364 // See https://html.spec.whatwg.org/multipage/comms.html#the-websocket-interface
366 ['open', 'error', 'close', 'message'].forEach((method) => {
367 Object.defineProperty(WebSocket.prototype, `on${method}`, {
369 * Return the listener of the event.
371 * @return {(Function|undefined)} The event listener or `undefined`
375 const listeners = this.listeners(method);
376 for (var i = 0; i < listeners.length; i++) {
377 if (listeners[i]._listener) return listeners[i]._listener;
381 * Add a listener for the event.
383 * @param {Function} listener The listener to add
387 const listeners = this.listeners(method);
388 for (var i = 0; i < listeners.length; i++) {
390 // Remove only the listeners added via `addEventListener`.
392 if (listeners[i]._listener) this.removeListener(method, listeners[i]);
394 this.addEventListener(method, listener);
399 WebSocket.prototype.addEventListener = EventTarget.addEventListener;
400 WebSocket.prototype.removeEventListener = EventTarget.removeEventListener;
402 module.exports = WebSocket;
405 * Initialize a WebSocket client.
407 * @param {(String|url.Url|url.URL)} address The URL to which to connect
408 * @param {String} protocols The subprotocols
409 * @param {Object} options Connection options
410 * @param {(Boolean|Object)} options.perMessageDeflate Enable/disable permessage-deflate
411 * @param {Number} options.handshakeTimeout Timeout in milliseconds for the handshake request
412 * @param {Number} options.protocolVersion Value of the `Sec-WebSocket-Version` header
413 * @param {String} options.origin Value of the `Origin` or `Sec-WebSocket-Origin` header
416 function initAsClient (address, protocols, options) {
417 options = Object.assign({
418 protocolVersion: protocolVersions[1],
419 perMessageDeflate: true
421 createConnection: undefined,
422 socketPath: undefined,
433 if (protocolVersions.indexOf(options.protocolVersion) === -1) {
434 throw new RangeError(
435 `Unsupported protocol version: ${options.protocolVersion} ` +
436 `(supported versions: ${protocolVersions.join(', ')})`
440 this._isServer = false;
444 if (typeof address === 'object' && address.href !== undefined) {
446 this.url = address.href;
448 parsedUrl = url.parse(address);
452 const isUnixSocket = parsedUrl.protocol === 'ws+unix:';
454 if (!parsedUrl.host && (!isUnixSocket || !parsedUrl.pathname)) {
455 throw new Error(`Invalid URL: ${this.url}`);
458 const isSecure = parsedUrl.protocol === 'wss:' || parsedUrl.protocol === 'https:';
459 const key = crypto.randomBytes(16).toString('base64');
460 const httpObj = isSecure ? https : http;
461 const path = parsedUrl.search
462 ? `${parsedUrl.pathname || '/'}${parsedUrl.search}`
463 : parsedUrl.pathname || '/';
464 var perMessageDeflate;
466 options.createConnection = isSecure ? tlsConnect : netConnect;
467 options.port = parsedUrl.port || (isSecure ? 443 : 80);
468 options.host = parsedUrl.hostname.startsWith('[')
469 ? parsedUrl.hostname.slice(1, -1)
470 : parsedUrl.hostname;
471 options.headers = Object.assign({
472 'Sec-WebSocket-Version': options.protocolVersion,
473 'Sec-WebSocket-Key': key,
474 'Connection': 'Upgrade',
475 'Upgrade': 'websocket'
479 if (options.perMessageDeflate) {
480 perMessageDeflate = new PerMessageDeflate(
481 options.perMessageDeflate !== true ? options.perMessageDeflate : {},
484 options.headers['Sec-WebSocket-Extensions'] = extension.format({
485 [PerMessageDeflate.extensionName]: perMessageDeflate.offer()
489 options.headers['Sec-WebSocket-Protocol'] = protocols;
491 if (options.origin) {
492 if (options.protocolVersion < 13) {
493 options.headers['Sec-WebSocket-Origin'] = options.origin;
495 options.headers.Origin = options.origin;
498 if (parsedUrl.auth) {
499 options.auth = parsedUrl.auth;
500 } else if (parsedUrl.username || parsedUrl.password) {
501 options.auth = `${parsedUrl.username}:${parsedUrl.password}`;
505 const parts = path.split(':');
507 if (options.agent == null && process.versions.modules < 57) {
509 // Setting `socketPath` in conjunction with `createConnection` without an
510 // agent throws an error on Node.js < 8. Work around the issue by using a
511 // different property.
513 options._socketPath = parts[0];
515 options.socketPath = parts[0];
518 options.path = parts[1];
521 var req = this._req = httpObj.get(options);
523 if (options.handshakeTimeout) {
525 options.handshakeTimeout,
526 () => abortHandshake(this, req, 'Opening handshake has timed out')
530 req.on('error', (err) => {
531 if (this._req.aborted) return;
533 req = this._req = null;
534 this.readyState = WebSocket.CLOSING;
535 this.emit('error', err);
539 req.on('response', (res) => {
540 if (this.emit('unexpected-response', req, res)) return;
542 abortHandshake(this, req, `Unexpected server response: ${res.statusCode}`);
545 req.on('upgrade', (res, socket, head) => {
546 this.emit('upgrade', res);
549 // The user may have closed the connection from a listener of the `upgrade`
552 if (this.readyState !== WebSocket.CONNECTING) return;
554 req = this._req = null;
556 const digest = crypto.createHash('sha1')
557 .update(key + constants.GUID, 'binary')
560 if (res.headers['sec-websocket-accept'] !== digest) {
561 abortHandshake(this, socket, 'Invalid Sec-WebSocket-Accept header');
565 const serverProt = res.headers['sec-websocket-protocol'];
566 const protList = (protocols || '').split(/, */);
569 if (!protocols && serverProt) {
570 protError = 'Server sent a subprotocol but none was requested';
571 } else if (protocols && !serverProt) {
572 protError = 'Server sent no subprotocol';
573 } else if (serverProt && protList.indexOf(serverProt) === -1) {
574 protError = 'Server sent an invalid subprotocol';
578 abortHandshake(this, socket, protError);
582 if (serverProt) this.protocol = serverProt;
584 if (perMessageDeflate) {
586 const extensions = extension.parse(
587 res.headers['sec-websocket-extensions']
590 if (extensions[PerMessageDeflate.extensionName]) {
591 perMessageDeflate.accept(
592 extensions[PerMessageDeflate.extensionName]
594 this._extensions[PerMessageDeflate.extensionName] = perMessageDeflate;
597 abortHandshake(this, socket, 'Invalid Sec-WebSocket-Extensions header');
602 this.setSocket(socket, head, 0);
607 * Create a `net.Socket` and initiate a connection.
609 * @param {Object} options Connection options
610 * @return {net.Socket} The newly created socket used to start the connection
613 function netConnect (options) {
614 options.path = options.socketPath || options._socketPath || undefined;
615 return net.connect(options);
619 * Create a `tls.TLSSocket` and initiate a connection.
621 * @param {Object} options Connection options
622 * @return {tls.TLSSocket} The newly created socket used to start the connection
625 function tlsConnect (options) {
626 options.path = options.socketPath || options._socketPath || undefined;
627 options.servername = options.servername || options.host;
628 return tls.connect(options);
632 * Abort the handshake and emit an error.
634 * @param {WebSocket} websocket The WebSocket instance
635 * @param {(http.ClientRequest|net.Socket)} stream The request to abort or the
637 * @param {String} message The error message
640 function abortHandshake (websocket, stream, message) {
641 websocket.readyState = WebSocket.CLOSING;
643 const err = new Error(message);
644 Error.captureStackTrace(err, abortHandshake);
646 if (stream.setHeader) {
648 stream.once('abort', websocket.emitClose.bind(websocket));
649 websocket.emit('error', err);
652 stream.once('error', websocket.emit.bind(websocket, 'error'));
653 stream.once('close', websocket.emitClose.bind(websocket));
658 * The listener of the `Receiver` `'conclude'` event.
660 * @param {Number} code The status code
661 * @param {String} reason The reason for closing
664 function receiverOnConclude (code, reason) {
665 const websocket = this[kWebSocket];
667 websocket._socket.removeListener('data', socketOnData);
668 websocket._socket.resume();
670 websocket._closeFrameReceived = true;
671 websocket._closeMessage = reason;
672 websocket._closeCode = code;
674 if (code === 1005) websocket.close();
675 else websocket.close(code, reason);
679 * The listener of the `Receiver` `'drain'` event.
683 function receiverOnDrain () {
684 this[kWebSocket]._socket.resume();
688 * The listener of the `Receiver` `'error'` event.
690 * @param {(RangeError|Error)} err The emitted error
693 function receiverOnError (err) {
694 const websocket = this[kWebSocket];
696 websocket._socket.removeListener('data', socketOnData);
698 websocket.readyState = WebSocket.CLOSING;
699 websocket._closeCode = err[constants.kStatusCode];
700 websocket.emit('error', err);
701 websocket._socket.destroy();
705 * The listener of the `Receiver` `'finish'` event.
709 function receiverOnFinish () {
710 this[kWebSocket].emitClose();
714 * The listener of the `Receiver` `'message'` event.
716 * @param {(String|Buffer|ArrayBuffer|Buffer[])} data The message
719 function receiverOnMessage (data) {
720 this[kWebSocket].emit('message', data);
724 * The listener of the `Receiver` `'ping'` event.
726 * @param {Buffer} data The data included in the ping frame
729 function receiverOnPing (data) {
730 const websocket = this[kWebSocket];
732 websocket.pong(data, !websocket._isServer, constants.NOOP);
733 websocket.emit('ping', data);
737 * The listener of the `Receiver` `'pong'` event.
739 * @param {Buffer} data The data included in the pong frame
742 function receiverOnPong (data) {
743 this[kWebSocket].emit('pong', data);
747 * The listener of the `net.Socket` `'close'` event.
751 function socketOnClose () {
752 const websocket = this[kWebSocket];
754 this.removeListener('close', socketOnClose);
755 this.removeListener('end', socketOnEnd);
757 websocket.readyState = WebSocket.CLOSING;
760 // The close frame might not have been received or the `'end'` event emitted,
761 // for example, if the socket was destroyed due to an error. Ensure that the
762 // `receiver` stream is closed after writing any remaining buffered data to
763 // it. If the readable side of the socket is in flowing mode then there is no
764 // buffered data as everything has been already written and `readable.read()`
765 // will return `null`. If instead, the socket is paused, any possible buffered
766 // data will be read as a single chunk and emitted synchronously in a single
769 websocket._socket.read();
770 websocket._receiver.end();
772 this.removeListener('data', socketOnData);
773 this[kWebSocket] = undefined;
775 clearTimeout(websocket._closeTimer);
778 websocket._receiver._writableState.finished ||
779 websocket._receiver._writableState.errorEmitted
781 websocket.emitClose();
783 websocket._receiver.on('error', receiverOnFinish);
784 websocket._receiver.on('finish', receiverOnFinish);
789 * The listener of the `net.Socket` `'data'` event.
791 * @param {Buffer} chunk A chunk of data
794 function socketOnData (chunk) {
795 if (!this[kWebSocket]._receiver.write(chunk)) {
801 * The listener of the `net.Socket` `'end'` event.
805 function socketOnEnd () {
806 const websocket = this[kWebSocket];
808 websocket.readyState = WebSocket.CLOSING;
809 websocket._receiver.end();
814 * The listener of the `net.Socket` `'error'` event.
818 function socketOnError () {
819 const websocket = this[kWebSocket];
821 this.removeListener('error', socketOnError);
822 this.on('error', constants.NOOP);
825 websocket.readyState = WebSocket.CLOSING;