3 const { randomFillSync } = require('crypto');
5 const PerMessageDeflate = require('./permessage-deflate');
6 const { EMPTY_BUFFER } = require('./constants');
7 const { isValidStatusCode } = require('./validation');
8 const { mask: applyMask, toBuffer } = require('./buffer-util');
// Scratch buffer holding the 4-byte masking key. `Sender.frame()` refills it
// with random bytes and copies it into the frame header before every masked
// frame, so its contents are only meaningful within a single `frame()` call.
const mask = Buffer.alloc(4);
/**
 * HyBi Sender implementation.
 *
 * Builds WebSocket frames and writes them to the connection socket. While a
 * message is being compressed, every other send operation is parked in a queue
 * and replayed once compression has finished, so frames reach the socket in
 * the order in which they were requested.
 */
class Sender {
  /**
   * Creates a Sender instance.
   *
   * @param {net.Socket} socket The connection socket
   * @param {Object} [extensions] An object containing the negotiated extensions
   */
  constructor(socket, extensions) {
    this._extensions = extensions || {};
    this._socket = socket;

    // Fragmentation state: whether the next data frame starts a new message
    // and whether the message currently being sent is compressed.
    this._firstFragment = true;
    this._compress = false;

    // Queue state: number of payload bytes held back, whether a compression
    // is in flight, and the send operations waiting for it to finish.
    this._bufferedBytes = 0;
    this._deflating = false;
    this._queue = [];
  }

  /**
   * Frames a piece of data according to the HyBi WebSocket protocol.
   *
   * @param {Buffer} data The data to frame
   * @param {Object} options Options object
   * @param {Number} options.opcode The opcode
   * @param {Boolean} [options.readOnly=false] Specifies whether `data` can be
   *     modified
   * @param {Boolean} [options.fin=false] Specifies whether or not to set the
   *     FIN bit
   * @param {Boolean} [options.mask=false] Specifies whether or not to mask
   *     `data`
   * @param {Boolean} [options.rsv1=false] Specifies whether or not to set the
   *     RSV1 bit
   * @return {Buffer[]} The framed data as a list of `Buffer` instances: either
   *     `[header, payload]` or, when read-only data had to be masked, a single
   *     buffer holding both
   * @public
   */
  static frame(data, options) {
    // Read-only data cannot be masked in place, so in that case the header
    // and the masked payload are written to one freshly allocated buffer.
    const merge = options.mask && options.readOnly;
    let offset = options.mask ? 6 : 2;
    let payloadLength = data.length;

    if (data.length >= 65536) {
      // 64-bit extended payload length.
      offset += 8;
      payloadLength = 127;
    } else if (data.length > 125) {
      // 16-bit extended payload length.
      offset += 2;
      payloadLength = 126;
    }

    const target = Buffer.allocUnsafe(merge ? data.length + offset : offset);

    target[0] = options.fin ? options.opcode | 0x80 : options.opcode;
    if (options.rsv1) target[0] |= 0x40;

    target[1] = payloadLength;

    if (payloadLength === 126) {
      target.writeUInt16BE(data.length, 2);
    } else if (payloadLength === 127) {
      // Zero the two most significant bytes and store the length in the
      // remaining 48 bits, so payloads of 4 GiB and more are encoded too.
      target[2] = target[3] = 0;
      target.writeUIntBE(data.length, 4, 6);
    }

    if (!options.mask) return [target, data];

    randomFillSync(mask, 0, 4);

    // Set the MASK bit and store the masking key right before the payload.
    target[1] |= 0x80;
    target[offset - 4] = mask[0];
    target[offset - 3] = mask[1];
    target[offset - 2] = mask[2];
    target[offset - 1] = mask[3];

    if (merge) {
      applyMask(data, mask, target, offset, data.length);
      return [target];
    }

    applyMask(data, mask, data, 0, data.length);
    return [target, data];
  }

