var sys = require("./sys");
+var fs = require("./fs");
var debugLevel = 0;
if ('NODE_DEBUG' in process.ENV) debugLevel = 1;
function debug (x) {
}
FreeList.prototype.alloc = function () {
- debug("alloc " + this.name + " " + this.list.length);
+ //debug("alloc " + this.name + " " + this.list.length);
return this.list.length ? this.list.shift()
: this.constructor.apply(this, arguments);
}
FreeList.prototype.free = function (obj) {
- debug("free " + this.name + " " + this.list.length);
+ //debug("free " + this.name + " " + this.list.length);
if (this.list.length < this.max) {
this.list.push(obj);
}
allocRecvBuffer();
}
- debug('recvBuffer.used ' + recvBuffer.used);
+ //debug('recvBuffer.used ' + recvBuffer.used);
var bytesRead;
try {
recvBuffer,
recvBuffer.used,
recvBuffer.length - recvBuffer.used);
- debug('recvMsg.fd ' + recvMsg.fd);
+ //debug('recvMsg.fd ' + recvMsg.fd);
if (recvMsg.fd) {
self.emit('fd', recvMsg.fd);
}
return;
}
- debug('bytesRead ' + bytesRead + '\n');
+ //debug('bytesRead ' + bytesRead + '\n');
if (!recvMsg.fd && bytesRead == 0) {
self.readable = false;
} else if (bytesRead > 0) {
var start = recvBuffer.used;
var end = recvBuffer.used + bytesRead;
-
- if (self._events && self._events['data']) {
- // emit a slice
- self.emit('data', recvBuffer.slice(start, end));
+
+ if (!self._encoding) {
+ if (self._events && self._events['data']) {
+ // emit a slice
+ self.emit('data', recvBuffer.slice(start, end));
+ }
+
+ // Optimization: emit the original buffer with end points
+ if (self.ondata) self.ondata(recvBuffer, start, end);
+ } else {
+ // TODO remove me - we should only output Buffer
+
+ var string;
+ switch (self._encoding) {
+ case 'utf8':
+ string = recvBuffer.utf8Slice(start, end);
+ break;
+ case 'ascii':
+ string = recvBuffer.asciiSlice(start, end);
+ break;
+ default:
+ throw new Error('Unsupported encoding ' + self._encoding + '. Use Buffer');
+ }
+ self.emit('data', string);
}
- // Optimization: emit the original buffer with end points
- if (self.ondata) self.ondata(recvBuffer, start, end);
recvBuffer.used += bytesRead;
}
this.remoteAddress = peerInfo.remoteAddress;
this.remotePort = peerInfo.remotePort;
- this._readWatcher.set(this.fd, true, false);
- this._readWatcher.start();
+ this.resume();
this.readable = true;
this._writeWatcher.set(this.fd, false, true);
var self = this;
if (!self.writable) throw new Error('Socket is not writable');
var buffer;
+
if (self._writeQueue.length == 0) {
buffer = self._allocateSendBuffer();
} else {
bytesWritten = charsWritten;
} else if (encoding.toLowerCase() == 'utf8') {
buffer.isFd = false;
- charsWritten = buffer.utf8Write(data,
- buffer.used,
- buffer.length - buffer.used);
+ charsWritten = buffer.utf8Write(data, buffer.used);
bytesWritten = Buffer.utf8ByteLength(data.slice(0, charsWritten));
} else {
// ascii
buffer.isFd = false;
- charsWritten = buffer.asciiWrite(data,
- buffer.used,
- buffer.length - buffer.used);
+ charsWritten = buffer.asciiWrite(data, buffer.used);
bytesWritten = charsWritten;
}
self._writeQueueSize += bytesWritten;
}
- debug('charsWritten ' + charsWritten);
- debug('buffer.used ' + buffer.used);
+ //debug('charsWritten ' + charsWritten);
+ //debug('buffer.used ' + buffer.used);
// If we didn't finish, then recurse with the rest of the string.
if (charsWritten < data.length) {
- debug('recursive write');
+ //debug('recursive write');
self._writeString(data.slice(charsWritten), encoding);
}
};
throw new Error('send renamed to write');
};
+Socket.prototype.setEncoding = function (enc) {
+ // TODO check values, error out on bad, and deprecation message?
+ this._encoding = enc.toLowerCase();
+};
// Returns true if all the data was flushed to socket. Returns false if
// something was queued. If data was queued, then the "drain" event will
if (b.isFd) {
b.sent = b.used;
self._writeMessageQueueSize -= 1;
- debug('sent fd: ' + fdToSend);
+ //debug('sent fd: ' + fdToSend);
} else {
b.sent += bytesWritten;
self._writeQueueSize -= bytesWritten;
- debug('bytes sent: ' + b.sent);
+ //debug('bytes sent: ' + b.sent);
}
}
if (self._writeWatcher) self._writeWatcher.stop();
};
+function doConnect (socket, port, host) {
+ try {
+ connect(socket.fd, port, host);
+ } catch (e) {
+ socket.forceClose(e);
+ }
+
+ // Don't start the read watcher until connection is established
+ socket._readWatcher.set(socket.fd, true, false);
+
+ // How to connect on POSIX: Wait for fd to become writable, then call
+ // socketError() if there isn't an error, we're connected. AFAIK this a
+ // platform independent way determining when a non-blocking connection
+ // is established, but I have only seen it documented in the Linux
+ // Manual Page connect(2) under the error code EINPROGRESS.
+ socket._writeWatcher.set(socket.fd, false, true);
+ socket._writeWatcher.start();
+ socket._writeWatcher.callback = function () {
+ var errno = socketError(socket.fd);
+ if (errno == 0) {
+ // connection established
+ socket._readWatcher.start();
+ socket.readable = true;
+ socket.writable = true;
+ socket._writeWatcher.callback = socket._doFlush;
+ socket.emit('connect');
+ } else if (errno != EINPROGRESS) {
+ socket.forceClose(errnoException(errno, 'connect'));
+ }
+ };
+}
+
+
// var stream = new Socket();
// stream.connect(80) - TCP connect to port 80 on the localhost
// stream.connect(80, 'nodejs.org') - TCP connect to port 80 on nodejs.org
var self = this;
if (self.fd) throw new Error('Socket already opened');
- function doConnect () {
- try {
- connect(self.fd, arguments[0], arguments[1]);
- } catch (e) {
- close(self.fd);
- throw e;
- }
-
- // Don't start the read watcher until connection is established
- self._readWatcher.set(self.fd, true, false);
-
- // How to connect on POSIX: Wait for fd to become writable, then call
- // socketError() if there isn't an error, we're connected. AFAIK this a
- // platform independent way determining when a non-blocking connection
- // is established, but I have only seen it documented in the Linux
- // Manual Page connect(2) under the error code EINPROGRESS.
- self._writeWatcher.set(self.fd, false, true);
- self._writeWatcher.start();
- self._writeWatcher.callback = function () {
- var errno = socketError(self.fd);
- if (errno == 0) {
- // connection established
- self._readWatcher.start();
- self.readable = true;
- self.writable = true;
- self._writeWatcher.callback = self._doFlush;
- self.emit('connect');
- } else if (errno != EINPROGRESS) {
- self.forceClose(errnoException(errno, 'connect'));
- }
- };
- }
-
if (typeof(arguments[0]) == 'string') {
self.fd = socket('unix');
self.type = 'unix';
// TODO check if sockfile exists?
- doConnect(arguments[0]);
+ doConnect(self, arguments[0]);
} else {
self.fd = socket('tcp');
+ debug('new fd = ' + self.fd);
self.type = 'tcp';
// TODO dns resolution on arguments[1]
var port = arguments[0];
+ var yyy = xxx++;
lookupDomainName(arguments[1], function (ip) {
- doConnect(port, ip);
+ debug('doConnect ' + self.fd + ' yyy=' + yyy);
+ doConnect(self, port, ip);
+ debug('doConnect done ' + self.fd + ' yyy=' + yyy);
});
}
};
+var xxx = 0;
+
Socket.prototype.address = function () {
return getsockname(this.fd);
};
+Socket.prototype.pause = function () {
+ this._readWatcher.stop();
+};
+
+Socket.prototype.resume = function () {
+ if (!this.fd) throw new Error('Cannot resume() closed Socket.');
+ this._readWatcher.set(this.fd, true, false);
+ this._readWatcher.start();
+};
+
Socket.prototype.forceClose = function (exception) {
// recvBuffer is shared between sockets, so don't need to free it here.
+ var self = this;
var b;
while (this._writeQueue.length) {
if (this.fd) {
close(this.fd);
+ debug('close ' + this.fd);
this.fd = null;
- this.emit('close', exception);
+ process.nextTick(function () {
+ if (exception) self.emit('error', exception);
+ self.emit('close', exception ? true : false);
+ });
}
};
self.watcher.callback = function (readable, writeable) {
while (self.fd) {
var peerInfo = accept(self.fd);
- debug('accept: ' + JSON.stringify(peerInfo));
if (!peerInfo) return;
var peer = new Socket(peerInfo);
peer.type = self.type;
peer.server = self;
self.emit('connection', peer);
+ // The 'connect' event probably should be removed for server-side
+ // sockets. It's redundent.
+ peer.emit('connect');
}
};
}
*/
function lookupDomainName (dn, callback) {
if (!needsLookup(dn)) {
- callback(dn);
+ // Always wait until the next tick this is so people can do
+ //
+ // server.listen(8000);
+ // server.addListener('listening', fn);
+ //
+ // Marginally slower, but a lot fewer WTFs.
+ process.nextTick(function () { callback(dn); })
} else {
debug("getaddrinfo 4 " + dn);
getaddrinfo(dn, 4, function (r4) {
var path = arguments[0];
self.path = path;
// unlink sockfile if it exists
- process.fs.stat(path, function (r) {
- if (r instanceof Error) {
- if (r.errno == ENOENT) {
+ fs.stat(path, function (err, r) {
+ if (err) {
+ if (err.errno == ENOENT) {
bind(self.fd, path);
doListen();
} else {
if (!r.isFile()) {
throw new Error("Non-file exists at " + path);
} else {
- process.fs.unlink(path, function (err) {
+ fs.unlink(path, function (err) {
if (err) {
throw err;
} else {
self.fd = socket('tcp');
self.type = 'tcp';
var port = arguments[0];
- debug("starting tcp server on port " + port);
lookupDomainName(arguments[1], function (ip) {
- debug("starting tcp server on ip " + ip);
bind(self.fd, port, ip);
doListen();
});
self.fd = null;
if (self.type === "unix") {
- process.fs.unlink(self.path, function () {
+ fs.unlink(self.path, function () {
self.emit("close");
});
} else {