5 var EventEmitter = require('events').EventEmitter;
6 var util = require('util');
7 var debug = require('debug')('engine:socket');
13 module.exports = Socket;
16 * Client class (abstract).
21 function Socket (id, server, transport, req) {
24 this.upgrading = false;
25 this.upgraded = false;
26 this.readyState = 'opening';
27 this.writeBuffer = [];
29 this.sentCallbackFn = [];
33 // Cache IP since it might not be in the req later
34 if (req.websocket && req.websocket._socket) {
35 this.remoteAddress = req.websocket._socket.remoteAddress;
37 this.remoteAddress = req.connection.remoteAddress;
40 this.checkIntervalTimer = null;
41 this.upgradeTimeoutTimer = null;
42 this.pingTimeoutTimer = null;
44 this.setTransport(transport);
49 * Inherits from EventEmitter.
52 util.inherits(Socket, EventEmitter);
55 * Called upon transport considered open.
60 Socket.prototype.onOpen = function () {
61 this.readyState = 'open';
63 // sends an `open` packet
64 this.transport.sid = this.id;
65 this.sendPacket('open', JSON.stringify({
67 upgrades: this.getAvailableUpgrades(),
68 pingInterval: this.server.pingInterval,
69 pingTimeout: this.server.pingTimeout
72 if (this.server.initialPacket) {
73 this.sendPacket('message', this.server.initialPacket);
77 this.setPingTimeout();
81 * Called upon transport packet.
83 * @param {Object} packet
87 Socket.prototype.onPacket = function (packet) {
88 if ('open' === this.readyState) {
89 // export packet event
91 this.emit('packet', packet);
93 // Reset ping timeout on any packet, incoming data is a good sign of
94 // other side's liveness
95 this.setPingTimeout();
97 switch (packet.type) {
100 this.sendPacket('pong');
101 this.emit('heartbeat');
105 this.onClose('parse error');
109 this.emit('data', packet.data);
110 this.emit('message', packet.data);
114 debug('packet received with closed socket');
119 * Called upon transport error.
121 * @param {Error} error object
125 Socket.prototype.onError = function (err) {
126 debug('transport error');
127 this.onClose('transport error', err);
131 * Sets and resets ping timeout timer based on client pings.
136 Socket.prototype.setPingTimeout = function () {
138 clearTimeout(self.pingTimeoutTimer);
139 self.pingTimeoutTimer = setTimeout(function () {
140 self.onClose('ping timeout');
141 }, self.server.pingInterval + self.server.pingTimeout);
145 * Attaches handlers for the given transport.
147 * @param {Transport} transport
151 Socket.prototype.setTransport = function (transport) {
152 var onError = this.onError.bind(this);
153 var onPacket = this.onPacket.bind(this);
154 var flush = this.flush.bind(this);
155 var onClose = this.onClose.bind(this, 'transport close');
157 this.transport = transport;
158 this.transport.once('error', onError);
159 this.transport.on('packet', onPacket);
160 this.transport.on('drain', flush);
161 this.transport.once('close', onClose);
162 // this function will manage packet events (also message callbacks)
163 this.setupSendCallback();
165 this.cleanupFn.push(function () {
166 transport.removeListener('error', onError);
167 transport.removeListener('packet', onPacket);
168 transport.removeListener('drain', flush);
169 transport.removeListener('close', onClose);
174 * Upgrades socket to the given transport
176 * @param {Transport} transport
180 Socket.prototype.maybeUpgrade = function (transport) {
181 debug('might upgrade socket transport from "%s" to "%s"'
182 , this.transport.name, transport.name);
184 this.upgrading = true;
188 // set transport upgrade timer
189 self.upgradeTimeoutTimer = setTimeout(function () {
190 debug('client did not complete upgrade - closing transport');
192 if ('open' === transport.readyState) {
195 }, this.server.upgradeTimeout);
197 function onPacket (packet) {
198 if ('ping' === packet.type && 'probe' === packet.data) {
199 transport.send([{ type: 'pong', data: 'probe' }]);
200 self.emit('upgrading', transport);
201 clearInterval(self.checkIntervalTimer);
202 self.checkIntervalTimer = setInterval(check, 100);
203 } else if ('upgrade' === packet.type && self.readyState !== 'closed') {
204 debug('got upgrade packet - upgrading');
206 self.transport.discard();
207 self.upgraded = true;
208 self.clearTransport();
209 self.setTransport(transport);
210 self.emit('upgrade', transport);
211 self.setPingTimeout();
213 if (self.readyState === 'closing') {
214 transport.close(function () {
215 self.onClose('forced close');
224 // we force a polling cycle to ensure a fast upgrade
226 if ('polling' === self.transport.name && self.transport.writable) {
227 debug('writing a noop packet to polling for fast upgrade');
228 self.transport.send([{ type: 'noop' }]);
232 function cleanup () {
233 self.upgrading = false;
235 clearInterval(self.checkIntervalTimer);
236 self.checkIntervalTimer = null;
238 clearTimeout(self.upgradeTimeoutTimer);
239 self.upgradeTimeoutTimer = null;
241 transport.removeListener('packet', onPacket);
242 transport.removeListener('close', onTransportClose);
243 transport.removeListener('error', onError);
244 self.removeListener('close', onClose);
247 function onError (err) {
248 debug('client did not complete upgrade - %s', err);
254 function onTransportClose () {
255 onError('transport closed');
258 function onClose () {
259 onError('socket closed');
262 transport.on('packet', onPacket);
263 transport.once('close', onTransportClose);
264 transport.once('error', onError);
266 self.once('close', onClose);
270 * Clears listeners and timers associated with current transport.
275 Socket.prototype.clearTransport = function () {
278 var toCleanUp = this.cleanupFn.length;
280 for (var i = 0; i < toCleanUp; i++) {
281 cleanup = this.cleanupFn.shift();
285 // silence further transport errors and prevent uncaught exceptions
286 this.transport.on('error', function () {
287 debug('error triggered by discarded transport');
290 // ensure transport won't stay open
291 this.transport.close();
293 clearTimeout(this.pingTimeoutTimer);
297 * Called upon transport considered closed.
298 * Possible reasons: `ping timeout`, `client error`, `parse error`,
299 * `transport error`, `server close`, `transport close`
302 Socket.prototype.onClose = function (reason, description) {
303 if ('closed' !== this.readyState) {
304 this.readyState = 'closed';
305 clearTimeout(this.pingTimeoutTimer);
306 clearInterval(this.checkIntervalTimer);
307 this.checkIntervalTimer = null;
308 clearTimeout(this.upgradeTimeoutTimer);
310 // clean writeBuffer in next tick, so developers can still
311 // grab the writeBuffer on 'close' event
312 process.nextTick(function () {
313 self.writeBuffer = [];
316 this.sentCallbackFn = [];
317 this.clearTransport();
318 this.emit('close', reason, description);
323 * Setup and manage send callback
328 Socket.prototype.setupSendCallback = function () {
330 this.transport.on('drain', onDrain);
332 this.cleanupFn.push(function () {
333 self.transport.removeListener('drain', onDrain);
336 // the message was sent successfully, execute the callback
337 function onDrain () {
338 if (self.sentCallbackFn.length > 0) {
339 var seqFn = self.sentCallbackFn.splice(0, 1)[0];
340 if ('function' === typeof seqFn) {
341 debug('executing send callback');
342 seqFn(self.transport);
343 } else if (Array.isArray(seqFn)) {
344 debug('executing batch send callback');
345 for (var l = seqFn.length, i = 0; i < l; i++) {
346 if ('function' === typeof seqFn[i]) {
347 seqFn[i](self.transport);
356 * Sends a message packet.
358 * @param {String} message
359 * @param {Object} options
360 * @param {Function} callback
361 * @return {Socket} for chaining
365 Socket.prototype.send =
366 Socket.prototype.write = function (data, options, callback) {
367 this.sendPacket('message', data, options, callback);
374 * @param {String} packet type
375 * @param {String} optional, data
376 * @param {Object} options
380 Socket.prototype.sendPacket = function (type, data, options, callback) {
381 if ('function' === typeof options) {
386 options = options || {};
387 options.compress = false !== options.compress;
389 if ('closing' !== this.readyState && 'closed' !== this.readyState) {
390 debug('sending packet "%s" (%s)', type, data);
396 if (data) packet.data = data;
398 // exports packetCreate event
399 this.emit('packetCreate', packet);
401 this.writeBuffer.push(packet);
403 // add send callback to object, if defined
404 if (callback) this.packetsFn.push(callback);
411 * Attempts to flush the packets buffer.
416 Socket.prototype.flush = function () {
417 if ('closed' !== this.readyState &&
418 this.transport.writable &&
419 this.writeBuffer.length) {
420 debug('flushing buffer to transport');
421 this.emit('flush', this.writeBuffer);
422 this.server.emit('flush', this, this.writeBuffer);
423 var wbuf = this.writeBuffer;
424 this.writeBuffer = [];
425 if (!this.transport.supportsFraming) {
426 this.sentCallbackFn.push(this.packetsFn);
428 this.sentCallbackFn.push.apply(this.sentCallbackFn, this.packetsFn);
431 this.transport.send(wbuf);
433 this.server.emit('drain', this);
438 * Get available upgrades for this socket.
443 Socket.prototype.getAvailableUpgrades = function () {
444 var availableUpgrades = [];
445 var allUpgrades = this.server.upgrades(this.transport.name);
446 for (var i = 0, l = allUpgrades.length; i < l; ++i) {
447 var upg = allUpgrades[i];
448 if (this.server.transports.indexOf(upg) !== -1) {
449 availableUpgrades.push(upg);
452 return availableUpgrades;
456 * Closes the socket and underlying transport.
458 * @param {Boolean} optional, discard
459 * @return {Socket} for chaining
463 Socket.prototype.close = function (discard) {
464 if ('open' !== this.readyState) return;
466 this.readyState = 'closing';
468 if (this.writeBuffer.length) {
469 this.once('drain', this.closeTransport.bind(this, discard));
473 this.closeTransport(discard);
477 * Closes the underlying transport.
479 * @param {Boolean} discard
483 Socket.prototype.closeTransport = function (discard) {
484 if (discard) this.transport.discard();
485 this.transport.close(this.onClose.bind(this, 'forced close'));