'use strict';

const zlib = require('zlib');

const bufferUtil = require('./buffer-util');
const Limiter = require('./limiter');
const { kStatusCode, NOOP } = require('./constants');

// Trailer bytes written to the inflate stream after the final fragment of a
// message (the compressor strips the same number of bytes on the final
// fragment).
const TRAILER = Buffer.from([0x00, 0x00, 0xff, 0xff]);

// Symbols used as private property keys on the zlib stream instances.
const kPerMessageDeflate = Symbol('permessage-deflate');
const kTotalLength = Symbol('total-length');
const kCallback = Symbol('callback');
const kBuffers = Symbol('buffers');
const kError = Symbol('error');

//
// We limit zlib concurrency, which prevents severe memory fragmentation
// as documented in https://github.com/nodejs/node/issues/8871#issuecomment-250915913
// and https://github.com/websockets/ws/issues/1202
//
// Intentionally global; it's the global thread pool that's an issue.
//
let zlibLimiter;
/**
 * permessage-deflate implementation.
 */
class PerMessageDeflate {
  /**
   * Creates a PerMessageDeflate instance.
   *
   * @param {Object} [options] Configuration options
   * @param {Boolean} [options.serverNoContextTakeover=false] Request/accept
   *     disabling of server context takeover
   * @param {Boolean} [options.clientNoContextTakeover=false] Advertise/
   *     acknowledge disabling of client context takeover
   * @param {(Boolean|Number)} [options.serverMaxWindowBits] Request/confirm the
   *     use of a custom server window size
   * @param {(Boolean|Number)} [options.clientMaxWindowBits] Advertise support
   *     for, or request, a custom client window size
   * @param {Object} [options.zlibDeflateOptions] Options to pass to zlib on
   *     deflate
   * @param {Object} [options.zlibInflateOptions] Options to pass to zlib on
   *     inflate
   * @param {Number} [options.threshold=1024] Size (in bytes) below which
   *     messages should not be compressed
   * @param {Number} [options.concurrencyLimit=10] The number of concurrent
   *     calls to zlib
   * @param {Boolean} [isServer=false] Create the instance in either server or
   *     client mode
   * @param {Number} [maxPayload=0] The maximum allowed message length
   */
  constructor(options, isServer, maxPayload) {
    this._maxPayload = maxPayload | 0;
    this._options = options || {};
    this._threshold =
      this._options.threshold !== undefined ? this._options.threshold : 1024;
    this._isServer = !!isServer;
    this._deflate = null;
    this._inflate = null;

    this.params = null;

    // The limiter is shared by every instance; the first instance created
    // decides its concurrency.
    if (!zlibLimiter) {
      const concurrency =
        this._options.concurrencyLimit !== undefined
          ? this._options.concurrencyLimit
          : 10;
      zlibLimiter = new Limiter(concurrency);
    }
  }

  /**
   * @type {String}
   */
  static get extensionName() {
    return 'permessage-deflate';
  }

  /**
   * Create an extension negotiation offer.
   *
   * @return {Object} Extension parameters
   * @public
   */
  offer() {
    const params = {};

    if (this._options.serverNoContextTakeover) {
      params.server_no_context_takeover = true;
    }
    if (this._options.clientNoContextTakeover) {
      params.client_no_context_takeover = true;
    }
    if (this._options.serverMaxWindowBits) {
      params.server_max_window_bits = this._options.serverMaxWindowBits;
    }
    if (this._options.clientMaxWindowBits) {
      params.client_max_window_bits = this._options.clientMaxWindowBits;
    } else if (this._options.clientMaxWindowBits == null) {
      // Option not set at all (`null`/`undefined`, as opposed to `false`):
      // advertise the parameter without a value.
      params.client_max_window_bits = true;
    }

    return params;
  }

  /**
   * Accept an extension negotiation offer/response.
   *
   * @param {Array} configurations The extension negotiation offers/response
   * @return {Object} Accepted configuration
   * @public
   */
  accept(configurations) {
    configurations = this.normalizeParams(configurations);

    this.params = this._isServer
      ? this.acceptAsServer(configurations)
      : this.acceptAsClient(configurations);

    return this.params;
  }

