3 const stream = require('stream');
5 const PerMessageDeflate = require('./permessage-deflate');
6 const bufferUtil = require('./buffer-util');
7 const validation = require('./validation');
8 const constants = require('./constants');
11 const GET_PAYLOAD_LENGTH_16 = 1;
12 const GET_PAYLOAD_LENGTH_64 = 2;
18 * HyBi Receiver implementation.
20 * @extends stream.Writable
22 class Receiver extends stream.Writable {
24 * Creates a Receiver instance.
26 * @param {String} binaryType The type for binary data
27 * @param {Object} extensions An object containing the negotiated extensions
28 * @param {Number} maxPayload The maximum allowed message length
30 constructor(binaryType, extensions, maxPayload) {
// Initializes the parser state. NOTE(review): this listing skips some
// original lines (see gaps in the embedded numbering), so further field
// initializations may exist that are not visible here.
//
// A falsy binaryType falls back to the first entry of constants.BINARY_TYPES.
33 this._binaryType = binaryType || constants.BINARY_TYPES[0];
34 this[constants.kWebSocket] = undefined;
// A missing extensions argument is replaced by an empty object so that
// lookups by extension name simply yield undefined.
35 this._extensions = extensions || {};
// The bitwise OR coerces the limit to an integer; undefined becomes 0, and the
// size checks elsewhere in this class only apply when the limit is > 0.
// NOTE(review): | 0 truncates to a signed 32-bit integer, so a limit of 2^31
// or more would wrap - confirm callers never pass such values.
36 this._maxPayload = maxPayload | 0;
// Number of bytes currently queued by _write() and not yet consumed.
38 this._bufferedBytes = 0;
// State of the frame being parsed.
41 this._compressed = false;
42 this._payloadLength = 0;
43 this._mask = undefined;
// Running totals for the current message: the summed payload length of its
// data frames, and the message length used when assembling fragments.
49 this._totalPayloadLength = 0;
50 this._messageLength = 0;
// Parsing starts in the GET_INFO state.
53 this._state = GET_INFO;
58 * Implements `Writable.prototype._write()`.
60 * @param {Buffer} chunk The chunk of data to write
61 * @param {String} encoding The character encoding of `chunk`
62 * @param {Function} cb Callback
64 _write(chunk, encoding, cb) {
// Queues an incoming chunk for parsing. NOTE(review): this listing skips some
// original lines (see gaps in the embedded numbering), so whatever follows the
// push below is not visible here.
//
// If the last opcode seen is 0x08 (handled as a close frame by
// controlMessage) and the parser is in the GET_INFO state, the chunk is
// dropped and the callback is invoked right away.
// NOTE(review): loose == comparison; === would match the rest of the file.
// GET_INFO is defined outside the visible lines - confirm it is a primitive.
65 if (this._opcode === 0x08 && this._state == GET_INFO) return cb();
// Keep the byte counter in sync with the list of queued chunks.
67 this._bufferedBytes += chunk.length;
68 this._buffers.push(chunk);
73 * Consumes `n` bytes from the buffered data.
75 * @param {Number} n The number of bytes to consume
76 * @return {Buffer} The consumed bytes
80 this._bufferedBytes -= n;
82 if (n === this._buffers[0].length) return this._buffers.shift();
84 if (n < this._buffers[0].length) {
85 const buf = this._buffers[0];
86 this._buffers[0] = buf.slice(n);
87 return buf.slice(0, n);
90 const dst = Buffer.allocUnsafe(n);
93 const buf = this._buffers[0];
95 if (n >= buf.length) {
96 this._buffers.shift().copy(dst, dst.length - n);
98 buf.copy(dst, dst.length - n, 0, n);
99 this._buffers[0] = buf.slice(n);
109 * Starts the parsing loop.
111 * @param {Function} cb Callback
119 switch (this._state) {
121 err = this.getInfo();
123 case GET_PAYLOAD_LENGTH_16:
124 err = this.getPayloadLength16();
126 case GET_PAYLOAD_LENGTH_64:
127 err = this.getPayloadLength64();
133 err = this.getData(cb);
140 } while (this._loop);
146 * Reads the first two bytes of a frame.
148 * @return {(RangeError|undefined)} A possible error
152 if (this._bufferedBytes < 2) {
157 const buf = this.consume(2);
159 if ((buf[0] & 0x30) !== 0x00) {
161 return error(RangeError, 'RSV2 and RSV3 must be clear', true, 1002);
164 const compressed = (buf[0] & 0x40) === 0x40;
166 if (compressed && !this._extensions[PerMessageDeflate.extensionName]) {
168 return error(RangeError, 'RSV1 must be clear', true, 1002);
171 this._fin = (buf[0] & 0x80) === 0x80;
172 this._opcode = buf[0] & 0x0f;
173 this._payloadLength = buf[1] & 0x7f;
175 if (this._opcode === 0x00) {
178 return error(RangeError, 'RSV1 must be clear', true, 1002);
181 if (!this._fragmented) {
183 return error(RangeError, 'invalid opcode 0', true, 1002);
186 this._opcode = this._fragmented;
187 } else if (this._opcode === 0x01 || this._opcode === 0x02) {
188 if (this._fragmented) {
190 return error(RangeError, `invalid opcode ${this._opcode}`, true, 1002);
193 this._compressed = compressed;
194 } else if (this._opcode > 0x07 && this._opcode < 0x0b) {
197 return error(RangeError, 'FIN must be set', true, 1002);
202 return error(RangeError, 'RSV1 must be clear', true, 1002);
205 if (this._payloadLength > 0x7d) {
209 `invalid payload length ${this._payloadLength}`,
216 return error(RangeError, `invalid opcode ${this._opcode}`, true, 1002);
219 if (!this._fin && !this._fragmented) this._fragmented = this._opcode;
220 this._masked = (buf[1] & 0x80) === 0x80;
222 if (this._payloadLength === 126) this._state = GET_PAYLOAD_LENGTH_16;
223 else if (this._payloadLength === 127) this._state = GET_PAYLOAD_LENGTH_64;
224 else return this.haveLength();
228 * Gets extended payload length (7+16).
230 * @return {(RangeError|undefined)} A possible error
233 getPayloadLength16() {
// Reads the 16-bit extended payload length. Two buffered bytes are required;
// the body of the guard below is among the lines omitted from this listing.
234 if (this._bufferedBytes < 2) {
// Consume the two bytes and decode them as an unsigned big-endian integer,
// then continue with haveLength() and return its result.
239 this._payloadLength = this.consume(2).readUInt16BE(0);
240 return this.haveLength();
244 * Gets extended payload length (7+64).
246 * @return {(RangeError|undefined)} A possible error
249 getPayloadLength64() {
// Reads the 64-bit extended payload length. Eight buffered bytes are required;
// the body of the guard below is among the lines omitted from this listing.
250 if (this._bufferedBytes < 8) {
255 const buf = this.consume(8);
// num holds the upper 32 bits of the length (big-endian, unsigned).
256 const num = buf.readUInt32BE(0);
259 // The maximum safe integer in JavaScript is 2^53 - 1. An error is returned
260 // if payload length is greater than this number.
// The upper word may therefore use at most 53 - 32 = 21 bits.
262 if (num > Math.pow(2, 53 - 32) - 1) {
266 'Unsupported WebSocket frame: payload length > 2^53 - 1',
// Combine both halves arithmetically: upper word times 2^32 plus the lower
// 32 bits read from offset 4. Multiplication is used instead of a shift
// because JavaScript bitwise operators work on 32-bit integers.
272 this._payloadLength = num * Math.pow(2, 32) + buf.readUInt32BE(4);
273 return this.haveLength();
277 * Payload length has been read.
279 * @return {(RangeError|undefined)} A possible error
283 if (this._payloadLength && this._opcode < 0x08) {
284 this._totalPayloadLength += this._payloadLength;
285 if (this._totalPayloadLength > this._maxPayload && this._maxPayload > 0) {
287 return error(RangeError, 'Max payload size exceeded', false, 1009);
291 if (this._masked) this._state = GET_MASK;
292 else this._state = GET_DATA;
301 if (this._bufferedBytes < 4) {
306 this._mask = this.consume(4);
307 this._state = GET_DATA;
313 * @param {Function} cb Callback
314 * @return {(Error|RangeError|undefined)} A possible error
318 var data = constants.EMPTY_BUFFER;
320 if (this._payloadLength) {
321 if (this._bufferedBytes < this._payloadLength) {
326 data = this.consume(this._payloadLength);
327 if (this._masked) bufferUtil.unmask(data, this._mask);
330 if (this._opcode > 0x07) return this.controlMessage(data);
332 if (this._compressed) {
333 this._state = INFLATING;
334 this.decompress(data, cb);
340 // This message is not compressed so its length is the sum of the payload
341 // length of all fragments.
343 this._messageLength = this._totalPayloadLength;
344 this._fragments.push(data);
347 return this.dataMessage();
353 * @param {Buffer} data Compressed data
354 * @param {Function} cb Callback
357 decompress(data, cb) {
// Inflates a compressed payload and appends the result to the fragments of
// the current message. NOTE(review): some original lines are omitted from
// this listing, so the control flow between the statements below is partial.
//
// The extension instance is looked up by name in the negotiated extensions.
358 const perMessageDeflate = this._extensions[PerMessageDeflate.extensionName];
// this._fin is passed as the second argument - presumably it marks the final
// fragment; confirm against the permessage-deflate module.
360 perMessageDeflate.decompress(data, this._fin, (err, buf) => {
// A decompression failure is handed straight to the callback.
361 if (err) return cb(err);
// Account for the inflated size and enforce the limit, which only applies
// when _maxPayload is greater than 0. The error is built with status 1009.
364 this._messageLength += buf.length;
365 if (this._messageLength > this._maxPayload && this._maxPayload > 0) {
367 error(RangeError, 'Max payload size exceeded', false, 1009)
371 this._fragments.push(buf);
// An error returned by dataMessage() is forwarded to the callback.
374 const er = this.dataMessage();
375 if (er) return cb(er);
382 * Handles a data message.
384 * @return {(Error|undefined)} A possible error
389 const messageLength = this._messageLength;
390 const fragments = this._fragments;
392 this._totalPayloadLength = 0;
393 this._messageLength = 0;
394 this._fragmented = 0;
395 this._fragments = [];
397 if (this._opcode === 2) {
400 if (this._binaryType === 'nodebuffer') {
401 data = toBuffer(fragments, messageLength);
402 } else if (this._binaryType === 'arraybuffer') {
403 data = toArrayBuffer(toBuffer(fragments, messageLength));
408 this.emit('message', data);
410 const buf = toBuffer(fragments, messageLength);
412 if (!validation.isValidUTF8(buf)) {
414 return error(Error, 'invalid UTF-8 sequence', true, 1007);
417 this.emit('message', buf.toString());
421 this._state = GET_INFO;
425 * Handles a control message.
427 * @param {Buffer} data Data to handle
428 * @return {(Error|RangeError|undefined)} A possible error
431 controlMessage(data) {
// Dispatches a control frame by opcode. NOTE(review): some original lines
// (closing braces, branch headers) are omitted from this listing.
//
// Opcode 0x08: emits a conclude event.
432 if (this._opcode === 0x08) {
// Empty payload: conclude with code 1005 and an empty string.
435 if (data.length === 0) {
436 this.emit('conclude', 1005, '');
// A single byte cannot hold the 16-bit status code read below.
438 } else if (data.length === 1) {
439 return error(RangeError, 'invalid payload length 1', true, 1002);
// The first two bytes are decoded as an unsigned big-endian status code.
441 const code = data.readUInt16BE(0);
443 if (!validation.isValidStatusCode(code)) {
444 return error(RangeError, `invalid status code ${code}`, true, 1002);
// The bytes after the code must be valid UTF-8; they are emitted as a string
// (presumably the close reason).
447 const buf = data.slice(2);
449 if (!validation.isValidUTF8(buf)) {
450 return error(Error, 'invalid UTF-8 sequence', true, 1007);
453 this.emit('conclude', code, buf.toString());
// Opcode 0x09 emits ping with the payload. The pong emit below presumably
// sits in the remaining else branch, whose header is omitted here.
456 } else if (this._opcode === 0x09) {
457 this.emit('ping', data);
459 this.emit('pong', data);
// Return the parser to the GET_INFO state.
462 this._state = GET_INFO;
466 module.exports = Receiver;
469 * Builds an error object.
471 * @param {(Error|RangeError)} ErrorCtor The error constructor
472 * @param {String} message The error message
473 * @param {Boolean} prefix Specifies whether or not to add a default prefix to
475 * @param {Number} statusCode The status code
476 * @return {(Error|RangeError)} The error
479 function error(ErrorCtor, message, prefix, statusCode) {
// Creates an instance of ErrorCtor. When prefix is truthy the message is
// prepended with a fixed "Invalid WebSocket frame" label, otherwise it is
// used unchanged. NOTE(review): the closing lines of this function are
// omitted from this listing.
480 const err = new ErrorCtor(
481 prefix ? `Invalid WebSocket frame: ${message}` : message
// Passing this function as the second argument keeps its own frame out of
// the captured stack trace.
484 Error.captureStackTrace(err, error);
// The status code is attached under the key constants.kStatusCode.
485 err[constants.kStatusCode] = statusCode;
490 * Makes a buffer from a list of fragments.
492 * @param {Buffer[]} fragments The list of fragments composing the message
493 * @param {Number} messageLength The length of the message
497 function toBuffer(fragments, messageLength) {
// A single fragment is returned as-is, without copying.
498 if (fragments.length === 1) return fragments[0];
// Several fragments are joined through bufferUtil.concat, which receives
// messageLength - presumably the total byte length; confirm in buffer-util.
499 if (fragments.length > 1) return bufferUtil.concat(fragments, messageLength);
// No fragments: return the constants.EMPTY_BUFFER value.
500 return constants.EMPTY_BUFFER;
504 * Converts a buffer to an `ArrayBuffer`.
506 * @param {Buffer} buf The buffer to convert
507 * @return {ArrayBuffer} Converted buffer
509 function toArrayBuffer(buf) {
510 if (buf.byteLength === buf.buffer.byteLength) {
514 return buf.buffer.slice(buf.byteOffset, buf.byteOffset + buf.byteLength);