5 var transports = require('./transports/index');
6 var Emitter = require('component-emitter');
7 var debug = require('debug')('engine.io-client:socket');
8 var index = require('indexof');
9 var parser = require('engine.io-parser');
10 var parseuri = require('parseuri');
11 var parseqs = require('parseqs');
17 module.exports = Socket;
/**
 * Socket constructor.
 *
 * Normalises the connection options, stores them on the instance and then
 * starts connecting by calling `this.open()`.
 *
 * The object passed in by the caller is never written to: everything derived
 * from the URI (hostname, secure, port, query) and the protocol-default port
 * is applied to an internal copy.
 *
 * @param {String|Object} uri or options
 * @param {Object} opts options (unused when `uri` is itself an object)
 */

function Socket (uri, opts) {
  if (!(this instanceof Socket)) return new Socket(uri, opts);

  // `new Socket(options)` form: the single argument is the options object
  if (uri && 'object' === typeof uri) {
    opts = uri;
    uri = null;
  }

  // copy before normalising, so one options object can be reused for several
  // sockets without values leaking from one connection into the next
  var supplied = opts || {};
  opts = {};
  for (var key in supplied) opts[key] = supplied[key];

  if (uri) {
    uri = parseuri(uri);
    opts.hostname = uri.host;
    opts.secure = uri.protocol === 'https' || uri.protocol === 'wss';
    opts.port = uri.port;
    if (uri.query) opts.query = uri.query;
  } else if (opts.host) {
    opts.hostname = parseuri(opts.host).host;
  }

  // an explicit `secure` wins, otherwise follow the page protocol (if any)
  this.secure = null != opts.secure ? opts.secure
    : (typeof location !== 'undefined' && 'https:' === location.protocol);

  if (opts.hostname && !opts.port) {
    // if no port is specified manually, use the protocol default
    opts.port = this.secure ? '443' : '80';
  }

  this.agent = opts.agent || false;
  this.hostname = opts.hostname ||
    (typeof location !== 'undefined' ? location.hostname : 'localhost');
  this.port = opts.port || (typeof location !== 'undefined' && location.port
    ? location.port
    : (this.secure ? 443 : 80));
  this.query = opts.query || {};
  if ('string' === typeof this.query) this.query = parseqs.decode(this.query);
  this.upgrade = false !== opts.upgrade;
  // exactly one trailing slash
  this.path = (opts.path || '/engine.io').replace(/\/$/, '') + '/';
  this.forceJSONP = !!opts.forceJSONP;
  this.jsonp = false !== opts.jsonp;
  this.forceBase64 = !!opts.forceBase64;
  this.enablesXDR = !!opts.enablesXDR;
  this.withCredentials = false !== opts.withCredentials;
  this.timestampParam = opts.timestampParam || 't';
  this.timestampRequests = opts.timestampRequests;
  this.transports = opts.transports || ['polling', 'websocket'];
  this.transportOptions = opts.transportOptions || {};
  this.readyState = '';
  this.writeBuffer = [];
  this.prevBufferLen = 0;
  this.policyPort = opts.policyPort || 843;
  this.rememberUpgrade = opts.rememberUpgrade || false;
  this.binaryType = null;
  this.onlyBinaryUpgrades = opts.onlyBinaryUpgrades;
  // `false` disables compression; `true` or nothing means "default settings"
  this.perMessageDeflate = false !== opts.perMessageDeflate ? (opts.perMessageDeflate || {}) : false;

  if (true === this.perMessageDeflate) this.perMessageDeflate = {};
  if (this.perMessageDeflate && null == this.perMessageDeflate.threshold) {
    this.perMessageDeflate.threshold = 1024;
  }

  // SSL options for Node.js client
  this.pfx = opts.pfx || undefined;
  this.key = opts.key || undefined;
  this.passphrase = opts.passphrase || undefined;
  this.cert = opts.cert || undefined;
  this.ca = opts.ca || undefined;
  this.ciphers = opts.ciphers || undefined;
  this.rejectUnauthorized = opts.rejectUnauthorized === undefined ? true : opts.rejectUnauthorized;
  this.forceNode = !!opts.forceNode;

  // detect ReactNative environment
  this.isReactNative = (typeof navigator !== 'undefined' && typeof navigator.product === 'string' && navigator.product.toLowerCase() === 'reactnative');

  // other options for Node.js or ReactNative client
  if (typeof self === 'undefined' || this.isReactNative) {
    if (opts.extraHeaders && Object.keys(opts.extraHeaders).length > 0) {
      this.extraHeaders = opts.extraHeaders;
    }

    if (opts.localAddress) {
      this.localAddress = opts.localAddress;
    }
  }

  // set on handshake
  this.id = null;
  this.upgrades = null;
  this.pingInterval = null;
  this.pingTimeout = null;

  // set on heartbeat
  this.pingIntervalTimer = null;
  this.pingTimeoutTimer = null;

  this.open();
}
/**
 * Shared across all sockets: set when a connection opens on the websocket
 * transport, cleared on transport errors and while a probe is running.
 */