  /**
   * Releases all resources used by the extension.
   *
   * @public
   */
  cleanup() {
    if (this._inflate) {
      this._inflate.close();
      this._inflate = null;
    }

    if (this._deflate) {
      const callback = this._deflate[kCallback];

      this._deflate.close();
      this._deflate = null;

      // A pending compression will never complete now; report it.
      if (callback) {
        callback(
          new Error(
            'The deflate stream was closed while data was being processed'
          )
        );
      }
    }
  }

  /**
   * Accept an extension negotiation offer.
   *
   * @param {Array} offers The extension negotiation offers
   * @return {Object} Accepted configuration
   * @throws {Error} If none of the offers is compatible with the options
   * @private
   */
  acceptAsServer(offers) {
    const opts = this._options;

    // Pick the first offer that does not conflict with the local options.
    const accepted = offers.find((params) => {
      if (
        (opts.serverNoContextTakeover === false &&
          params.server_no_context_takeover) ||
        (params.server_max_window_bits &&
          (opts.serverMaxWindowBits === false ||
            (typeof opts.serverMaxWindowBits === 'number' &&
              opts.serverMaxWindowBits > params.server_max_window_bits))) ||
        (typeof opts.clientMaxWindowBits === 'number' &&
          !params.client_max_window_bits)
      ) {
        return false;
      }

      return true;
    });

    if (!accepted) {
      throw new Error('None of the extension offers can be accepted');
    }

    // The accepted offer object is updated in place with the local options.
    if (opts.serverNoContextTakeover) {
      accepted.server_no_context_takeover = true;
    }
    if (opts.clientNoContextTakeover) {
      accepted.client_no_context_takeover = true;
    }
    if (typeof opts.serverMaxWindowBits === 'number') {
      accepted.server_max_window_bits = opts.serverMaxWindowBits;
    }
    if (typeof opts.clientMaxWindowBits === 'number') {
      accepted.client_max_window_bits = opts.clientMaxWindowBits;
    } else if (
      accepted.client_max_window_bits === true ||
      opts.clientMaxWindowBits === false
    ) {
      delete accepted.client_max_window_bits;
    }

    return accepted;
  }

  /**
   * Accept the extension negotiation response.
   *
   * @param {Array} response The extension negotiation response
   * @return {Object} Accepted configuration
   * @throws {Error} If the response contains a parameter that conflicts with
   *     the options
   * @private
   */
  acceptAsClient(response) {
    const params = response[0];

    if (
      this._options.clientNoContextTakeover === false &&
      params.client_no_context_takeover
    ) {
      throw new Error('Unexpected parameter "client_no_context_takeover"');
    }

    if (!params.client_max_window_bits) {
      if (typeof this._options.clientMaxWindowBits === 'number') {
        params.client_max_window_bits = this._options.clientMaxWindowBits;
      }
    } else if (
      this._options.clientMaxWindowBits === false ||
      (typeof this._options.clientMaxWindowBits === 'number' &&
        params.client_max_window_bits > this._options.clientMaxWindowBits)
    ) {
      throw new Error(
        'Unexpected or invalid parameter "client_max_window_bits"'
      );
    }

    return params;
  }

