[SignalingServer] Optimize dependent modules
[platform/framework/web/wrtjs.git] / signaling_server / service / node_modules / engine.io / lib / socket.js
1 /**
2  * Module dependencies.
3  */
4
5 var EventEmitter = require('events').EventEmitter;
6 var util = require('util');
7 var debug = require('debug')('engine:socket');
8
9 /**
10  * Module exports.
11  */
12
13 module.exports = Socket;
14
15 /**
16  * Client class (abstract).
17  *
18  * @api private
19  */
20
21 function Socket (id, server, transport, req) {
22   this.id = id;
23   this.server = server;
24   this.upgrading = false;
25   this.upgraded = false;
26   this.readyState = 'opening';
27   this.writeBuffer = [];
28   this.packetsFn = [];
29   this.sentCallbackFn = [];
30   this.cleanupFn = [];
31   this.request = req;
32
33   // Cache IP since it might not be in the req later
34   if (req.websocket && req.websocket._socket) {
35     this.remoteAddress = req.websocket._socket.remoteAddress;
36   } else {
37     this.remoteAddress = req.connection.remoteAddress;
38   }
39
40   this.checkIntervalTimer = null;
41   this.upgradeTimeoutTimer = null;
42   this.pingTimeoutTimer = null;
43
44   this.setTransport(transport);
45   this.onOpen();
46 }
47
48 /**
49  * Inherits from EventEmitter.
50  */
51
52 util.inherits(Socket, EventEmitter);
53
54 /**
55  * Called upon transport considered open.
56  *
57  * @api private
58  */
59
60 Socket.prototype.onOpen = function () {
61   this.readyState = 'open';
62
63   // sends an `open` packet
64   this.transport.sid = this.id;
65   this.sendPacket('open', JSON.stringify({
66     sid: this.id,
67     upgrades: this.getAvailableUpgrades(),
68     pingInterval: this.server.pingInterval,
69     pingTimeout: this.server.pingTimeout
70   }));
71
72   if (this.server.initialPacket) {
73     this.sendPacket('message', this.server.initialPacket);
74   }
75
76   this.emit('open');
77   this.setPingTimeout();
78 };
79
80 /**
81  * Called upon transport packet.
82  *
83  * @param {Object} packet
84  * @api private
85  */
86
87 Socket.prototype.onPacket = function (packet) {
88   if ('open' === this.readyState) {
89     // export packet event
90     debug('packet');
91     this.emit('packet', packet);
92
93     // Reset ping timeout on any packet, incoming data is a good sign of
94     // other side's liveness
95     this.setPingTimeout();
96
97     switch (packet.type) {
98       case 'ping':
99         debug('got ping');
100         this.sendPacket('pong');
101         this.emit('heartbeat');
102         break;
103
104       case 'error':
105         this.onClose('parse error');
106         break;
107
108       case 'message':
109         this.emit('data', packet.data);
110         this.emit('message', packet.data);
111         break;
112     }
113   } else {
114     debug('packet received with closed socket');
115   }
116 };
117
118 /**
119  * Called upon transport error.
120  *
121  * @param {Error} error object
122  * @api private
123  */
124
125 Socket.prototype.onError = function (err) {
126   debug('transport error');
127   this.onClose('transport error', err);
128 };
129
130 /**
131  * Sets and resets ping timeout timer based on client pings.
132  *
133  * @api private
134  */
135
136 Socket.prototype.setPingTimeout = function () {
137   var self = this;
138   clearTimeout(self.pingTimeoutTimer);
139   self.pingTimeoutTimer = setTimeout(function () {
140     self.onClose('ping timeout');
141   }, self.server.pingInterval + self.server.pingTimeout);
142 };
143
144 /**
145  * Attaches handlers for the given transport.
146  *
147  * @param {Transport} transport
148  * @api private
149  */
150
151 Socket.prototype.setTransport = function (transport) {
152   var onError = this.onError.bind(this);
153   var onPacket = this.onPacket.bind(this);
154   var flush = this.flush.bind(this);
155   var onClose = this.onClose.bind(this, 'transport close');
156
157   this.transport = transport;
158   this.transport.once('error', onError);
159   this.transport.on('packet', onPacket);
160   this.transport.on('drain', flush);
161   this.transport.once('close', onClose);
162   // this function will manage packet events (also message callbacks)
163   this.setupSendCallback();
164
165   this.cleanupFn.push(function () {
166     transport.removeListener('error', onError);
167     transport.removeListener('packet', onPacket);
168     transport.removeListener('drain', flush);
169     transport.removeListener('close', onClose);
170   });
171 };
172
173 /**
174  * Upgrades socket to the given transport
175  *
176  * @param {Transport} transport
177  * @api private
178  */
179
180 Socket.prototype.maybeUpgrade = function (transport) {
181   debug('might upgrade socket transport from "%s" to "%s"'
182     , this.transport.name, transport.name);
183
184   this.upgrading = true;
185
186   var self = this;
187
188   // set transport upgrade timer
189   self.upgradeTimeoutTimer = setTimeout(function () {
190     debug('client did not complete upgrade - closing transport');
191     cleanup();
192     if ('open' === transport.readyState) {
193       transport.close();
194     }
195   }, this.server.upgradeTimeout);
196
197   function onPacket (packet) {
198     if ('ping' === packet.type && 'probe' === packet.data) {
199       transport.send([{ type: 'pong', data: 'probe' }]);
200       self.emit('upgrading', transport);
201       clearInterval(self.checkIntervalTimer);
202       self.checkIntervalTimer = setInterval(check, 100);
203     } else if ('upgrade' === packet.type && self.readyState !== 'closed') {
204       debug('got upgrade packet - upgrading');
205       cleanup();
206       self.transport.discard();
207       self.upgraded = true;
208       self.clearTransport();
209       self.setTransport(transport);
210       self.emit('upgrade', transport);
211       self.setPingTimeout();
212       self.flush();
213       if (self.readyState === 'closing') {
214         transport.close(function () {
215           self.onClose('forced close');
216         });
217       }
218     } else {
219       cleanup();
220       transport.close();
221     }
222   }
223
224   // we force a polling cycle to ensure a fast upgrade
225   function check () {
226     if ('polling' === self.transport.name && self.transport.writable) {
227       debug('writing a noop packet to polling for fast upgrade');
228       self.transport.send([{ type: 'noop' }]);
229     }
230   }
231
232   function cleanup () {
233     self.upgrading = false;
234
235     clearInterval(self.checkIntervalTimer);
236     self.checkIntervalTimer = null;
237
238     clearTimeout(self.upgradeTimeoutTimer);
239     self.upgradeTimeoutTimer = null;
240
241     transport.removeListener('packet', onPacket);
242     transport.removeListener('close', onTransportClose);
243     transport.removeListener('error', onError);
244     self.removeListener('close', onClose);
245   }
246
247   function onError (err) {
248     debug('client did not complete upgrade - %s', err);
249     cleanup();
250     transport.close();
251     transport = null;
252   }
253
254   function onTransportClose () {
255     onError('transport closed');
256   }
257
258   function onClose () {
259     onError('socket closed');
260   }
261
262   transport.on('packet', onPacket);
263   transport.once('close', onTransportClose);
264   transport.once('error', onError);
265
266   self.once('close', onClose);
267 };
268
269 /**
270  * Clears listeners and timers associated with current transport.
271  *
272  * @api private
273  */
274
275 Socket.prototype.clearTransport = function () {
276   var cleanup;
277
278   var toCleanUp = this.cleanupFn.length;
279
280   for (var i = 0; i < toCleanUp; i++) {
281     cleanup = this.cleanupFn.shift();
282     cleanup();
283   }
284
285   // silence further transport errors and prevent uncaught exceptions
286   this.transport.on('error', function () {
287     debug('error triggered by discarded transport');
288   });
289
290   // ensure transport won't stay open
291   this.transport.close();
292
293   clearTimeout(this.pingTimeoutTimer);
294 };
295
296 /**
297  * Called upon transport considered closed.
298  * Possible reasons: `ping timeout`, `client error`, `parse error`,
299  * `transport error`, `server close`, `transport close`
300  */
301
302 Socket.prototype.onClose = function (reason, description) {
303   if ('closed' !== this.readyState) {
304     this.readyState = 'closed';
305     clearTimeout(this.pingTimeoutTimer);
306     clearInterval(this.checkIntervalTimer);
307     this.checkIntervalTimer = null;
308     clearTimeout(this.upgradeTimeoutTimer);
309     var self = this;
310     // clean writeBuffer in next tick, so developers can still
311     // grab the writeBuffer on 'close' event
312     process.nextTick(function () {
313       self.writeBuffer = [];
314     });
315     this.packetsFn = [];
316     this.sentCallbackFn = [];
317     this.clearTransport();
318     this.emit('close', reason, description);
319   }
320 };
321
322 /**
323  * Setup and manage send callback
324  *
325  * @api private
326  */
327
328 Socket.prototype.setupSendCallback = function () {
329   var self = this;
330   this.transport.on('drain', onDrain);
331
332   this.cleanupFn.push(function () {
333     self.transport.removeListener('drain', onDrain);
334   });
335
336   // the message was sent successfully, execute the callback
337   function onDrain () {
338     if (self.sentCallbackFn.length > 0) {
339       var seqFn = self.sentCallbackFn.splice(0, 1)[0];
340       if ('function' === typeof seqFn) {
341         debug('executing send callback');
342         seqFn(self.transport);
343       } else if (Array.isArray(seqFn)) {
344         debug('executing batch send callback');
345         for (var l = seqFn.length, i = 0; i < l; i++) {
346           if ('function' === typeof seqFn[i]) {
347             seqFn[i](self.transport);
348           }
349         }
350       }
351     }
352   }
353 };
354
355 /**
356  * Sends a message packet.
357  *
358  * @param {String} message
359  * @param {Object} options
360  * @param {Function} callback
361  * @return {Socket} for chaining
362  * @api public
363  */
364
365 Socket.prototype.send =
366 Socket.prototype.write = function (data, options, callback) {
367   this.sendPacket('message', data, options, callback);
368   return this;
369 };
370
371 /**
372  * Sends a packet.
373  *
374  * @param {String} packet type
375  * @param {String} optional, data
376  * @param {Object} options
377  * @api private
378  */
379
380 Socket.prototype.sendPacket = function (type, data, options, callback) {
381   if ('function' === typeof options) {
382     callback = options;
383     options = null;
384   }
385
386   options = options || {};
387   options.compress = false !== options.compress;
388
389   if ('closing' !== this.readyState && 'closed' !== this.readyState) {
390     debug('sending packet "%s" (%s)', type, data);
391
392     var packet = {
393       type: type,
394       options: options
395     };
396     if (data) packet.data = data;
397
398     // exports packetCreate event
399     this.emit('packetCreate', packet);
400
401     this.writeBuffer.push(packet);
402
403     // add send callback to object, if defined
404     if (callback) this.packetsFn.push(callback);
405
406     this.flush();
407   }
408 };
409
410 /**
411  * Attempts to flush the packets buffer.
412  *
413  * @api private
414  */
415
416 Socket.prototype.flush = function () {
417   if ('closed' !== this.readyState &&
418                 this.transport.writable &&
419                 this.writeBuffer.length) {
420     debug('flushing buffer to transport');
421     this.emit('flush', this.writeBuffer);
422     this.server.emit('flush', this, this.writeBuffer);
423     var wbuf = this.writeBuffer;
424     this.writeBuffer = [];
425     if (!this.transport.supportsFraming) {
426       this.sentCallbackFn.push(this.packetsFn);
427     } else {
428       this.sentCallbackFn.push.apply(this.sentCallbackFn, this.packetsFn);
429     }
430     this.packetsFn = [];
431     this.transport.send(wbuf);
432     this.emit('drain');
433     this.server.emit('drain', this);
434   }
435 };
436
437 /**
438  * Get available upgrades for this socket.
439  *
440  * @api private
441  */
442
443 Socket.prototype.getAvailableUpgrades = function () {
444   var availableUpgrades = [];
445   var allUpgrades = this.server.upgrades(this.transport.name);
446   for (var i = 0, l = allUpgrades.length; i < l; ++i) {
447     var upg = allUpgrades[i];
448     if (this.server.transports.indexOf(upg) !== -1) {
449       availableUpgrades.push(upg);
450     }
451   }
452   return availableUpgrades;
453 };
454
455 /**
456  * Closes the socket and underlying transport.
457  *
458  * @param {Boolean} optional, discard
459  * @return {Socket} for chaining
460  * @api public
461  */
462
463 Socket.prototype.close = function (discard) {
464   if ('open' !== this.readyState) return;
465
466   this.readyState = 'closing';
467
468   if (this.writeBuffer.length) {
469     this.once('drain', this.closeTransport.bind(this, discard));
470     return;
471   }
472
473   this.closeTransport(discard);
474 };
475
476 /**
477  * Closes the underlying transport.
478  *
479  * @param {Boolean} discard
480  * @api private
481  */
482
483 Socket.prototype.closeTransport = function (discard) {
484   if (discard) this.transport.discard();
485   this.transport.close(this.onClose.bind(this, 'forced close'));
486 };