  /**
   * Sends a close message to the other peer.
   *
   * @param {Number} [code] The status code component of the body
   * @param {String} [data] The message component of the body
   * @param {Boolean} [mask=false] Specifies whether or not to mask the message
   * @param {Function} [cb] Callback
   * @throws {TypeError} If `code` is given but is not a valid status code
   * @throws {RangeError} If `data` is longer than 123 bytes
   * @public
   */
  close(code, data, mask, cb) {
    let buf;

    if (code === undefined) {
      buf = EMPTY_BUFFER;
    } else if (typeof code !== 'number' || !isValidStatusCode(code)) {
      throw new TypeError('First argument must be a valid error code number');
    } else if (data === undefined || data === '') {
      buf = Buffer.allocUnsafe(2);
      buf.writeUInt16BE(code, 0);
    } else {
      const length = Buffer.byteLength(data);

      // Two bytes of the body are taken by the status code.
      if (length > 123) {
        throw new RangeError('The message must not be greater than 123 bytes');
      }

      buf = Buffer.allocUnsafe(2 + length);
      buf.writeUInt16BE(code, 0);
      buf.write(data, 2);
    }

    if (this._deflating) {
      this.enqueue([this.doClose, buf, mask, cb]);
    } else {
      this.doClose(buf, mask, cb);
    }
  }

  /**
   * Frames and sends a close message.
   *
   * @param {Buffer} data The message to send
   * @param {Boolean} [mask=false] Specifies whether or not to mask `data`
   * @param {Function} [cb] Callback
   * @private
   */
  doClose(data, mask, cb) {
    this.sendFrame(
      Sender.frame(data, {
        fin: true,
        rsv1: false,
        opcode: 0x08,
        mask,
        readOnly: false
      }),
      cb
    );
  }

  /**
   * Sends a ping message to the other peer.
   *
   * @param {*} data The message to send
   * @param {Boolean} [mask=false] Specifies whether or not to mask `data`
   * @param {Function} [cb] Callback
   * @throws {RangeError} If the data is longer than 125 bytes
   * @public
   */
  ping(data, mask, cb) {
    const buf = toBuffer(data);

    if (buf.length > 125) {
      throw new RangeError('The data size must not be greater than 125 bytes');
    }

    // `toBuffer.readOnly` is read right after the conversion; presumably it
    // reports whether the returned buffer may be modified in place.
    if (this._deflating) {
      this.enqueue([this.doPing, buf, mask, toBuffer.readOnly, cb]);
    } else {
      this.doPing(buf, mask, toBuffer.readOnly, cb);
    }
  }

  /**
   * Frames and sends a ping message.
   *
   * @param {Buffer} data The message to send
   * @param {Boolean} [mask=false] Specifies whether or not to mask `data`
   * @param {Boolean} [readOnly=false] Specifies whether `data` can be modified
   * @param {Function} [cb] Callback
   * @private
   */
  doPing(data, mask, readOnly, cb) {
    this.sendFrame(
      Sender.frame(data, {
        fin: true,
        rsv1: false,
        opcode: 0x09,
        mask,
        readOnly
      }),
      cb
    );
  }

  /**
   * Sends a pong message to the other peer.
   *
   * @param {*} data The message to send
   * @param {Boolean} [mask=false] Specifies whether or not to mask `data`
   * @param {Function} [cb] Callback
   * @throws {RangeError} If the data is longer than 125 bytes
   * @public
   */
  pong(data, mask, cb) {
    const buf = toBuffer(data);

    if (buf.length > 125) {
      throw new RangeError('The data size must not be greater than 125 bytes');
    }

    if (this._deflating) {
      this.enqueue([this.doPong, buf, mask, toBuffer.readOnly, cb]);
    } else {
      this.doPong(buf, mask, toBuffer.readOnly, cb);
    }
  }

  /**
   * Frames and sends a pong message.
   *
   * @param {Buffer} data The message to send
   * @param {Boolean} [mask=false] Specifies whether or not to mask `data`
   * @param {Boolean} [readOnly=false] Specifies whether `data` can be modified
   * @param {Function} [cb] Callback
   * @private
   */
  doPong(data, mask, readOnly, cb) {
    this.sendFrame(
      Sender.frame(data, {
        fin: true,
        rsv1: false,
        opcode: 0x0a,
        mask,
        readOnly
      }),
      cb
    );
  }