  /**
   * Normalize parameters.
   *
   * Each parameter value arrives as an array; it is validated and replaced, in
   * place, by its single (possibly numeric) value.
   *
   * @param {Array} configurations The extension negotiation offers/response
   * @return {Array} The offers/response with normalized parameters
   * @throws {Error} If a parameter is unknown or has more than one value
   * @throws {TypeError} If a parameter has an invalid value
   * @private
   */
  normalizeParams(configurations) {
    configurations.forEach((params) => {
      Object.keys(params).forEach((key) => {
        let value = params[key];

        if (value.length > 1) {
          throw new Error(`Parameter "${key}" must have only a single value`);
        }

        value = value[0];

        if (key === 'client_max_window_bits') {
          if (value !== true) {
            const num = +value;
            if (!Number.isInteger(num) || num < 8 || num > 15) {
              throw new TypeError(
                `Invalid value for parameter "${key}": ${value}`
              );
            }
            value = num;
          } else if (!this._isServer) {
            // A valueless `client_max_window_bits` is only accepted when
            // running as a server.
            throw new TypeError(
              `Invalid value for parameter "${key}": ${value}`
            );
          }
        } else if (key === 'server_max_window_bits') {
          const num = +value;
          if (!Number.isInteger(num) || num < 8 || num > 15) {
            throw new TypeError(
              `Invalid value for parameter "${key}": ${value}`
            );
          }
          value = num;
        } else if (
          key === 'client_no_context_takeover' ||
          key === 'server_no_context_takeover'
        ) {
          if (value !== true) {
            throw new TypeError(
              `Invalid value for parameter "${key}": ${value}`
            );
          }
        } else {
          throw new Error(`Unknown parameter "${key}"`);
        }

        params[key] = value;
      });
    });

    return configurations;
  }

  /**
   * Decompress data. Concurrency limited.
   *
   * @param {Buffer} data Compressed data
   * @param {Boolean} fin Specifies whether or not this is the last fragment
   * @param {Function} callback Callback
   * @public
   */
  decompress(data, fin, callback) {
    zlibLimiter.add((done) => {
      this._decompress(data, fin, (err, result) => {
        // Release the limiter slot before handing the result to the caller.
        done();
        callback(err, result);
      });
    });
  }

  /**
   * Compress data. Concurrency limited.
   *
   * @param {Buffer} data Data to compress
   * @param {Boolean} fin Specifies whether or not this is the last fragment
   * @param {Function} callback Callback
   * @public
   */
  compress(data, fin, callback) {
    zlibLimiter.add((done) => {
      this._compress(data, fin, (err, result) => {
        // Release the limiter slot before handing the result to the caller.
        done();
        callback(err, result);
      });
    });
  }

  /**
   * Decompress data.
   *
   * @param {Buffer} data Compressed data
   * @param {Boolean} fin Specifies whether or not this is the last fragment
   * @param {Function} callback Callback
   * @private
   */
  _decompress(data, fin, callback) {
    // Incoming data was compressed by the remote endpoint.
    const endpoint = this._isServer ? 'client' : 'server';

    if (!this._inflate) {
      const key = `${endpoint}_max_window_bits`;
      const windowBits =
        typeof this.params[key] !== 'number'
          ? zlib.Z_DEFAULT_WINDOWBITS
          : this.params[key];

      this._inflate = zlib.createInflateRaw({
        ...this._options.zlibInflateOptions,
        windowBits
      });
      this._inflate[kPerMessageDeflate] = this;
      this._inflate[kTotalLength] = 0;
      this._inflate[kBuffers] = [];
      this._inflate.on('error', inflateOnError);
      this._inflate.on('data', inflateOnData);
    }

    this._inflate[kCallback] = callback;

    this._inflate.write(data);
    if (fin) this._inflate.write(TRAILER);

    this._inflate.flush(() => {
      const err = this._inflate[kError];

      if (err) {
        this._inflate.close();
        this._inflate = null;
        callback(err);
        return;
      }

      const data = bufferUtil.concat(
        this._inflate[kBuffers],
        this._inflate[kTotalLength]
      );

      if (this._inflate._readableState.endEmitted) {
        // The stream has ended and cannot be reused; drop it so that the next
        // call creates a fresh one.
        this._inflate.close();
        this._inflate = null;
      } else {
        this._inflate[kTotalLength] = 0;
        this._inflate[kBuffers] = [];

        if (fin && this.params[`${endpoint}_no_context_takeover`]) {
          this._inflate.reset();
        }
      }

      callback(null, data);
    });
  }