Socket.priorWebsocketSuccess = false;

/**
 * Mix in `Emitter`.
 */

Emitter(Socket.prototype);

/**
 * Protocol version, as reported by the parser.
 */

Socket.protocol = parser.protocol; // this is an int

/**
 * Expose deps for legacy compatibility
 * and standalone browser access.
 */

Socket.Socket = Socket;
Socket.Transport = require('./transport');
// same modules as the `transports` / `parser` bindings required at the top
Socket.transports = transports;
Socket.parser = parser;
/**
 * Creates transport of the given type.
 *
 * The transport receives a copy of the socket query extended with the
 * protocol revision (`EIO`), the transport name and, once known, the session
 * id. Every other setting is taken from `transportOptions[name]` when that
 * entry defines it, and from the socket otherwise. A per-transport value of
 * `false`, `0` or `''` is honoured; only `null`/`undefined` fall back to the
 * socket-wide setting.
 *
 * @param {String} transport name
 * @return {Transport}
 */

Socket.prototype.createTransport = function (name) {
  debug('creating transport "%s"', name);

  var socket = this;
  var query = clone(this.query);

  // per-transport options
  var options = this.transportOptions[name] || {};

  // append engine.io protocol identifier
  query.EIO = parser.protocol;

  // transport name
  query.transport = name;

  // session id if we already have one
  if (this.id) query.sid = this.id;

  // `options.x || socket.x` would silently drop an explicit `false`
  // (e.g. `rejectUnauthorized: false`), so test for presence instead
  function setting (key) {
    var override = options[key];
    return null != override ? override : socket[key];
  }

  var transport = new transports[name]({
    query: query,
    socket: this,
    agent: setting('agent'),
    hostname: setting('hostname'),
    port: setting('port'),
    secure: setting('secure'),
    path: setting('path'),
    forceJSONP: setting('forceJSONP'),
    jsonp: setting('jsonp'),
    forceBase64: setting('forceBase64'),
    enablesXDR: setting('enablesXDR'),
    withCredentials: setting('withCredentials'),
    timestampRequests: setting('timestampRequests'),
    timestampParam: setting('timestampParam'),
    policyPort: setting('policyPort'),
    pfx: setting('pfx'),
    key: setting('key'),
    passphrase: setting('passphrase'),
    cert: setting('cert'),
    ca: setting('ca'),
    ciphers: setting('ciphers'),
    rejectUnauthorized: setting('rejectUnauthorized'),
    perMessageDeflate: setting('perMessageDeflate'),
    extraHeaders: setting('extraHeaders'),
    forceNode: setting('forceNode'),
    localAddress: setting('localAddress'),
    requestTimeout: setting('requestTimeout'),
    // sub-protocols have no socket-wide default
    protocols: options.protocols || void (0),
    isReactNative: this.isReactNative
  });

  return transport;
};
/**
 * Returns a new plain object holding the own enumerable properties of `obj`
 * (shallow copy; inherited properties are skipped).
 *
 * The ownership check goes through `Object.prototype` so that it also works
 * for objects without a prototype and for objects that carry their own
 * `hasOwnProperty` key, which a parsed query string can contain.
 *
 * @param {Object} obj source object
 * @return {Object} the copy
 */