  /**
   * Sends a data message to the other peer.
   *
   * @param {*} data The message to send
   * @param {Object} options Options object
   * @param {Boolean} [options.compress=false] Specifies whether or not to
   *     compress `data`
   * @param {Boolean} [options.binary=false] Specifies whether `data` is binary
   *     or text
   * @param {Boolean} [options.fin=false] Specifies whether the fragment is the
   *     last one
   * @param {Boolean} [options.mask=false] Specifies whether or not to mask
   *     `data`
   * @param {Function} [cb] Callback
   * @public
   */
  send(data, options, cb) {
    const buf = toBuffer(data);
    const perMessageDeflate = this._extensions[PerMessageDeflate.extensionName];
    let opcode = options.binary ? 2 : 1;
    let rsv1 = options.compress;

    if (this._firstFragment) {
      this._firstFragment = false;
      if (rsv1 && perMessageDeflate) {
        // Fragments shorter than the extension's threshold are not compressed.
        rsv1 = buf.length >= perMessageDeflate._threshold;
      }
      // The decision made for the first fragment applies to the whole message.
      this._compress = rsv1;
    } else {
      // Continuation frame: RSV1 is only set on the first fragment.
      rsv1 = false;
      opcode = 0;
    }

    if (options.fin) this._firstFragment = true;

    if (perMessageDeflate) {
      const opts = {
        fin: options.fin,
        rsv1,
        opcode,
        mask: options.mask,
        readOnly: toBuffer.readOnly
      };

      if (this._deflating) {
        this.enqueue([this.dispatch, buf, this._compress, opts, cb]);
      } else {
        this.dispatch(buf, this._compress, opts, cb);
      }
    } else {
      this.sendFrame(
        Sender.frame(buf, {
          fin: options.fin,
          rsv1: false,
          opcode,
          mask: options.mask,
          readOnly: toBuffer.readOnly
        }),
        cb
      );
    }
  }

  /**
   * Dispatches a data message.
   *
   * @param {Buffer} data The message to send
   * @param {Boolean} [compress=false] Specifies whether or not to compress
   *     `data`
   * @param {Object} options Options object
   * @param {Number} options.opcode The opcode
   * @param {Boolean} [options.readOnly=false] Specifies whether `data` can be
   *     modified
   * @param {Boolean} [options.fin=false] Specifies whether or not to set the
   *     FIN bit
   * @param {Boolean} [options.mask=false] Specifies whether or not to mask
   *     `data`
   * @param {Boolean} [options.rsv1=false] Specifies whether or not to set the
   *     RSV1 bit
   * @param {Function} [cb] Callback
   * @private
   */
  dispatch(data, compress, options, cb) {
    if (!compress) {
      this.sendFrame(Sender.frame(data, options), cb);
      return;
    }

    const perMessageDeflate = this._extensions[PerMessageDeflate.extensionName];

    this._bufferedBytes += data.length;
    this._deflating = true;
    perMessageDeflate.compress(data, options.fin, (_, buf) => {
      if (this._socket.destroyed) {
        const err = new Error(
          'The socket was closed while data was being compressed'
        );

        if (typeof cb === 'function') cb(err);

        // Queued entries differ in length (a close entry has one argument
        // fewer than a ping, pong or data entry), but the callback is always
        // the last element, so look it up there instead of at a fixed index.
        for (const params of this._queue) {
          const callback = params[params.length - 1];

          if (typeof callback === 'function') callback(err);
        }

        // Nothing is written and the queue is not replayed.
        return;
      }

      this._bufferedBytes -= data.length;
      this._deflating = false;
      // The frame is built from `buf`, the compressor's output, not from the
      // caller's `data`, so masking it in place is allowed.
      options.readOnly = false;
      this.sendFrame(Sender.frame(buf, options), cb);
      this.dequeue();
    });
  }

  /**
   * Executes queued send operations.
   *
   * Stops as soon as one of the replayed operations starts a new compression.
   *
   * @private
   */
  dequeue() {
    while (!this._deflating && this._queue.length) {
      const params = this._queue.shift();

      this._bufferedBytes -= params[1].length;
      Reflect.apply(params[0], this, params.slice(1));
    }
  }

  /**
   * Enqueues a send operation.
   *
   * @param {Array} params Send operation parameters: the method to call,
   *     followed by its arguments (the payload buffer first)
   * @private
   */
  enqueue(params) {
    this._bufferedBytes += params[1].length;
    this._queue.push(params);
  }

  /**
   * Sends a frame.
   *
   * @param {Buffer[]} list The frame to send
   * @param {Function} [cb] Callback
   * @private
   */
  sendFrame(list, cb) {
    if (list.length === 2) {
      // Cork the socket so that header and payload are flushed together.
      this._socket.cork();
      this._socket.write(list[0]);
      this._socket.write(list[1], cb);
      this._socket.uncork();
    } else {
      this._socket.write(list[0], cb);
    }
  }
}
405 module.exports = Sender;