  /**
   * Compress data.
   *
   * @param {Buffer} data Data to compress
   * @param {Boolean} fin Specifies whether or not this is the last fragment
   * @param {Function} callback Callback
   * @private
   */
  _compress(data, fin, callback) {
    // Outgoing data is compressed by the local endpoint.
    const endpoint = this._isServer ? 'server' : 'client';

    if (!this._deflate) {
      const key = `${endpoint}_max_window_bits`;
      const windowBits =
        typeof this.params[key] !== 'number'
          ? zlib.Z_DEFAULT_WINDOWBITS
          : this.params[key];

      this._deflate = zlib.createDeflateRaw({
        ...this._options.zlibDeflateOptions,
        windowBits
      });

      this._deflate[kTotalLength] = 0;
      this._deflate[kBuffers] = [];

      //
      // An `'error'` event is emitted, only on Node.js < 10.0.0, if the
      // `zlib.DeflateRaw` instance is closed while data is being processed.
      // This can happen if `PerMessageDeflate#cleanup()` is called at the wrong
      // time due to an abnormal WebSocket closure.
      //
      this._deflate.on('error', NOOP);
      this._deflate.on('data', deflateOnData);
    }

    this._deflate[kCallback] = callback;

    this._deflate.write(data);
    this._deflate.flush(zlib.Z_SYNC_FLUSH, () => {
      if (!this._deflate) {
        //
        // The deflate stream was closed while data was being processed.
        //
        return;
      }

      let data = bufferUtil.concat(
        this._deflate[kBuffers],
        this._deflate[kTotalLength]
      );

      // Drop the last four bytes of the final fragment; `_decompress()` writes
      // `TRAILER` back in on the receiving side.
      if (fin) data = data.slice(0, data.length - 4);

      //
      // Ensure that the callback will not be called again in
      // `PerMessageDeflate#cleanup()`.
      //
      this._deflate[kCallback] = null;

      this._deflate[kTotalLength] = 0;
      this._deflate[kBuffers] = [];

      if (fin && this.params[`${endpoint}_no_context_takeover`]) {
        this._deflate.reset();
      }

      callback(null, data);
    });
  }
}
// The extension class is the module's sole export.
module.exports = PerMessageDeflate;
/**
 * The listener of the `zlib.DeflateRaw` stream `'data'` event.
 *
 * Appends the chunk to the buffer list kept on the stream (`this`) and adds
 * its size to the running total.
 *
 * @param {Buffer} chunk A chunk of data
 * @private
 */
function deflateOnData(chunk) {
  this[kBuffers].push(chunk);
  this[kTotalLength] += chunk.length;
}
/**
 * The listener of the `zlib.InflateRaw` stream `'data'` event.
 *
 * Collects decompressed chunks on the stream (`this`) until the configured
 * maximum payload size is exceeded; from then on an error is recorded on the
 * stream and no further chunks are collected.
 *
 * @param {Buffer} chunk A chunk of data
 * @private
 */
function inflateOnData(chunk) {
  this[kTotalLength] += chunk.length;

  // A limit lower than 1 means that the payload size is not restricted.
  if (
    this[kPerMessageDeflate]._maxPayload < 1 ||
    this[kTotalLength] <= this[kPerMessageDeflate]._maxPayload
  ) {
    this[kBuffers].push(chunk);
    return;
  }

  // Store the error on the stream instead of throwing: the flush callback of
  // the pending decompression reads it from there and reports it.
  this[kError] = new RangeError('Max payload size exceeded');
  this[kError][kStatusCode] = 1009;
  this.removeListener('data', inflateOnData);
  this.reset();
}
/**
 * The listener of the `zlib.InflateRaw` stream `'error'` event.
 *
 * Detaches the failed stream from its owning `PerMessageDeflate` instance,
 * tags the error with status code 1007 and passes it to the callback stored
 * on the stream.
 *
 * @param {Error} err The emitted error
 * @private
 */
function inflateOnError(err) {
  //
  // There is no need to call `Zlib#close()` as the handle is automatically
  // closed when an error is emitted.
  //
  this[kPerMessageDeflate]._inflate = null;
  err[kStatusCode] = 1007;
  this[kCallback](err);
}