// USE OR OTHER DEALINGS IN THE SOFTWARE.
var events = require('events');
-var Stream = require('stream');
+var stream = require('stream');
var timers = require('timers');
var util = require('util');
var assert = require('assert');
}
-/* Bit flags for socket._flags */
-var FLAG_GOT_EOF = 1 << 0;
-var FLAG_SHUTDOWN = 1 << 1;
-var FLAG_DESTROY_SOON = 1 << 2;
-var FLAG_SHUTDOWN_QUEUED = 1 << 3;
-
-
var debug;
if (process.env.NODE_DEBUG && /net/.test(process.env.NODE_DEBUG)) {
- debug = function(x) { console.error('NET:', x); };
+ var pid = process.pid;
+ debug = function(x) {
+ // if console is not set up yet, then skip this.
+ if (!console.error)
+ return;
+ console.error('NET: %d', pid,
+ util.format.apply(util, arguments).slice(0, 500));
+ };
} else {
debug = function() { };
}
exports._normalizeConnectArgs = normalizeConnectArgs;
-/* called when creating new Socket, or when re-using a closed Socket */
+// called when creating new Socket, or when re-using a closed Socket
function initSocketHandle(self) {
- self._pendingWriteReqs = 0;
-
- self._flags = 0;
- self._connectQueueSize = 0;
self.destroyed = false;
self.errorEmitted = false;
self.bytesRead = 0;
function Socket(options) {
if (!(this instanceof Socket)) return new Socket(options);
- Stream.call(this);
-
switch (typeof options) {
case 'number':
options = { fd: options }; // Legacy interface.
break;
}
- if (typeof options.fd === 'undefined') {
+ this.readable = this.writable = false;
+ if (options.handle) {
+ this._handle = options.handle; // private
+ } else if (typeof options.fd === 'undefined') {
this._handle = options && options.handle; // private
} else {
this._handle = createPipe();
this.readable = this.writable = true;
}
+ this.onend = null;
+
+ // shut down the socket when we're finished with it.
+ this.on('finish', onSocketFinish);
+ this.on('_socketEnd', onSocketEnd);
+
initSocketHandle(this);
- this.allowHalfOpen = options && options.allowHalfOpen;
+
+ this._pendingWrite = null;
+
+ stream.Duplex.call(this, options);
+
+ // handle strings directly
+ this._writableState.decodeStrings = false;
+
+ // default to *not* allowing half open sockets
+ this.allowHalfOpen = options && options.allowHalfOpen || false;
+
+ // if we have a handle, then start the flow of data into the
+ // buffer. if not, then this will happen when we connect
+ if (this._handle && (!options || options.readable !== false))
+ this.read(0);
+}
+util.inherits(Socket, stream.Duplex);
+
+// the user has called .end(), and all the bytes have been
+// sent out to the other side.
+// If allowHalfOpen is false, or if the readable side has
+// ended already, then destroy.
+// If allowHalfOpen is true, then we need to do a shutdown,
+// so that only the writable side will be cleaned up.
+function onSocketFinish() {
+ debug('onSocketFinish');
+ if (this._readableState.ended) {
+ debug('oSF: ended, destroy', this._readableState);
+ return this.destroy();
+ }
+
+ debug('oSF: not ended, call shutdown()');
+
+ // otherwise, just shutdown, or destroy() if not possible
+ if (!this._handle.shutdown)
+ return this.destroy();
+
+ var shutdownReq = this._handle.shutdown();
+
+ if (!shutdownReq)
+ return this._destroy(errnoException(errno, 'shutdown'));
+
+ shutdownReq.oncomplete = afterShutdown;
+}
+
+
+function afterShutdown(status, handle, req) {
+ var self = handle.owner;
+
+ debug('afterShutdown destroyed=%j', self.destroyed,
+ self._readableState);
+
+ // callback may come after call to destroy.
+ if (self.destroyed)
+ return;
+
+ if (self._readableState.ended) {
+ debug('readableState ended, destroying');
+ self.destroy();
+ } else {
+ self.once('_socketEnd', self.destroy);
+ }
}
-util.inherits(Socket, Stream);
+// the EOF has been received, and no more bytes are coming.
+// if the writable side has ended already, then clean everything
+// up.
+function onSocketEnd() {
+ // XXX Should not have to do as much crap in this function.
+ // ended should already be true, since this is called *after*
+ // the EOF errno and onread has returned null to the _read cb.
+ debug('onSocketEnd', this._readableState);
+ this._readableState.ended = true;
+ if (this._readableState.endEmitted) {
+ this.readable = false;
+ } else {
+ this.once('end', function() {
+ this.readable = false;
+ });
+ this.read(0);
+ }
+
+ if (!this.allowHalfOpen)
+ this.destroySoon();
+}
exports.Socket = Socket;
exports.Stream = Socket; // Legacy naming.
Socket.prototype.listen = function() {
+ debug('socket.listen');
var self = this;
self.on('connection', arguments[0]);
listen(self, null, null, null);
Object.defineProperty(Socket.prototype, 'bufferSize', {
get: function() {
if (this._handle) {
- return this._handle.writeQueueSize + this._connectQueueSize;
+ return this._handle.writeQueueSize;
}
}
});
-Socket.prototype.pause = function() {
- this._paused = true;
- if (this._handle && !this._connecting) {
- this._handle.readStop();
+// Just call handle.readStart until we have enough in the buffer
+Socket.prototype._read = function(n, callback) {
+ debug('_read');
+ if (this._connecting || !this._handle) {
+ debug('_read wait for connection');
+ this.once('connect', this._read.bind(this, n, callback));
+ return;
}
-};
+ assert(callback === this._readableState.onread);
+ assert(this._readableState.reading = true);
-Socket.prototype.resume = function() {
- this._paused = false;
- if (this._handle && !this._connecting) {
- this._handle.readStart();
+ if (!this._handle.reading) {
+ debug('Socket._read readStart');
+ this._handle.reading = true;
+ var r = this._handle.readStart();
+ if (r)
+ this._destroy(errnoException(errno, 'read'));
+ } else {
+ debug('readStart already has been called.');
}
};
Socket.prototype.end = function(data, encoding) {
- if (this._connecting && ((this._flags & FLAG_SHUTDOWN_QUEUED) == 0)) {
- // still connecting, add data to buffer
- if (data) this.write(data, encoding);
- this.writable = false;
- this._flags |= FLAG_SHUTDOWN_QUEUED;
- }
-
- if (!this.writable) return;
+ stream.Duplex.prototype.end.call(this, data, encoding);
this.writable = false;
-
- if (data) this.write(data, encoding);
DTRACE_NET_STREAM_END(this);
- if (!this.readable) {
- this.destroySoon();
- } else {
- this._flags |= FLAG_SHUTDOWN;
- var shutdownReq = this._handle.shutdown();
-
- if (!shutdownReq) {
- this._destroy(errnoException(errno, 'shutdown'));
- return false;
- }
-
- shutdownReq.oncomplete = afterShutdown;
- }
-
- return true;
+ // just in case we're waiting for an EOF.
+ if (!this._readableState.endEmitted)
+ this.read(0);
+ return;
};
-function afterShutdown(status, handle, req) {
- var self = handle.owner;
-
- assert.ok(self._flags & FLAG_SHUTDOWN);
- assert.ok(!self.writable);
-
- // callback may come after call to destroy.
- if (self.destroyed) {
- return;
- }
-
- if (self._flags & FLAG_GOT_EOF || !self.readable) {
- self._destroy();
- } else {
- }
-}
-
-
Socket.prototype.destroySoon = function() {
- this.writable = false;
- this._flags |= FLAG_DESTROY_SOON;
-
- if (this._pendingWriteReqs == 0) {
- this._destroy();
- }
-};
-
+ if (this.writable)
+ this.end();
-Socket.prototype._connectQueueCleanUp = function(exception) {
- this._connecting = false;
- this._connectQueueSize = 0;
- this._connectQueue = null;
+ if (this._writableState.finishing || this._writableState.finished)
+ this.destroy();
+ else
+ this.once('finish', this.destroy);
};
Socket.prototype._destroy = function(exception, cb) {
+ debug('destroy');
+
var self = this;
function fireErrorCallbacks() {
};
if (this.destroyed) {
+ debug('already destroyed, fire error callbacks');
fireErrorCallbacks();
return;
}
- self._connectQueueCleanUp();
-
- debug('destroy');
+ self._connecting = false;
this.readable = this.writable = false;
debug('close');
if (this._handle) {
+ if (this !== process.stderr)
+ debug('close handle');
this._handle.close();
this._handle.onread = noop;
this._handle = null;
fireErrorCallbacks();
process.nextTick(function() {
+ debug('emit close');
self.emit('close', exception ? true : false);
});
if (this.server) {
COUNTER_NET_SERVER_CONNECTION_CLOSE(this);
+ debug('has server');
this.server._connections--;
if (this.server._emitCloseIfDrained) {
this.server._emitCloseIfDrained();
Socket.prototype.destroy = function(exception) {
+ debug('destroy', exception);
this._destroy(exception);
};
+// This function is called whenever the handle gets a
+// buffer, or when there's an error reading.
function onread(buffer, offset, length) {
var handle = this;
var self = handle.owner;
timers.active(self);
var end = offset + length;
+ debug('onread', global.errno, offset, length, end);
if (buffer) {
- // Emit 'data' event.
+ debug('got data');
- if (self._decoder) {
- // Emit a string.
- var string = self._decoder.write(buffer.slice(offset, end));
- if (string.length) self.emit('data', string);
- } else {
- // Emit a slice. Attempt to avoid slicing the buffer if no one is
- // listening for 'data'.
- if (self._events && self._events['data']) {
- self.emit('data', buffer.slice(offset, end));
- }
+ // read success.
+ // In theory (and in practice) calling readStop right now
+ // will prevent this from being called again until _read() gets
+ // called again.
+
+ // if we didn't get any bytes, that doesn't necessarily mean EOF.
+ // wait for the next one.
+ if (offset === end) {
+ debug('not any data, keep waiting');
+ return;
}
+ // if it's not enough data, we'll just call handle.readStart()
+ // again right away.
self.bytesRead += length;
+ self._readableState.onread(null, buffer.slice(offset, end));
+
+ if (handle.reading && !self._readableState.reading) {
+ handle.reading = false;
+ debug('readStop');
+ var r = handle.readStop();
+ if (r)
+ self._destroy(errnoException(errno, 'read'));
+ }
// Optimization: emit the original buffer with end points
if (self.ondata) self.ondata(buffer, offset, end);
} else if (errno == 'EOF') {
- // EOF
- self.readable = false;
+ debug('EOF');
- assert.ok(!(self._flags & FLAG_GOT_EOF));
- self._flags |= FLAG_GOT_EOF;
+ if (self._readableState.length === 0)
+ self.readable = false;
- // We call destroy() before end(). 'close' not emitted until nextTick so
- // the 'end' event will come first as required.
- if (!self.writable) self._destroy();
+ if (self.onend) self.once('end', self.onend);
- if (!self.allowHalfOpen) self.end();
- if (self._decoder) {
- var ret = self._decoder.end();
- if (ret)
- self.emit('data', ret);
- }
- if (self._events && self._events['end']) self.emit('end');
- if (self.onend) self.onend();
+ // send a null to the _read cb to signal the end of data.
+ self._readableState.onread(null, null);
+
+ // internal end event so that we know that the actual socket
+ // is no longer readable, and we can start the shutdown
+ // procedure. No need to wait for all the data to be consumed.
+ self.emit('_socketEnd');
} else {
+ debug('error', errno);
// Error
if (errno == 'ECONNRESET') {
self._destroy();
}
-Socket.prototype.setEncoding = function(encoding) {
- var StringDecoder = require('string_decoder').StringDecoder; // lazy load
- this._decoder = new StringDecoder(encoding);
-};
-
-
Socket.prototype._getpeername = function() {
if (!this._handle || !this._handle.getpeername) {
return {};
});
-/*
- * Arguments data, [encoding], [cb]
- */
-Socket.prototype.write = function(data, arg1, arg2) {
- var encoding, cb;
+Socket.prototype.write = function(chunk, encoding, cb) {
+ if (typeof chunk !== 'string' && !Buffer.isBuffer(chunk))
+ throw new TypeError('invalid data');
+ return stream.Duplex.prototype.write.apply(this, arguments);
+};
- // parse arguments
- if (arg1) {
- if (typeof arg1 === 'string') {
- encoding = arg1;
- cb = arg2;
- } else if (typeof arg1 === 'function') {
- cb = arg1;
- } else {
- throw new Error('bad arg');
- }
- }
- if (typeof data === 'string') {
- encoding = (encoding || 'utf8').toLowerCase();
- switch (encoding) {
- case 'utf8':
- case 'utf-8':
- case 'ascii':
- case 'ucs2':
- case 'ucs-2':
- case 'utf16le':
- case 'utf-16le':
- // This encoding can be handled in the binding layer.
- break;
+Socket.prototype._write = function(dataEncoding, cb) {
+ assert(Array.isArray(dataEncoding));
+ var data = dataEncoding[0];
+ var encoding = dataEncoding[1] || 'utf8';
- default:
- data = new Buffer(data, encoding);
- }
- } else if (!Buffer.isBuffer(data)) {
- throw new TypeError('First argument must be a buffer or a string.');
- }
+ if (this !== process.stderr && this !== process.stdout)
+ debug('Socket._write');
// If we are still connecting, then buffer this for later.
+ // The Writable logic will buffer up any more writes while
+ // waiting for this one to be done.
if (this._connecting) {
- this._connectQueueSize += data.length;
- if (this._connectQueue) {
- this._connectQueue.push([data, encoding, cb]);
- } else {
- this._connectQueue = [[data, encoding, cb]];
- }
- return false;
+ debug('_write: waiting for connection');
+ this._pendingWrite = dataEncoding;
+ this.once('connect', function() {
+ debug('_write: connected now, try again');
+ this._write(dataEncoding, cb);
+ });
+ return;
}
+ this._pendingWrite = null;
- return this._write(data, encoding, cb);
-};
-
-
-Socket.prototype._write = function(data, encoding, cb) {
timers.active(this);
if (!this._handle) {
+ debug('already destroyed');
this._destroy(new Error('This socket is closed.'), cb);
return false;
}
break;
default:
- assert(0);
+ writeReq = this._handle.writeBuffer(new Buffer(data, encoding));
+ break;
}
}
- if (!writeReq || typeof writeReq !== 'object') {
- this._destroy(errnoException(errno, 'write'), cb);
- return false;
- }
+ if (!writeReq || typeof writeReq !== 'object')
+ return this._destroy(errnoException(errno, 'write'), cb);
writeReq.oncomplete = afterWrite;
writeReq.cb = cb;
- this._pendingWriteReqs++;
this._bytesDispatched += writeReq.bytes;
-
- return this._handle.writeQueueSize == 0;
};
Socket.prototype.__defineGetter__('bytesWritten', function() {
var bytes = this._bytesDispatched,
- connectQueue = this._connectQueue;
+ state = this._writableState,
+ pending = this._pendingWrite;
- if (connectQueue) {
- connectQueue.forEach(function(el) {
- var data = el[0];
- if (Buffer.isBuffer(data)) {
- bytes += data.length;
- } else {
- bytes += Buffer.byteLength(data, el[1]);
- }
- }, this);
- }
+ state.buffer.forEach(function(el) {
+ bytes += Buffer.byteLength(el[0], el[1]);
+ });
+
+ if (pending)
+ bytes += Buffer.byteLength(pending[0], pending[1]);
return bytes;
});
function afterWrite(status, handle, req) {
var self = handle.owner;
+ var state = self._writableState;
+ if (self !== process.stderr && self !== process.stdout)
+ debug('afterWrite', status, req);
// callback may come after call to destroy.
if (self.destroyed) {
+ debug('afterWrite destroyed');
return;
}
if (status) {
+ debug('write failure', errnoException(errno, 'write'));
self._destroy(errnoException(errno, 'write'), req.cb);
return;
}
timers.active(self);
- self._pendingWriteReqs--;
-
- if (self._pendingWriteReqs == 0) {
- self.emit('drain');
- }
+ if (self !== process.stderr && self !== process.stdout)
+ debug('afterWrite call cb');
- if (req.cb) req.cb();
-
- if (self._pendingWriteReqs == 0 && self._flags & FLAG_DESTROY_SOON) {
- self._destroy();
- }
+ req.cb.call(self);
}
return Socket.prototype.connect.apply(this, args);
}
+ if (this.destroyed) {
+ this._readableState.reading = false;
+ this._readableState.ended = false;
+ this._writableState.ended = false;
+ this._writableState.ending = false;
+ this._writableState.finished = false;
+ this._writableState.finishing = false;
+ this.destroyed = false;
+ this._handle = null;
+ }
+
var self = this;
var pipe = !!options.path;
- if (this.destroyed || !this._handle) {
+ if (!this._handle) {
this._handle = pipe ? createPipe() : createTCP();
initSocketHandle(this);
}
self.writable = writable;
timers.active(self);
- if (self.readable && !self._paused) {
- handle.readStart();
- }
-
- if (self._connectQueue) {
- debug('Drain the connect queue');
- var connectQueue = self._connectQueue;
- for (var i = 0; i < connectQueue.length; i++) {
- self._write.apply(self, connectQueue[i]);
- }
- self._connectQueueCleanUp();
- }
-
self.emit('connect');
- if (self._flags & FLAG_SHUTDOWN_QUEUED) {
- // end called before connected - call end now with no data
- self._flags &= ~FLAG_SHUTDOWN_QUEUED;
- self.end();
- }
+ // start the first read, or get an immediate EOF.
+ // this doesn't actually consume any bytes, because len=0.
+ if (readable)
+ self.read(0);
+
} else {
- self._connectQueueCleanUp();
+ self._connecting = false;
self._destroy(errnoException(errno, 'connect'));
}
}
configurable: true, enumerable: true
});
- this.allowHalfOpen = options.allowHalfOpen || false;
-
this._handle = null;
+
+ this.allowHalfOpen = options.allowHalfOpen || false;
}
util.inherits(Server, events.EventEmitter);
exports.Server = Server;
Server.prototype._listen2 = function(address, port, addressType, backlog, fd) {
+ debug('listen2', address, port, addressType, backlog);
var self = this;
var r = 0;
// If there is not yet a handle, we need to create one and bind.
// In the case of a server sent via IPC, we don't need to do this.
if (!self._handle) {
+ debug('_listen2: create a handle');
self._handle = createServerHandle(address, port, addressType, fd);
if (!self._handle) {
var error = errnoException(errno, 'listen');
});
return;
}
+ } else {
+ debug('_listen2: have a handle already');
}
self._handle.onconnection = onconnection;
});
socket.readable = socket.writable = true;
- clientHandle.readStart();
self._connections++;
socket.server = self;
};
Server.prototype._emitCloseIfDrained = function() {
+ debug('SERVER _emitCloseIfDrained');
var self = this;
- if (self._handle || self._connections) return;
+ if (self._handle || self._connections) {
+ debug('SERVER handle? %j connections? %d',
+ !!self._handle, self._connections);
+ return;
+ }
process.nextTick(function() {
+ debug('SERVER: emit close');
self.emit('close');
});
};