}
+var assert = process.assert;
var socket = process.socket;
var bind = process.bind;
var connect = process.connect;
var write = process.write;
var toRead = process.toRead;
var socketError = process.socketError;
+var getsockname = process.getsockname;
+var getaddrinfo = process.getaddrinfo;
+var needsLookup = process.needsLookup;
var EINPROGRESS = process.EINPROGRESS;
// Allocated on demand.
self.recvBuffer = null;
- self.sendQueue = [];
self.readWatcher = new process.IOWatcher(function () {
- debug('\n' + self.fd + ' readable');
-
// If this is the first recv (recvBuffer doesn't exist) or we've used up
// most of the recvBuffer, allocate a new one.
if (!self.recvBuffer ||
self.emit('receive', slice);
}
});
+ self.readable = false;
- self._onWriteFlush = function () {
- self.flush();
+ self.sendQueue = []; // queue of buffers that need to be written to socket
+ // XXX use link list?
+ self.sendQueueSize = 0; // in bytes, not to be confused with sendQueue.length!
+ self._doFlush = function () {
+ assert(self.sendQueueSize > 0);
+ if (self.flush()) {
+ assert(self.sendQueueSize == 0);
+ self.emit("drain");
+ }
};
-
- self.writeWatcher = new process.IOWatcher(self._onWriteFlush);
-
- self.readable = false;
+ self.writeWatcher = new process.IOWatcher(self._doFlush);
self.writable = false;
if (peerInfo) {
self.readWatcher.set(self.fd, true, false);
self.readWatcher.start();
- self.writeWatcher.set(self.fd, false, true);
self.readable = true;
+
+ self.writeWatcher.set(self.fd, false, true);
self.writable = true;
}
};
exports.Stream = Stream;
+exports.createConnection = function (port, host) {
+ var s = new Stream();
+ s.connect(port, host);
+ return s;
+};
+
+
Stream.prototype._allocateNewRecvBuf = function () {
var self = this;
Stream.prototype._sendString = function (data, encoding) {
var self = this;
+ if (!self.writable) throw new Error('Stream is not writable');
var buffer;
if (self.sendQueue.length == 0) {
buffer = self._allocateSendBuffer();
encoding = encoding || 'ascii'; // default to ascii since it's faster
var charsWritten;
+ var bytesWritten;
if (encoding.toLowerCase() == 'utf8') {
charsWritten = buffer.utf8Write(data,
buffer.used,
buffer.length - buffer.used);
- buffer.used += process.Buffer.utf8Length(data.slice(0, charsWritten));
+ bytesWritten = process.Buffer.utf8Length(data.slice(0, charsWritten));
} else {
// ascii
charsWritten = buffer.asciiWrite(data,
buffer.used,
buffer.length - buffer.used);
- buffer.used += charsWritten;
- debug('ascii charsWritten ' + charsWritten);
- debug('ascii buffer.used ' + buffer.used);
+ bytesWritten = charsWritten;
}
+
+ buffer.used += bytesWritten;
+ self.sendQueueSize += bytesWritten;
+ debug('charsWritten ' + charsWritten);
+ debug('buffer.used ' + buffer.used);
// If we didn't finish, then recurse with the rest of the string.
if (charsWritten < data.length) {
};
+// Returns true if all the data was flushed to socket. Returns false if
+// something was queued. If data was queued, then the "drain" event will
+// signal when it has been finally flushed to socket.
Stream.prototype.send = function (data, encoding) {
var self = this;
+ if (!self.writable) throw new Error('Stream is not writable');
if (typeof(data) == 'string') {
self._sendString(data, encoding);
} else {
}
if (!inserted) self.sendQueue.push(data);
+
+ self.sendQueueSize += data.used;
}
- this.flush();
+ return this.flush();
};
-// returns true if flushed without getting EAGAIN
-// false if it got EAGAIN
+// Flushes the write buffer out. Emits "drain" if the buffer is empty.
Stream.prototype.flush = function () {
var self = this;
+ if (!self.writable) throw new Error('Stream is not writable');
+
var bytesWritten;
while (self.sendQueue.length > 0) {
var b = self.sendQueue[0];
b.sent,
b.used - b.sent);
if (bytesWritten === null) {
- this.writeWatcher.start();
+ // could not flush everything
+ self.writeWatcher.start();
+ assert(self.sendQueueSize > 0);
return false;
}
b.sent += bytesWritten;
+ self.sendQueueSize -= bytesWritten;
debug('bytes sent: ' + b.sent);
}
- this.writeWatcher.stop();
+ self.writeWatcher.stop();
return true;
};
var errno = socketError(self.fd);
if (errno == 0) {
// connection established
- self.emit('connect');
self.readWatcher.start();
self.readable = true;
self.writable = true;
- self.writeWatcher.callback = self._onWriteFlush;
+ self.writeWatcher.callback = self._doFlush;
+ self.emit('connect');
} else if (errno != EINPROGRESS) {
var e = new Error('connection error');
e.errno = errno;
- self.readWatcher.stop();
- self.writeWatcher.stop();
- close(self.fd);
+ self.forceClose(e);
}
};
};
};
-Stream.prototype.close = function () {
- if (this.readable && this.writable) {
+Stream.prototype._shutdown = function () {
+ if (this.writable) {
this.writable = false;
shutdown(this.fd, "write");
- } else if (!this.readable && this.writable) {
+ }
+};
+
+
+Stream.prototype.close = function () {
+ var self = this;
+ var closeMethod;
+ if (self.readable && self.writable) {
+ closeMethod = self._shutdown;
+ } else if (!self.readable && self.writable) {
// already got EOF
- this.forceClose(this.fd);
+ closeMethod = self.forceClose;
}
// In the case we've already shutdown write side,
// but haven't got EOF: ignore. In the case we're
// fully closed already: ignore.
+
+ if (closeMethod) {
+ if (self.sendQueueSize == 0) {
+ // no queue. just shut down the socket.
+ closeMethod();
+ } else {
+ self.addListener("drain", closeMethod);
+ }
+ }
};
exports.Server = Server;
+exports.createServer = function (listener) {
+ return new Server(listener);
+};
+
+
Server.prototype.listen = function () {
var self = this;
if (self.fd) throw new Error('Server already opened');
- var backlogIndex;
if (typeof(arguments[0]) == 'string' && arguments.length == 1) {
// the first argument specifies a path
self.fd = process.socket('UNIX');
+ self.type = 'UNIX';
// TODO unlink sockfile if exists?
// if (lstat(SOCKFILE, &tstat) == 0) {
// assert(S_ISSOCK(tstat.st_mode));
// unlink(SOCKFILE);
// }
bind(self.fd, arguments[0]);
- backlogIndex = 1;
+ } else if (arguments.length == 0) {
+ self.fd = process.socket('TCP');
+ self.type = 'TCP';
+ // Don't bind(). OS will assign a port with INADDR_ANY. The port will be
+ // passed to the 'listening' event.
} else {
// the first argument is the port, the second an IP
self.fd = process.socket('TCP');
+ self.type = 'TCP';
+ if (needsLookup(arguments[1])) {
+ getaddrinfo(arguments[1], function (ip) {
+ });
+ }
// TODO dns resolution on arguments[1]
bind(self.fd, arguments[0], arguments[1]);
- backlogIndex = typeof(arguments[1]) == 'string' ? 2 : 1;
}
- listen(self.fd, arguments[backlogIndex] ? arguments[backlogIndex] : 128);
+ listen(self.fd, 128);
self.emit("listening");
self.watcher.set(self.fd, true, false);
};
+Server.prototype.sockName = function () {
+ return getsockname(self.fd);
+};
+
+
Server.prototype.close = function () {
- var self = this;
- if (!self.fd) throw new Error('Not running');
- self.watcher.stop();
- close(self.fd);
- self.fd = null;
+ if (!this.fd) throw new Error('Not running');
+ this.watcher.stop();
+ close(this.fd);
+ this.fd = null;
+ this.emit("close");
};
#include <node_buffer.h>
#include <string.h>
+#include <stdlib.h>
#include <sys/types.h>
#include <sys/socket.h>
static Persistent<String> fd_symbol;
static Persistent<String> remote_address_symbol;
static Persistent<String> remote_port_symbol;
+static Persistent<String> address_symbol;
+static Persistent<String> port_symbol;
#define FD_ARG(a) \
if (!(a)->IsInt32()) { \
char ipv6[255] = "::FFFF:";
if (inet_pton(AF_INET, *ip, &(in6.sin6_addr)) > 0) {
- // If this is an IPv4 address then we need to change it
- // to the IPv4-mapped-on-IPv6 format which looks like
+ // If this is an IPv4 address then we need to change it
+ // to the IPv4-mapped-on-IPv6 format which looks like
// ::FFFF:<IPv4 address>
- // For more information see "Address Format" ipv6(7) and
- // "BUGS" in inet_pton(3)
+ // For more information see "Address Format" ipv6(7) and
+ // "BUGS" in inet_pton(3)
strcat(ipv6, *ip);
} else {
strcpy(ipv6, *ip);
}
+static Handle<Value> GetSockName(const Arguments& args) {
+ HandleScope scope;
+
+ FD_ARG(args[0])
+
+ struct sockaddr_storage address_storage;
+ socklen_t len = sizeof(struct sockaddr_storage);
+
+ int r = getsockname(fd, (struct sockaddr *) &address_storage, &len);
+
+ if (r < 0) {
+ return ThrowException(ErrnoException(errno, "getsockname"));
+ }
+
+ Local<Object> info = Object::New();
+
+ if (address_storage.ss_family == AF_INET6) {
+ struct sockaddr_in6 *a = (struct sockaddr_in6*)&address_storage;
+
+ char ip[INET6_ADDRSTRLEN];
+ inet_ntop(AF_INET6, &(a->sin6_addr), ip, INET6_ADDRSTRLEN);
+
+ int port = ntohs(a->sin6_port);
+
+ info->Set(address_symbol, String::New(ip));
+ info->Set(port_symbol, Integer::New(port));
+ }
+
+ return scope.Close(info);
+}
+
+
static Handle<Value> Listen(const Arguments& args) {
HandleScope scope;
return ThrowException(ErrnoException(errno, "listen"));
}
+
return Undefined();
}
}
+// G E T A D D R I N F O
+
+struct resolve_request {
+ Persistent<Function> cb;
+ int ai_family; // AF_INET or AF_INET6
+ char hostname[1];
+};
+
+
+static int AfterResolve(eio_req *req) {
+ ev_unref(EV_DEFAULT_UC);
+
+ struct resolve_request * rreq = (struct resolve_request *)(req->data);
+
+ struct addrinfo *address = NULL,
+ *address_list = static_cast<struct addrinfo *>(req->ptr2);
+
+ HandleScope scope;
+ Local<Value> argv[1];
+
+ if (req->result != 0) {
+ argv[0] = ErrnoException(errno, "getaddrinfo");
+ } else {
+ int n = 0;
+ for (address = address_list; address; address = address->ai_next) { n++; }
+
+ Local<Array> results = Array::New(n);
+
+ char ip[INET6_ADDRSTRLEN];
+
+ n = 0;
+ address = address_list;
+ while (address) {
+ HandleScope scope;
+ assert(address->ai_family == AF_INET || address->ai_family == AF_INET6);
+ assert(address->ai_socktype == SOCK_STREAM);
+ const char *c = inet_ntop(address->ai_family, &(address->ai_addr), ip, INET6_ADDRSTRLEN);
+ Local<String> s = String::New(c);
+ results->Set(Integer::New(n), s);
+
+ n++;
+ address = address->ai_next;
+ }
+
+ argv[0] = results;
+ }
+
+ TryCatch try_catch;
+
+ rreq->cb->Call(Context::GetCurrent()->Global(), 1, argv);
+
+ if (try_catch.HasCaught()) {
+ FatalException(try_catch);
+ }
+
+ rreq->cb.Dispose(); // Dispose of the persistent handle
+ free(rreq);
+ freeaddrinfo(address_list);
+}
+
+static int Resolve(eio_req *req) {
+ // Note: this function is executed in the thread pool! CAREFUL
+ struct resolve_request * rreq = (struct resolve_request *) req->data;
+ struct addrinfo *address_list = NULL;
+
+ struct addrinfo hints;
+ memset(&hints, 0, sizeof(struct addrinfo));
+ hints.ai_family = rreq->ai_family;
+ hints.ai_socktype = SOCK_STREAM;
+
+ req->result = getaddrinfo((char*)rreq->hostname, NULL, &hints, &address_list);
+ req->ptr2 = address_list;
+ return 0;
+}
+
+
+static Handle<Value> GetAddrInfo(const Arguments& args) {
+ HandleScope scope;
+
+ String::Utf8Value hostname(args[0]->ToString());
+
+ int type = args[1]->Int32Value();
+ int fam = AF_INET;
+ switch (type) {
+ case 4:
+ fam = AF_INET;
+ break;
+ case 6:
+ fam = AF_INET6;
+ break;
+ default:
+ return ThrowException(Exception::TypeError(
+ String::New("Second argument must be an integer 4 or 6")));
+ }
+
+ if (!args[2]->IsFunction()) {
+ return ThrowException(Exception::TypeError(
+ String::New("Thrid argument must be a callback")));
+ }
+
+ Local<Function> cb = Local<Function>::Cast(args[2]);
+
+ struct resolve_request *rreq = (struct resolve_request *)
+ malloc(sizeof(struct resolve_request) + hostname.length());
+
+ if (!rreq) {
+ V8::LowMemoryNotification();
+ return ThrowException(Exception::Error(
+ String::New("Could not allocate enough memory")));
+ }
+
+ strcpy(rreq->hostname, *hostname);
+ rreq->cb = Persistent<Function>::New(cb);
+ rreq->ai_family = fam;
+
+ // For the moment I will do DNS lookups in the eio thread pool. This is
+ // sub-optimal and cannot handle massive numbers of requests.
+ //
+ // (One particularly annoying problem is that the pthread stack size needs
+ // to be increased dramatically to handle getaddrinfo() see X_STACKSIZE in
+ // wscript ).
+ //
+ // In the future I will move to a system using c-ares:
+ // http://lists.schmorp.de/pipermail/libev/2009q1/000632.html
+ eio_custom(Resolve, EIO_PRI_DEFAULT, AfterResolve, rreq);
+
+ // There will not be any active watchers from this object on the event
+ // loop while getaddrinfo() runs. If the only thing happening in the
+ // script was this hostname resolution, then the event loop would drop
+ // out. Thus we need to add ev_ref() until AfterResolve().
+ ev_ref(EV_DEFAULT_UC);
+
+ return Undefined();
+}
+
+
+static Handle<Value> NeedsLookup(const Arguments& args) {
+ HandleScope scope;
+
+ if (args[0]->IsNull() || args[0]->IsUndefined()) return False();
+
+ String::Utf8Value s(args[0]->ToString());
+
+ // avoiding buffer overflows in the following strcat
+ // 2001:0db8:85a3:08d3:1319:8a2e:0370:7334
+ // 39 = max ipv6 address.
+ if (s.length() > INET6_ADDRSTRLEN) return True();
+
+ struct sockaddr_in6 a;
+
+ if (inet_pton(AF_INET, *s, &(a.sin6_addr)) > 0) return False();
+ if (inet_pton(AF_INET6, *s, &(a.sin6_addr)) > 0) return False();
+
+ char ipv6[255] = "::FFFF:";
+ strcat(ipv6, *s);
+ if (inet_pton(AF_INET6, ipv6, &(a.sin6_addr)) > 0) return False();
+
+ return True();
+}
+
+
void InitNet2(Handle<Object> target) {
HandleScope scope;
NODE_SET_METHOD(target, "accept", Accept);
NODE_SET_METHOD(target, "socketError", SocketError);
NODE_SET_METHOD(target, "toRead", ToRead);
-
+ NODE_SET_METHOD(target, "getsocksame", GetSockName);
+ NODE_SET_METHOD(target, "getaddrinfo", GetAddrInfo);
+ NODE_SET_METHOD(target, "needsLookup", NeedsLookup);
target->Set(String::NewSymbol("EINPROGRESS"), Integer::New(EINPROGRESS));
target->Set(String::NewSymbol("EINTR"), Integer::New(EINTR));
target->Set(String::NewSymbol("EPERM"), Integer::New(EPERM));
target->Set(String::NewSymbol("EADDRINUSE"), Integer::New(EADDRINUSE));
target->Set(String::NewSymbol("ECONNREFUSED"), Integer::New(ECONNREFUSED));
-
+
errno_symbol = NODE_PSYMBOL("errno");
syscall_symbol = NODE_PSYMBOL("syscall");
fd_symbol = NODE_PSYMBOL("fd");
remote_address_symbol = NODE_PSYMBOL("remoteAddress");
remote_port_symbol = NODE_PSYMBOL("remotePort");
+ address_symbol = NODE_PSYMBOL("address");
+ port_symbol = NODE_PSYMBOL("port");
}
} // namespace node
stream.send("pong utf8\r\n", "utf8");
});
+ stream.addListener('drain', function () {
+ sys.puts("server-side socket drain");
+ });
+
stream.addListener("eof", function () {
sys.puts("server peer eof");
stream.close();
sys.puts("server fd: " + server.fd);
-var stream = new net.Stream();
-stream.addListener('connect', function () {
+var c = net.createConnection(8000);
+c.addListener('connect', function () {
sys.puts("!!!client connected");
- stream.send("hello\n");
+ c.send("hello\n");
});
-stream.addListener('receive', function (d) {
- sys.puts("!!!client got: " + JSON.stringify(d.toString()));
+c.addListener('drain', function () {
+ sys.puts("!!!client drain");
});
-stream.connect(8000);
+c.addListener('receive', function (d) {
+ sys.puts("!!!client got: " + JSON.stringify(d.toString()));
+});
conf.define("HAVE_CONFIG_H", 1)
- conf.env.append_value("CCFLAGS", "-DX_STACKSIZE=%d" % (1024*64))
+ conf.env.append_value("CCFLAGS", "-DX_STACKSIZE=%d" % (2*1024*1024))
# LFS
conf.env.append_value('CCFLAGS', '-D_LARGEFILE_SOURCE')