function clone (obj) {
  var o = {};
  for (var i in obj) {
    if (Object.prototype.hasOwnProperty.call(obj, i)) {
      o[i] = obj[i];
    }
  }
  return o;
}
/**
 * Initializes transport to use and starts probe.
 *
 * Picks `websocket` directly when `rememberUpgrade` is set, a previous
 * websocket connection succeeded and websocket is among the configured
 * transports; otherwise the first configured transport is used. A transport
 * that cannot be constructed is dropped from the list and the next one tried.
 */

Socket.prototype.open = function () {
  var self = this;
  var name;

  var reuseWebsocket = this.rememberUpgrade &&
    Socket.priorWebsocketSuccess &&
    this.transports.indexOf('websocket') !== -1;

  if (reuseWebsocket) {
    name = 'websocket';
  } else if (0 === this.transports.length) {
    // Emit error on next tick so it can be listened to
    setTimeout(function () {
      self.emit('error', 'No transports available');
    }, 0);
    return;
  } else {
    name = this.transports[0];
  }

  this.readyState = 'opening';

  // Retry with the next transport if the transport is disabled (jsonp: false)
  var transport;
  try {
    transport = this.createTransport(name);
  } catch (e) {
    this.transports.shift();
    this.open();
    return;
  }

  transport.open();
  this.setTransport(transport);
};
/**
 * Sets the current transport. Disables the existing one (if any).
 *
 * The previous transport loses all of its listeners; the new one has its
 * `drain`, `packet`, `error` and `close` events forwarded to the socket.
 *
 * @param {Transport} transport to make current
 */

