3 const { Writable } = require('stream');
5 const PerMessageDeflate = require('./permessage-deflate');
11 } = require('./constants');
12 const { concat, toArrayBuffer, unmask } = require('./buffer-util');
13 const { isValidStatusCode, isValidUTF8 } = require('./validation');
// Parser state identifiers: the parsing loop switches on `this._state` and
// calls `getPayloadLength16()` / `getPayloadLength64()` for these two values.
16 const GET_PAYLOAD_LENGTH_16 = 1;
17 const GET_PAYLOAD_LENGTH_64 = 2;
23 * HyBi Receiver implementation.
25 * @extends stream.Writable
27 class Receiver extends Writable {
29 * Creates a Receiver instance.
31 * @param {String} [binaryType=nodebuffer] The type for binary data
32 * @param {Object} [extensions] An object containing the negotiated extensions
33 * @param {Boolean} [isServer=false] Specifies whether to operate in client or
35 * @param {Number} [maxPayload=0] The maximum allowed message length
37 constructor(binaryType, extensions, isServer, maxPayload) {
// Defaults: first entry of BINARY_TYPES, an empty extensions object, and
// `_isServer === false` when the corresponding argument is omitted.
40 this._binaryType = binaryType || BINARY_TYPES[0];
41 this[kWebSocket] = undefined;
42 this._extensions = extensions || {};
43 this._isServer = !!isServer;
// `| 0` turns a missing or non-numeric value into 0; the size checks in this
// class are guarded by `this._maxPayload > 0`, so 0 means "no limit".
// NOTE(review): `| 0` also wraps values above 2^31 - 1 into negative numbers,
// which those guards would treat as "no limit" too — confirm callers never
// pass such values.
44 this._maxPayload = maxPayload | 0;
// Running count of bytes queued by `_write()` and not yet consumed.
46 this._bufferedBytes = 0;
// Frame parsing state, overwritten as frames are read.
49 this._compressed = false;
50 this._payloadLength = 0;
51 this._mask = undefined;
// Size accumulators; both are set back to 0 where a data message is handled.
57 this._totalPayloadLength = 0;
58 this._messageLength = 0;
// Initial parser state.
61 this._state = GET_INFO;
66 * Implements `Writable.prototype._write()`.
68 * @param {Buffer} chunk The chunk of data to write
69 * @param {String} encoding The character encoding of `chunk`
70 * @param {Function} cb Callback
73 _write(chunk, encoding, cb) {
// After a close frame (opcode 0x08) has been handled and the parser is back
// in GET_INFO, further chunks are acknowledged without being buffered.
// NOTE(review): the loose `==` is the only non-strict comparison in view;
// `===` would be consistent with the rest of the file.
74 if (this._opcode === 0x08 && this._state == GET_INFO) return cb();
// Queue the chunk and track the total number of buffered bytes.
76 this._bufferedBytes += chunk.length;
77 this._buffers.push(chunk);
82 * Consumes `n` bytes from the buffered data.
84 * @param {Number} n The number of bytes to consume
85 * @return {Buffer} The consumed bytes
89 this._bufferedBytes -= n;
91 if (n === this._buffers[0].length) return this._buffers.shift();
93 if (n < this._buffers[0].length) {
94 const buf = this._buffers[0];
95 this._buffers[0] = buf.slice(n);
96 return buf.slice(0, n);
99 const dst = Buffer.allocUnsafe(n);
102 const buf = this._buffers[0];
103 const offset = dst.length - n;
105 if (n >= buf.length) {
106 dst.set(this._buffers.shift(), offset);
108 dst.set(new Uint8Array(buf.buffer, buf.byteOffset, n), offset);
109 this._buffers[0] = buf.slice(n);
119 * Starts the parsing loop.
121 * @param {Function} cb Callback
129 switch (this._state) {
131 err = this.getInfo();
133 case GET_PAYLOAD_LENGTH_16:
134 err = this.getPayloadLength16();
136 case GET_PAYLOAD_LENGTH_64:
137 err = this.getPayloadLength64();
143 err = this.getData(cb);
150 } while (this._loop);
156 * Reads the first two bytes of a frame.
158 * @return {(RangeError|undefined)} A possible error
162 if (this._bufferedBytes < 2) {
167 const buf = this.consume(2);
169 if ((buf[0] & 0x30) !== 0x00) {
171 return error(RangeError, 'RSV2 and RSV3 must be clear', true, 1002);
174 const compressed = (buf[0] & 0x40) === 0x40;
176 if (compressed && !this._extensions[PerMessageDeflate.extensionName]) {
178 return error(RangeError, 'RSV1 must be clear', true, 1002);
181 this._fin = (buf[0] & 0x80) === 0x80;
182 this._opcode = buf[0] & 0x0f;
183 this._payloadLength = buf[1] & 0x7f;
185 if (this._opcode === 0x00) {
188 return error(RangeError, 'RSV1 must be clear', true, 1002);
191 if (!this._fragmented) {
193 return error(RangeError, 'invalid opcode 0', true, 1002);
196 this._opcode = this._fragmented;
197 } else if (this._opcode === 0x01 || this._opcode === 0x02) {
198 if (this._fragmented) {
200 return error(RangeError, `invalid opcode ${this._opcode}`, true, 1002);
203 this._compressed = compressed;
204 } else if (this._opcode > 0x07 && this._opcode < 0x0b) {
207 return error(RangeError, 'FIN must be set', true, 1002);
212 return error(RangeError, 'RSV1 must be clear', true, 1002);
215 if (this._payloadLength > 0x7d) {
219 `invalid payload length ${this._payloadLength}`,
226 return error(RangeError, `invalid opcode ${this._opcode}`, true, 1002);
229 if (!this._fin && !this._fragmented) this._fragmented = this._opcode;
230 this._masked = (buf[1] & 0x80) === 0x80;
232 if (this._isServer) {
235 return error(RangeError, 'MASK must be set', true, 1002);
237 } else if (this._masked) {
239 return error(RangeError, 'MASK must be clear', true, 1002);
242 if (this._payloadLength === 126) this._state = GET_PAYLOAD_LENGTH_16;
243 else if (this._payloadLength === 127) this._state = GET_PAYLOAD_LENGTH_64;
244 else return this.haveLength();
248 * Gets extended payload length (7+16).
250 * @return {(RangeError|undefined)} A possible error
253 getPayloadLength16() {
// Both bytes of the 16-bit extended length must be buffered before reading.
254 if (this._bufferedBytes < 2) {
// Read the length as a big-endian unsigned 16-bit integer; it replaces the
// 7-bit value (126) taken from the second header byte.
259 this._payloadLength = this.consume(2).readUInt16BE(0);
260 return this.haveLength();
264 * Gets extended payload length (7+64).
266 * @return {(RangeError|undefined)} A possible error
269 getPayloadLength64() {
// All eight bytes of the 64-bit extended length must be buffered first.
270 if (this._bufferedBytes < 8) {
275 const buf = this.consume(8);
// `num` holds the high 32 bits of the big-endian 64-bit length.
276 const num = buf.readUInt32BE(0);
279 // The maximum safe integer in JavaScript is 2^53 - 1. An error is returned
280 // if payload length is greater than this number.
// The high word may use at most 53 - 32 = 21 bits for the combined value to
// stay within that limit.
282 if (num > Math.pow(2, 53 - 32) - 1) {
286 'Unsupported WebSocket frame: payload length > 2^53 - 1',
// Combine the high and low words arithmetically; bitwise shifts would
// truncate the result to 32 bits.
292 this._payloadLength = num * Math.pow(2, 32) + buf.readUInt32BE(4);
293 return this.haveLength();
297 * Payload length has been read.
299 * @return {(RangeError|undefined)} A possible error
303 if (this._payloadLength && this._opcode < 0x08) {
304 this._totalPayloadLength += this._payloadLength;
305 if (this._totalPayloadLength > this._maxPayload && this._maxPayload > 0) {
307 return error(RangeError, 'Max payload size exceeded', false, 1009);
311 if (this._masked) this._state = GET_MASK;
312 else this._state = GET_DATA;
321 if (this._bufferedBytes < 4) {
326 this._mask = this.consume(4);
327 this._state = GET_DATA;
333 * @param {Function} cb Callback
334 * @return {(Error|RangeError|undefined)} A possible error
338 let data = EMPTY_BUFFER;
340 if (this._payloadLength) {
341 if (this._bufferedBytes < this._payloadLength) {
346 data = this.consume(this._payloadLength);
347 if (this._masked) unmask(data, this._mask);
350 if (this._opcode > 0x07) return this.controlMessage(data);
352 if (this._compressed) {
353 this._state = INFLATING;
354 this.decompress(data, cb);
360 // This message is not compressed so its length is the sum of the payload
361 // length of all fragments.
363 this._messageLength = this._totalPayloadLength;
364 this._fragments.push(data);
367 return this.dataMessage();
373 * @param {Buffer} data Compressed data
374 * @param {Function} cb Callback
377 decompress(data, cb) {
// Look up the extension object registered under the permessage-deflate name.
378 const perMessageDeflate = this._extensions[PerMessageDeflate.extensionName];
// Inflate asynchronously; `this._fin` is forwarded to the extension
// (presumably so it knows whether this is the final fragment — verify against
// `PerMessageDeflate#decompress`).
380 perMessageDeflate.decompress(data, this._fin, (err, buf) => {
// Decompression failures are reported through the write callback.
381 if (err) return cb(err);
// For compressed messages the length is counted in inflated bytes; the limit
// is only enforced when `_maxPayload` is greater than 0.
384 this._messageLength += buf.length;
385 if (this._messageLength > this._maxPayload && this._maxPayload > 0) {
387 error(RangeError, 'Max payload size exceeded', false, 1009)
391 this._fragments.push(buf);
// An error returned by `dataMessage()` is also passed to the write callback.
394 const er = this.dataMessage();
395 if (er) return cb(er);
402 * Handles a data message.
404 * @return {(Error|undefined)} A possible error
409 const messageLength = this._messageLength;
410 const fragments = this._fragments;
412 this._totalPayloadLength = 0;
413 this._messageLength = 0;
414 this._fragmented = 0;
415 this._fragments = [];
417 if (this._opcode === 2) {
420 if (this._binaryType === 'nodebuffer') {
421 data = concat(fragments, messageLength);
422 } else if (this._binaryType === 'arraybuffer') {
423 data = toArrayBuffer(concat(fragments, messageLength));
428 this.emit('message', data);
430 const buf = concat(fragments, messageLength);
432 if (!isValidUTF8(buf)) {
434 return error(Error, 'invalid UTF-8 sequence', true, 1007);
437 this.emit('message', buf.toString());
441 this._state = GET_INFO;
445 * Handles a control message.
447 * @param {Buffer} data Data to handle
448 * @return {(Error|RangeError|undefined)} A possible error
451 controlMessage(data) {
// Opcode 0x08: close frame.
452 if (this._opcode === 0x08) {
// Empty payload: emit 'conclude' with status 1005 and an empty reason.
455 if (data.length === 0) {
456 this.emit('conclude', 1005, '');
// A single byte cannot hold the two-byte status code that is read below.
458 } else if (data.length === 1) {
459 return error(RangeError, 'invalid payload length 1', true, 1002);
// The first two bytes are the status code as a big-endian unsigned integer.
461 const code = data.readUInt16BE(0);
463 if (!isValidStatusCode(code)) {
464 return error(RangeError, `invalid status code ${code}`, true, 1002);
// The remaining bytes are the close reason and must be valid UTF-8.
467 const buf = data.slice(2);
469 if (!isValidUTF8(buf)) {
470 return error(Error, 'invalid UTF-8 sequence', true, 1007);
473 this.emit('conclude', code, buf.toString());
// Opcode 0x09: ping. Any other control opcode reaching this method is
// reported as a pong.
476 } else if (this._opcode === 0x09) {
477 this.emit('ping', data);
479 this.emit('pong', data);
// Control frame handled; the parser goes back to reading a frame header.
482 this._state = GET_INFO;
486 module.exports = Receiver;
489 * Builds an error object.
491 * @param {(Error|RangeError)} ErrorCtor The error constructor
492 * @param {String} message The error message
493 * @param {Boolean} prefix Specifies whether or not to add a default prefix to
495 * @param {Number} statusCode The status code
496 * @return {(Error|RangeError)} The error
499 function error(ErrorCtor, message, prefix, statusCode) {
// When `prefix` is truthy the message is prepended with
// "Invalid WebSocket frame: "; otherwise it is used verbatim.
500 const err = new ErrorCtor(
501 prefix ? `Invalid WebSocket frame: ${message}` : message
// Passing `error` as the second argument omits this helper's own frame from
// the captured stack trace.
504 Error.captureStackTrace(err, error);
// Attach the status code to the error under the `kStatusCode` key.
505 err[kStatusCode] = statusCode;