}
+var IOWatcher = process.IOWatcher;
var assert = process.assert;
var socket = process.socket;
var bind = process.bind;
var EINPROGRESS = process.EINPROGRESS;
-function Stream (peerInfo) {
+function Socket (peerInfo) {
process.EventEmitter.call();
var self = this;
// Allocated on demand.
self.recvBuffer = null;
- self.readWatcher = new process.IOWatcher(function () {
+ self.readWatcher = new IOWatcher()
+ self.readWatcher.callback = function () {
// If this is the first recv (recvBuffer doesn't exist) or we've used up
// most of the recvBuffer, allocate a new one.
- if (!self.recvBuffer ||
+ if (!self.recvBuffer ||
self.recvBuffer.length - self.recvBuffer.used < 128) {
self._allocateNewRecvBuf();
}
self.readable = false;
self.readWatcher.stop();
self.emit('eof');
- if (!self.writable) self.forceClose();
+ if (!self.writable) self.forceClose();
} else {
var slice = self.recvBuffer.slice(self.recvBuffer.used,
self.recvBuffer.used + bytesRead);
self.recvBuffer.used += bytesRead;
self.emit('receive', slice);
}
- });
+ };
self.readable = false;
self.sendQueue = []; // queue of buffers that need to be written to socket
- // XXX use link list?
+ // XXX use link list?
self.sendQueueSize = 0; // in bytes, not to be confused with sendQueue.length!
self._doFlush = function () {
assert(self.sendQueueSize > 0);
self.emit("drain");
}
};
- self.writeWatcher = new process.IOWatcher(self._doFlush);
+ self.writeWatcher = new IOWatcher();
+ self.writeWatcher.callback = self._doFlush;
self.writable = false;
if (peerInfo) {
self.writable = true;
}
};
-process.inherits(Stream, process.EventEmitter);
-exports.Stream = Stream;
+process.inherits(Socket, process.EventEmitter);
+exports.Socket = Socket;
exports.createConnection = function (port, host) {
- var s = new Stream();
+ var s = new Socket();
s.connect(port, host);
return s;
};
-Stream.prototype._allocateNewRecvBuf = function () {
+Socket.prototype._allocateNewRecvBuf = function () {
var self = this;
var newBufferSize = 1024; // TODO make this adjustable from user API
};
-Stream.prototype._allocateSendBuffer = function () {
+Socket.prototype._allocateSendBuffer = function () {
var b = new process.Buffer(1024);
b.used = 0;
b.sent = 0;
};
-Stream.prototype._sendString = function (data, encoding) {
+Socket.prototype._sendString = function (data, encoding) {
var self = this;
- if (!self.writable) throw new Error('Stream is not writable');
+ if (!self.writable) throw new Error('Socket is not writable');
var buffer;
if (self.sendQueue.length == 0) {
buffer = self._allocateSendBuffer();
// Returns true if all the data was flushed to socket. Returns false if
// something was queued. If data was queued, then the "drain" event will
// signal when it has been finally flushed to socket.
-Stream.prototype.send = function (data, encoding) {
+Socket.prototype.send = function (data, encoding) {
var self = this;
- if (!self.writable) throw new Error('Stream is not writable');
+ if (!self.writable) throw new Error('Socket is not writable');
if (typeof(data) == 'string') {
self._sendString(data, encoding);
} else {
// Flushes the write buffer out. Emits "drain" if the buffer is empty.
-Stream.prototype.flush = function () {
+Socket.prototype.flush = function () {
var self = this;
- if (!self.writable) throw new Error('Stream is not writable');
+ if (!self.writable) throw new Error('Socket is not writable');
var bytesWritten;
while (self.sendQueue.length > 0) {
};
-// var stream = new Stream();
+// var stream = new Socket();
// stream.connect(80) - TCP connect to port 80 on the localhost
// stream.connect(80, 'nodejs.org') - TCP connect to port 80 on nodejs.org
// stream.connect('/tmp/socket') - UNIX connect to socket specified by path
-Stream.prototype.connect = function () {
+Socket.prototype.connect = function () {
var self = this;
- if (self.fd) throw new Error('Stream already opened');
+ if (self.fd) throw new Error('Socket already opened');
if (typeof(arguments[0]) == 'string' && arguments.length == 1) {
self.fd = process.socket('UNIX');
};
-Stream.prototype.forceClose = function (exception) {
+Socket.prototype.forceClose = function (exception) {
if (this.fd) {
this.readable = false;
this.writable = false;
close(this.fd);
debug('close peer ' + this.fd);
this.fd = null;
- this.emit('close', exception);
+ this.emit('close', exception);
}
};
-Stream.prototype._shutdown = function () {
+Socket.prototype._shutdown = function () {
if (this.writable) {
this.writable = false;
shutdown(this.fd, "write");
};
-Stream.prototype.close = function () {
+Socket.prototype.close = function () {
var self = this;
var closeMethod;
if (self.readable && self.writable) {
// already got EOF
closeMethod = self.forceClose;
}
- // In the case we've already shutdown write side,
+ // In the case we've already shutdown write side,
// but haven't got EOF: ignore. In the case we're
// fully closed already: ignore.
self.addListener('connection', listener);
}
- self.watcher = new process.IOWatcher(function (readable, writeable) {
+ self.watcher = new IOWatcher();
+ self.watcher.callback = function (readable, writeable) {
while (self.fd) {
var peerInfo = accept(self.fd);
debug('accept: ' + JSON.stringify(peerInfo));
if (!peerInfo) return;
- var peer = new Stream(peerInfo);
+ var peer = new Socket(peerInfo);
self.emit('connection', peer);
}
- });
-};
+ };
+}
process.inherits(Server, process.EventEmitter);
exports.Server = Server;
listen(self.fd, 128);
self.emit("listening");
- self.watcher.set(self.fd, true, false);
+ self.watcher.set(self.fd, true, false);
self.watcher.start();
};