Socket.prototype.setTransport = function (transport) {
  debug('setting transport %s', transport.name);
  var socket = this;

  if (this.transport) {
    debug('clearing existing transport %s', this.transport.name);
    this.transport.removeAllListeners();
  }

  // set up transport
  this.transport = transport;

  // set up transport listeners
  transport.on('drain', function () {
    socket.onDrain();
  });
  transport.on('packet', function (packet) {
    socket.onPacket(packet);
  });
  transport.on('error', function (e) {
    socket.onError(e);
  });
  transport.on('close', function () {
    socket.onClose('transport close');
  });
};
// Probes a candidate transport: a new transport is created for the given
// name, pinged with a probe packet once it opens, and made the current
// transport when the matching pong arrives. Failures are reported through
// the upgradeError event.
// NOTE(review): the embedded line numbers show that statements are missing
// from this listing (for example the declarations of failed and self, and
// every call of cleanup and freezeTransport). Confirm against the full file
// before changing anything here.
289 * Probes a transport.
291 * @param {String} transport name
295 Socket.prototype.probe = function (name) {
296 debug('probing transport "%s"', name);
// NOTE(review): createTransport, as defined in this file, declares only the
// name parameter, so the second argument looks unused. Confirm.
297 var transport = this.createTransport(name, { probe: 1 });
// cleared for the duration of the probe; assigned again below once the
// probe has been answered
301 Socket.priorWebsocketSuccess = false;
// Runs when the probing transport emits open.
303 function onTransportOpen () {
304 if (self.onlyBinaryUpgrades) {
// this is whatever receiver the emitter invokes the listener with,
// presumably the probing transport. TODO confirm.
305 var upgradeLosesBinary = !this.supportsBinary && self.transport.supportsBinary;
306 failed = failed || upgradeLosesBinary;
310 debug('probe transport "%s" opened', name);
// send the probe and wait for exactly one reply packet
311 transport.send([{ type: 'ping', data: 'probe' }]);
312 transport.once('packet', function (msg) {
314 if ('pong' === msg.type && 'probe' === msg.data) {
315 debug('probe transport "%s" pong', name);
316 self.upgrading = true;
317 self.emit('upgrading', transport);
// transport is re-checked because listeners of the upgrading event have
// just run; the code that clears it is not visible in this listing
318 if (!transport) return;
319 Socket.priorWebsocketSuccess = 'websocket' === transport.name;
321 debug('pausing current transport "%s"', self.transport.name);
// the switch happens inside the callback handed to pause
322 self.transport.pause(function () {
324 if ('closed' === self.readyState) return;
325 debug('changing transport and sending upgrade packet');
329 self.setTransport(transport);
330 transport.send([{ type: 'upgrade' }]);
331 self.emit('upgrade', transport);
333 self.upgrading = false;
// any reply other than the probe pong is reported as a failed upgrade
337 debug('probe transport "%s" failed', name);
338 var err = new Error('probe error');
339 err.transport = transport.name;
340 self.emit('upgradeError', err);
// NOTE(review): the body of freezeTransport is almost entirely missing from
// this listing; only its comment below is visible.
345 function freezeTransport () {
348 // Any callback called by transport should be ignored since now
357 // Handle any error that happens while probing
358 function onerror (err) {
// err is concatenated into the message, so callers below pass plain strings
359 var error = new Error('probe error: ' + err);
360 error.transport = transport.name;
364 debug('probe transport "%s" failed because of error: %s', name, err);
366 self.emit('upgradeError', error);
// a close of the probing transport is handled as a probe error
369 function onTransportClose () {
370 onerror('transport closed');
373 // When the socket is closed while we're probing
374 function onclose () {
375 onerror('socket closed');
378 // When the socket is upgraded while we're probing
379 function onupgrade (to) {
// only reacts when a different transport is the one being upgraded to
380 if (transport && to.name !== transport.name) {
381 debug('"%s" works - aborting "%s"', to.name, transport.name);
386 // Remove all listeners on the transport and on self
387 function cleanup () {
388 transport.removeListener('open', onTransportOpen);
389 transport.removeListener('error', onerror);
390 transport.removeListener('close', onTransportClose);
391 self.removeListener('close', onclose);
392 self.removeListener('upgrading', onupgrade);
// wiring: each handler is registered with once; cleanup above removes the
// same five registrations
395 transport.once('open', onTransportOpen);
396 transport.once('error', onerror);
397 transport.once('close', onTransportClose);
399 this.once('close', onclose);
400 this.once('upgrading', onupgrade);
/**
 * Called when connection is deemed open.
 *
 * Marks the socket open, emits `open`, flushes buffered packets and then
 * probes every upgrade candidate, provided upgrades are enabled and the
 * current transport can be paused.
 */

Socket.prototype.onOpen = function () {
  debug('socket open');
  this.readyState = 'open';
  Socket.priorWebsocketSuccess = 'websocket' === this.transport.name;
  this.emit('open');
  this.flush();

  // we check for `readyState` in case an `open`
  // listener already closed the socket
  if ('open' !== this.readyState || !this.upgrade || !this.transport.pause) {
    return;
  }

  debug('starting upgrade probes');
  var total = this.upgrades.length;
  var position = 0;
  while (position < total) {
    this.probe(this.upgrades[position]);
    position++;
  }
};
/**
 * Handles a packet.
 *
 * Packets are processed only while the socket is `opening`, `open` or
 * `closing`. Every processed packet is re-emitted as `packet` and counts as
 * a `heartbeat` before being dispatched on its type.
 *
 * @param {Object} packet with `type` and `data`
 */

Socket.prototype.onPacket = function (packet) {
  var state = this.readyState;
  var accepting = 'opening' === state || 'open' === state || 'closing' === state;

  if (!accepting) {
    debug('packet received with socket readyState "%s"', this.readyState);
    return;
  }

  debug('socket receive: type "%s", data "%s"', packet.type, packet.data);

  this.emit('packet', packet);

  // Socket is live - any packet counts
  this.emit('heartbeat');

  var type = packet.type;
  if ('open' === type) {
    this.onHandshake(JSON.parse(packet.data));
  } else if ('pong' === type) {
    this.setPing();
    this.emit('pong');
  } else if ('error' === type) {
    var err = new Error('server error');
    err.code = packet.data;
    this.onError(err);
  } else if ('message' === type) {
    this.emit('data', packet.data);
    this.emit('message', packet.data);
  }
};
/**
 * Called upon handshake completion.
 *
 * Records the session id, the usable upgrades and the ping timings sent by
 * the server, opens the socket and starts the ping / heartbeat cycle.
 *
 * @param {Object} handshake obj
 */

Socket.prototype.onHandshake = function (data) {
  this.emit('handshake', data);

  this.id = data.sid;
  this.transport.query.sid = data.sid;
  this.upgrades = this.filterUpgrades(data.upgrades);
  this.pingInterval = data.pingInterval;
  this.pingTimeout = data.pingTimeout;

  this.onOpen();

  // In case open handler closes socket
  if ('closed' !== this.readyState) {
    this.setPing();

    // Prolong liveness of socket on heartbeat
    // (remove first so the listener is never registered twice)
    this.removeListener('heartbeat', this.onHeartbeat);
    this.on('heartbeat', this.onHeartbeat);
  }
};
/**
 * Resets ping timeout.
 *
 * Re-arms the timer that closes the socket with reason `ping timeout`.
 *
 * @param {Number} timeout delay in ms; when falsy, `pingInterval + pingTimeout` is used
 */

Socket.prototype.onHeartbeat = function (timeout) {
  var socket = this;

  function onPingTimeout () {
    if ('closed' === socket.readyState) return;
    socket.onClose('ping timeout');
  }

  clearTimeout(socket.pingTimeoutTimer);
  socket.pingTimeoutTimer = setTimeout(
    onPingTimeout,
    timeout || (socket.pingInterval + socket.pingTimeout)
  );
};
/**
 * Pings server every `this.pingInterval` and expects response
 * within `this.pingTimeout` or closes connection.
 *
 * Only one ping is scheduled per call; any previously scheduled one is
 * cancelled first.
 */

Socket.prototype.setPing = function () {
  var socket = this;

  function pingNow () {
    debug('writing ping packet - expecting pong within %sms', socket.pingTimeout);
    socket.ping();
    // shorten the liveness window to the pong deadline
    socket.onHeartbeat(socket.pingTimeout);
  }

  clearTimeout(socket.pingIntervalTimer);
  socket.pingIntervalTimer = setTimeout(pingNow, socket.pingInterval);
};
/**
 * Sends a ping packet.
 *
 * The `ping` event is emitted from the callback handed to `sendPacket`.
 */

Socket.prototype.ping = function () {
  var socket = this;

  function announcePing () {
    socket.emit('ping');
  }

  this.sendPacket('ping', announcePing);
};
/**
 * Called on `drain` event
 *
 * Drops the packets handed to the transport by the last flush, then either
 * flushes what was queued in the meantime or emits `drain`.
 */

Socket.prototype.onDrain = function () {
  this.writeBuffer.splice(0, this.prevBufferLen);

  // setting prevBufferLen = 0 is very important
  // for example, when upgrading, upgrade packet is sent over,
  // and a nonzero prevBufferLen could cause problems on `drain`
  this.prevBufferLen = 0;

  if (this.writeBuffer.length > 0) {
    this.flush();
    return;
  }

  this.emit('drain');
};
/**
 * Flush write buffers.
 *
 * Hands the whole write buffer to the transport, unless the socket is
 * closed, the transport is not writable, an upgrade is in progress or there
 * is nothing to send.
 */

Socket.prototype.flush = function () {
  if ('closed' === this.readyState) return;
  if (!this.transport.writable) return;
  if (this.upgrading) return;
  if (!this.writeBuffer.length) return;

  debug('flushing %d packets in socket', this.writeBuffer.length);
  this.transport.send(this.writeBuffer);

  // keep track of current length of writeBuffer
  // splice writeBuffer and callbackBuffer on `drain`
  this.prevBufferLen = this.writeBuffer.length;
  this.emit('flush');
};
/**
 * Sends a message.
 *
 * Available under two names, `send` and `write`.
 *
 * @param {String} message.
 * @param {Object} options.
 * @param {Function} callback function.
 * @return {Socket} for chaining.
 */

Socket.prototype.send = function (msg, options, fn) {
  this.sendPacket('message', msg, options, fn);
  return this;
};

Socket.prototype.write = Socket.prototype.send;
/**
 * Sends a packet.
 *
 * `data` and `options` are both optional: a function found in either
 * position is taken as the callback. Nothing is queued once the socket is
 * closing or closed.
 *
 * @param {String} packet type.
 * @param {String} data.
 * @param {Object} options.
 * @param {Function} callback function.
 */

Socket.prototype.sendPacket = function (type, data, options, fn) {
  // sendPacket(type, fn)
  if ('function' === typeof data) {
    fn = data;
    data = undefined;
  }

  // sendPacket(type, data, fn)
  if ('function' === typeof options) {
    fn = options;
    options = null;
  }

  var state = this.readyState;
  if ('closing' === state || 'closed' === state) {
    return;
  }

  options = options || {};
  // compression is on unless explicitly switched off
  options.compress = false !== options.compress;

  var packet = {
    type: type,
    data: data,
    options: options
  };

  this.emit('packetCreate', packet);
  this.writeBuffer.push(packet);
  if (fn) this.once('flush', fn);
  this.flush();
};
/**
 * Closes the connection.
 *
 * Has an effect only while the socket is `opening` or `open`. Closing is
 * postponed until the write buffer has drained and until a running upgrade
 * has finished or failed.
 *
 * @return {Socket} for chaining.
 */

Socket.prototype.close = function () {
  var self = this;

  function close () {
    self.onClose('forced close');
    debug('socket closing - telling transport to close');
    self.transport.close();
  }

  function cleanupAndClose () {
    self.removeListener('upgrade', cleanupAndClose);
    self.removeListener('upgradeError', cleanupAndClose);
    close();
  }

  function waitForUpgrade () {
    // wait for upgrade to finish since we can't send packets while pausing a transport
    self.once('upgrade', cleanupAndClose);
    self.once('upgradeError', cleanupAndClose);
  }

  if ('opening' !== this.readyState && 'open' !== this.readyState) {
    return this;
  }

  this.readyState = 'closing';

  if (this.writeBuffer.length) {
    // let queued packets go out first
    this.once('drain', function () {
      if (this.upgrading) {
        waitForUpgrade();
      } else {
        close();
      }
    });
  } else if (this.upgrading) {
    waitForUpgrade();
  } else {
    close();
  }

  return this;
};
/**
 * Called upon transport error
 *
 * Forgets any earlier websocket success, re-emits the error on the socket
 * and closes the socket with reason `transport error`.
 *
 * @param {Error} err error reported by the transport
 */

Socket.prototype.onError = function (err) {
  debug('socket error %j', err);

  // a failing transport must not be preferred on the next connection
  Socket.priorWebsocketSuccess = false;

  this.emit('error', err);
  this.onClose('transport error', err);
};
/**
 * Called upon transport close.
 *
 * Does nothing unless the socket is `opening`, `open` or `closing`.
 * Otherwise it stops the ping timers, shuts down and detaches the transport,
 * marks the socket closed and emits `close`.
 *
 * @param {String} reason why the socket closed
 * @param {*} desc optional detail passed on to `close` listeners
 */

Socket.prototype.onClose = function (reason, desc) {
  var state = this.readyState;
  if ('opening' !== state && 'open' !== state && 'closing' !== state) {
    return;
  }

  debug('socket close with reason: "%s"', reason);

  // clear timers
  clearTimeout(this.pingIntervalTimer);
  clearTimeout(this.pingTimeoutTimer);

  // stop event from firing again for transport
  this.transport.removeAllListeners('close');

  // ensure transport won't stay open
  this.transport.close();

  // ignore further transport communication
  this.transport.removeAllListeners();

  // set ready state
  this.readyState = 'closed';

  // clear session id
  this.id = null;

  // emit close event
  this.emit('close', reason, desc);

  // clean buffers after, so users can still
  // grab the buffers on `close` event
  this.writeBuffer = [];
  this.prevBufferLen = 0;
};
/**
 * Filters upgrades, returning only those matching client transports.
 *
 * The order of the server list is kept. A missing list (`null` or
 * `undefined`) yields an empty result instead of throwing.
 *
 * @param {Array} server upgrades
 * @return {Array} upgrades that are also in `this.transports`
 */

Socket.prototype.filterUpgrades = function (upgrades) {
  var filteredUpgrades = [];

  // a handshake without an upgrade list offers nothing to upgrade to
  if (!upgrades) return filteredUpgrades;

  for (var i = 0, j = upgrades.length; i < j; i++) {
    if (~index(this.transports, upgrades[i])) filteredUpgrades.push(upgrades[i]);
  }
  return filteredUpgrades;
};