From: Ryan Dahl Date: Tue, 15 Dec 2009 08:22:36 +0000 (+0100) Subject: More bindings, beginning tcp server code in js X-Git-Tag: v0.1.92~84^2~120 X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=469e2648e59c19154fd67ae1df17487f673fe9fc;p=platform%2Fupstream%2Fnodejs.git More bindings, beginning tcp server code in js --- diff --git a/src/node_buffer.cc b/src/node_buffer.cc index 47ddf7b..be1020d 100644 --- a/src/node_buffer.cc +++ b/src/node_buffer.cc @@ -1,14 +1,15 @@ +#include + #include #include // malloc, free #include + #include namespace node { using namespace v8; -#define MIN(a,b) ((a) < (b) ? (a) : (b)) - #define SLICE_ARGS(start_arg, end_arg) \ if (!start_arg->IsInt32() || !end_arg->IsInt32()) { \ return ThrowException(Exception::TypeError( \ @@ -24,33 +25,11 @@ using namespace v8; static Persistent length_symbol; static Persistent constructor_template; -/* A buffer is a chunk of memory stored outside the V8 heap, mirrored by an - * object in javascript. The object is not totally opaque, one can access - * individual bytes with [] and slice it into substrings or sub-buffers - * without copying memory. - * - * // return an ascii encoded string - no memory iscopied - * buffer.asciiSlide(0, 3) - * - * // returns another buffer - no memory is copied - * buffer.slice(0, 3) - * - * Interally, each javascript buffer object is backed by a "struct buffer" - * object. These "struct buffer" objects are either a root buffer (in the - * case that buffer->root == NULL) or slice objects (in which case - * buffer->root != NULL). A root buffer is only GCed once all its slices - * are GCed. - */ - -struct buffer { - Persistent handle; // both - bool weak; // both - struct buffer *root; // both (NULL for root) - size_t offset; // both (0 for root) - size_t length; // both - unsigned int refs; // root only - char bytes[1]; // root only -}; +bool IsBuffer(v8::Handle val) { + if (!val->IsObject()) return false; + Local obj = val->ToObject(); + return constructor_template->HasInstance(obj); +} static inline struct buffer* buffer_root(buffer *buffer) { @@ -79,7 +58,7 @@ static inline void buffer_unref(struct buffer *buffer) { } -static inline struct buffer* Unwrap(Handle val) { +struct buffer* BufferUnwrap(v8::Handle val) { assert(val->IsObject()); HandleScope scope; Local obj = val->ToObject(); @@ -123,7 +102,7 @@ static Handle Constructor(const Arguments &args) { // slice slice SLICE_ARGS(args[1], args[2]) - struct buffer *parent = Unwrap(args[0]); + struct buffer *parent = BufferUnwrap(args[0]); size_t start_abs = buffer_abs_off(parent, start); size_t end_abs = buffer_abs_off(parent, end); @@ -230,7 +209,7 @@ static Handle AsciiSlice(const Arguments &args) { SLICE_ARGS(args[0], args[1]) assert(args.This()->InternalFieldCount() == 1); - struct buffer *parent = Unwrap(args.This()); + struct buffer *parent = BufferUnwrap(args.This()); size_t start_abs = buffer_abs_off(parent, start); size_t end_abs = buffer_abs_off(parent, end); @@ -251,7 +230,7 @@ static Handle Utf8Slice(const Arguments &args) { SLICE_ARGS(args[0], args[1]) - struct buffer *parent = Unwrap(args.This()); + struct buffer *parent = BufferUnwrap(args.This()); size_t start_abs = buffer_abs_off(parent, start); size_t end_abs = buffer_abs_off(parent, end); assert(start_abs <= end_abs); diff --git a/src/node_buffer.h b/src/node_buffer.h index f700642..3afb3ee 100644 --- a/src/node_buffer.h +++ b/src/node_buffer.h @@ -5,8 +5,53 @@ namespace node { +#define MIN(a,b) ((a) < (b) ? (a) : (b)) + +/* A buffer is a chunk of memory stored outside the V8 heap, mirrored by an + * object in javascript. The object is not totally opaque, one can access + * individual bytes with [] and slice it into substrings or sub-buffers + * without copying memory. + * + * // return an ascii encoded string - no memory iscopied + * buffer.asciiSlide(0, 3) + * + * // returns another buffer - no memory is copied + * buffer.slice(0, 3) + * + * Interally, each javascript buffer object is backed by a "struct buffer" + * object. These "struct buffer" objects are either a root buffer (in the + * case that buffer->root == NULL) or slice objects (in which case + * buffer->root != NULL). A root buffer is only GCed once all its slices + * are GCed. + */ + +struct buffer { + v8::Persistent handle; // both + bool weak; // both + struct buffer *root; // both (NULL for root) + size_t offset; // both (0 for root) + size_t length; // both + unsigned int refs; // root only + char bytes[1]; // root only +}; + void InitBuffer(v8::Handle target); +struct buffer* BufferUnwrap(v8::Handle val); +bool IsBuffer(v8::Handle val); + +static inline char * buffer_p(struct buffer *buffer, size_t off) { + struct buffer *root = buffer->root ? buffer->root : buffer; + if (buffer->offset + off >= root->length) return NULL; + return reinterpret_cast(&(root->bytes) + buffer->offset + off); +} + +static inline size_t buffer_remaining(struct buffer *buffer, size_t off) { + struct buffer *root = buffer->root ? buffer->root : buffer; + char *end = reinterpret_cast(&(root->bytes) + root->length); + return end - buffer_p(buffer, off); +} + } #endif // NODE_BUFFER diff --git a/src/node_net2.cc b/src/node_net2.cc index 6ced4a2..13a5aaf 100644 --- a/src/node_net2.cc +++ b/src/node_net2.cc @@ -2,6 +2,7 @@ #include #include +#include #include @@ -14,8 +15,6 @@ #include - - namespace node { using namespace v8; @@ -23,7 +22,20 @@ using namespace v8; static Persistent errno_symbol; static Persistent syscall_symbol; -static inline Local ErrnoException(int errorno, const char *syscall, const char *msg = "") { +static Persistent fd_symbol; +static Persistent remote_address_symbol; +static Persistent remote_port_symbol; + +#define FD_ARG(a) \ + if (!(a)->IsInt32()) { \ + return ThrowException(Exception::TypeError( \ + String::New("Bad file descriptor argument"))); \ + } \ + int fd = (a)->Int32Value(); + +static inline Local ErrnoException(int errorno, + const char *syscall, + const char *msg = "") { if (!msg[0]) msg = strerror(errorno); Local e = Exception::Error(String::NewSymbol(msg)); Local obj = e->ToObject(); @@ -32,6 +44,57 @@ static inline Local ErrnoException(int errorno, const char *syscall, cons return e; } +static inline bool SetNonBlock(int fd) { + int flags = fcntl(fd, F_GETFL, 0); + if (flags == -1) return false; + flags |= O_NONBLOCK; + return (fcntl(fd, F_SETFL, flags) != -1); +} + +// Creates nonblocking pipe +static Handle Pipe(const Arguments& args) { + HandleScope scope; + int fds[2]; + + if (pipe(fds) < 0) return ThrowException(ErrnoException(errno, "pipe")); + + if(!SetNonBlock(fds[0]) || !SetNonBlock(fds[1])) { + int fcntl_errno = errno; + close(fds[0]); + close(fds[1]); + return ThrowException(ErrnoException(fcntl_errno, "fcntl")); + } + + Local a = Array::New(2); + a->Set(Integer::New(0), Integer::New(fds[0])); + a->Set(Integer::New(1), Integer::New(fds[1])); + return scope.Close(a); +} + +// Creates nonblocking socket pair +static Handle SocketPair(const Arguments& args) { + HandleScope scope; + + int fds[2]; + + // XXX support SOCK_DGRAM? + if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds) < 0) { + return ThrowException(ErrnoException(errno, "socketpair")); + } + + if (!SetNonBlock(fds[0]) || !SetNonBlock(fds[1])) { + int fcntl_errno = errno; + close(fds[0]); + close(fds[1]); + return ThrowException(ErrnoException(fcntl_errno, "fcntl")); + } + + Local a = Array::New(2); + a->Set(Integer::New(0), Integer::New(fds[0])); + a->Set(Integer::New(1), Integer::New(fds[1])); + return scope.Close(a); +} + // Creates a new non-blocking socket fd // t.socket("TCP"); // t.socket("UNIX"); @@ -64,19 +127,8 @@ static Handle Socket(const Arguments& args) { if (fd < 0) return ThrowException(ErrnoException(errno, "socket")); - int fcntl_errno; - - int flags = fcntl(fd, F_GETFL, 0); - if (flags == -1) { - fcntl_errno = errno; - close(fd); - return ThrowException(ErrnoException(fcntl_errno, "fcntl")); - } - - flags |= O_NONBLOCK; - - if (fcntl(fd, F_SETFL, flags) == -1) { - fcntl_errno = errno; + if (!SetNonBlock(fd)) { + int fcntl_errno = errno; close(fd); return ThrowException(ErrnoException(fcntl_errno, "fcntl")); } @@ -84,39 +136,26 @@ static Handle Socket(const Arguments& args) { return scope.Close(Integer::New(fd)); } -// 2 arguments means connect with unix -// t.connect(fd, "/tmp/socket") -// -// 3 arguments means connect with TCP or UDP -// t.connect(fd, "127.0.0.1", 80) -static Handle Connect(const Arguments& args) { - HandleScope scope; - - if (!args[0]->IsInt32()) { - return ThrowException(Exception::TypeError( - String::New("First argument should be file descriptor"))); - } - - if (args.Length() < 2) { - return ThrowException(Exception::TypeError( - String::New("Must have at least two args"))); - } - - int fd = args[0]->Int32Value(); - - struct sockaddr *addr; - socklen_t addrlen; - if (args.Length() == 2) { +// NOT AT ALL THREAD SAFE - but that's okay for node.js +// (yes this is all to avoid one small heap alloc) +static struct sockaddr *addr; +static socklen_t addrlen; +static struct sockaddr_un un; +static struct sockaddr_in6 in6; +static inline Handle ParseAddressArgs(Handle first, + Handle second, + struct in6_addr default_addr + ) { + if (first->IsString() && second->IsUndefined()) { // UNIX - String::Utf8Value path(args[1]->ToString()); - - struct sockaddr_un un = {0}; + String::Utf8Value path(first->ToString()); if (path.length() > sizeof un.sun_path) { - return ThrowException(Exception::Error(String::New("Socket path too long"))); + return Exception::Error(String::New("Socket path too long")); } + memset(&un, 0, sizeof un); un.sun_family = AF_UNIX; strcpy(un.sun_path, *path); @@ -125,27 +164,29 @@ static Handle Connect(const Arguments& args) { } else { // TCP or UDP - String::Utf8Value ip(args[1]->ToString()); - int port = args[2]->Int32Value(); - - struct sockaddr_in6 in6 = {0}; - - char ipv6[255] = "::FFFF:"; - - if (inet_pton(AF_INET, *ip, &(in6.sin6_addr)) > 0) { - // If this is an IPv4 address then we need to change it to the - // IPv4-mapped-on-IPv6 format which looks like - // ::FFFF: - // For more information see "Address Format" ipv6(7) and "BUGS" in - // inet_pton(3) - strcat(ipv6, *ip); + int port = first->Int32Value(); + memset(&in6, 0, sizeof in6); + if (!second->IsString()) { + in6.sin6_addr = default_addr; } else { - strcpy(ipv6, *ip); - } - - if (inet_pton(AF_INET6, ipv6, &(in6.sin6_addr)) <= 0) { - return ThrowException( - ErrnoException(errno, "inet_pton", "Invalid IP Address")); + String::Utf8Value ip(second->ToString()); + + char ipv6[255] = "::FFFF:"; + + if (inet_pton(AF_INET, *ip, &(in6.sin6_addr)) > 0) { + // If this is an IPv4 address then we need to change it + // to the IPv4-mapped-on-IPv6 format which looks like + // ::FFFF: + // For more information see "Address Format" ipv6(7) and + // "BUGS" in inet_pton(3) + strcat(ipv6, *ip); + } else { + strcpy(ipv6, *ip); + } + + if (inet_pton(AF_INET6, ipv6, &(in6.sin6_addr)) <= 0) { + return ErrnoException(errno, "inet_pton", "Invalid IP Address"); + } } in6.sin6_family = AF_INET6; @@ -154,6 +195,106 @@ static Handle Connect(const Arguments& args) { addr = (struct sockaddr*)&in6; addrlen = sizeof in6; } + return Handle(); +} + + +// Bind with UNIX +// t.bind(fd, "/tmp/socket") +// Bind with TCP +// t.bind(fd, 80, "192.168.11.2") +// t.bind(fd, 80) +static Handle Bind(const Arguments& args) { + HandleScope scope; + + if (args.Length() < 2) { + return ThrowException(Exception::TypeError( + String::New("Must have at least two args"))); + } + + FD_ARG(args[0]) + + Handle error = ParseAddressArgs(args[1], args[2], in6addr_any); + if (!error.IsEmpty()) return ThrowException(error); + + int flags = 1; + setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags)); + + int r = bind(fd, addr, addrlen); + + if (r < 0) { + return ThrowException(ErrnoException(errno, "bind")); + } + + return Undefined(); +} + + +static Handle Close(const Arguments& args) { + HandleScope scope; + + FD_ARG(args[0]) + + if (0 > close(fd)) { + return ThrowException(ErrnoException(errno, "close")); + } + + return Undefined(); +} + +// t.shutdown(fd, "read"); -- SHUT_RD +// t.shutdown(fd, "write"); -- SHUT_WR +// t.shutdown(fd, "readwrite"); -- SHUT_RDWR +// second arg defaults to "write". +static Handle Shutdown(const Arguments& args) { + HandleScope scope; + + FD_ARG(args[0]) + + int how = SHUT_WR; + + if (args[1]->IsString()) { + String::Utf8Value t(args[0]->ToString()); + if (0 == strcasecmp(*t, "write")) { + how = SHUT_WR; + } else if (0 == strcasecmp(*t, "read")) { + how = SHUT_RD; + } else if (0 == strcasecmp(*t, "readwrite")) { + how = SHUT_RDWR; + } else { + return ThrowException(Exception::Error(String::New( + "Unknown shutdown method. (Use 'read', 'write', or 'readwrite'.)"))); + } + } + + if (0 > shutdown(fd, how)) { + return ThrowException(ErrnoException(errno, "shutdown")); + } + + return Undefined(); +} + + +// Connect with unix +// t.connect(fd, "/tmp/socket") +// +// Connect with TCP or UDP +// t.connect(fd, 80, "192.168.11.2") +// t.connect(fd, 80, "::1") +// t.connect(fd, 80) +// the third argument defaults to "::1" +static Handle Connect(const Arguments& args) { + HandleScope scope; + + if (args.Length() < 2) { + return ThrowException(Exception::TypeError( + String::New("Must have at least two args"))); + } + + FD_ARG(args[0]) + + Handle error = ParseAddressArgs(args[1], args[2], in6addr_loopback); + if (!error.IsEmpty()) return ThrowException(error); int r = connect(fd, addr, addrlen); @@ -165,14 +306,204 @@ static Handle Connect(const Arguments& args) { } +static Handle Listen(const Arguments& args) { + HandleScope scope; + + FD_ARG(args[0]) + int backlog = args[1]->IsInt32() ? args[1]->Int32Value() : 128; + + if (0 > listen(fd, backlog)) { + return ThrowException(ErrnoException(errno, "listen")); + } + + return Undefined(); +} + + +// var peerInfo = t.accept(server_fd); +// +// peerInfo.fd +// peerInfo.remoteAddress +// peerInfo.remotePort +// +// Returns a new nonblocking socket fd. If the listen queue is empty the +// function returns null (wait for server_fd to become readable and try +// again) +static Handle Accept(const Arguments& args) { + HandleScope scope; + + FD_ARG(args[0]) + + struct sockaddr_storage addr; + socklen_t len; + + int peer = accept(fd, (struct sockaddr*) &addr, &len); + + if (peer < 0) { + if (errno == EAGAIN) return Null(); + return ThrowException(ErrnoException(errno, "accept")); + } + + if (!SetNonBlock(peer)) { + int fcntl_errno = errno; + close(peer); + return ThrowException(ErrnoException(fcntl_errno, "fcntl")); + } + + Local peer_info = Object::New(); + + peer_info->Set(fd_symbol, Integer::New(fd)); + + if (addr.ss_family == AF_INET6) { + struct sockaddr_in6 *a = reinterpret_cast(&addr); + + char ip[INET6_ADDRSTRLEN]; + inet_ntop(AF_INET6, &a->sin6_addr, ip, INET6_ADDRSTRLEN); + + int port = ntohs(a->sin6_port); + + peer_info->Set(remote_address_symbol, String::New(ip)); + peer_info->Set(remote_port_symbol, Integer::New(port)); + } + + return scope.Close(peer_info); +} + + +static Handle GetSocketError(const Arguments& args) { + HandleScope scope; + + FD_ARG(args[0]) + + int error; + socklen_t len = sizeof(int); + int r = getsockopt(fd, SOL_SOCKET, SO_ERROR, &error, &len); + + if (r < 0) { + return ThrowException(ErrnoException(errno, "getsockopt")); + } + + return scope.Close(Integer::New(error)); +} + +// var bytesRead = t.read(fd, buffer, offset, length); +// returns null on EAGAIN or EINTR, raises an exception on all other errors +// returns 0 on EOF. +static Handle Read(const Arguments& args) { + HandleScope scope; + + if (args.Length() < 4) { + return ThrowException(Exception::TypeError( + String::New("Takes 4 parameters"))); + } + + FD_ARG(args[0]) + + if (!IsBuffer(args[1])) { + return ThrowException(Exception::TypeError( + String::New("Second argument should be a buffer"))); + } + + struct buffer * buffer = BufferUnwrap(args[1]); + + size_t off = args[2]->Int32Value(); + if (buffer_p(buffer, off) == NULL) { + return ThrowException(Exception::Error( + String::New("Offset is out of bounds"))); + } + + size_t len = args[3]->Int32Value(); + if (buffer_remaining(buffer, off) < len) { + return ThrowException(Exception::Error( + String::New("Length is extends beyond buffer"))); + } + + size_t bytes_read = read(fd, + buffer_p(buffer, off), + buffer_remaining(buffer, off)); + + if (bytes_read < 0) { + if (errno == EAGAIN || errno == EINTR) return Null(); + return ThrowException(ErrnoException(errno, "read")); + } + + return Integer::New(bytes_read); +} + +// var bytesWritten = t.write(fd, buffer, offset, length); +// returns null on EAGAIN or EINTR, raises an exception on all other errors +static Handle Write(const Arguments& args) { + HandleScope scope; + + if (args.Length() < 4) { + return ThrowException(Exception::TypeError( + String::New("Takes 4 parameters"))); + } + + FD_ARG(args[0]) + + if (!IsBuffer(args[1])) { + return ThrowException(Exception::TypeError( + String::New("Second argument should be a buffer"))); + } + + struct buffer * buffer = BufferUnwrap(args[1]); + + size_t off = args[2]->Int32Value(); + if (buffer_p(buffer, off) == NULL) { + return ThrowException(Exception::Error( + String::New("Offset is out of bounds"))); + } + + size_t len = args[3]->Int32Value(); + if (buffer_remaining(buffer, off) < len) { + return ThrowException(Exception::Error( + String::New("Length is extends beyond buffer"))); + } + + size_t written = write(fd, + buffer_p(buffer, off), + buffer_remaining(buffer, off)); + + if (written < 0) { + if (errno == EAGAIN || errno == EINTR) return Null(); + return ThrowException(ErrnoException(errno, "write")); + } + + return Integer::New(written); +} + void InitNet2(Handle target) { HandleScope scope; + NODE_SET_METHOD(target, "write", Write); + NODE_SET_METHOD(target, "read", Read); + NODE_SET_METHOD(target, "socket", Socket); + NODE_SET_METHOD(target, "close", Close); + NODE_SET_METHOD(target, "shutdown", Shutdown); + NODE_SET_METHOD(target, "pipe", Pipe); + NODE_SET_METHOD(target, "socketpair", SocketPair); + NODE_SET_METHOD(target, "connect", Connect); + NODE_SET_METHOD(target, "bind", Bind); + NODE_SET_METHOD(target, "listen", Listen); + NODE_SET_METHOD(target, "accept", Accept); + NODE_SET_METHOD(target, "getSocketError", GetSocketError); + + + target->Set(String::NewSymbol("EINPROGRESS"), Integer::New(EINPROGRESS)); + target->Set(String::NewSymbol("EINTR"), Integer::New(EINTR)); + target->Set(String::NewSymbol("EACCES"), Integer::New(EACCES)); + target->Set(String::NewSymbol("EPERM"), Integer::New(EPERM)); + target->Set(String::NewSymbol("EADDRINUSE"), Integer::New(EADDRINUSE)); + target->Set(String::NewSymbol("ECONNREFUSED"), Integer::New(ECONNREFUSED)); errno_symbol = NODE_PSYMBOL("errno"); syscall_symbol = NODE_PSYMBOL("syscall"); + fd_symbol = NODE_PSYMBOL("fd"); + remote_address_symbol = NODE_PSYMBOL("remoteAddress"); + remote_port_symbol = NODE_PSYMBOL("remotePort"); } } // namespace node diff --git a/tcp.js b/tcp.js new file mode 100644 index 0000000..af73349 --- /dev/null +++ b/tcp.js @@ -0,0 +1,55 @@ +var socket = process.socket; +var bind = process.bind; +var listen = process.listen; +var accept = process.accept; +var close = process.close; + +var Server = function (listener) { + var self = this; + + if (listener) { + self.addListener("connection", listener); + } + +}; +process.inherits(Server, process.EventEmitter); + +Server.prototype.listen = function (port, host) { + var self = this; + + if (self.fd) throw new Error("Already running"); + + self.fd = process.socket("TCP"); + // TODO dns resolution + bind(self.fd, port, host); + listen(self.fd, 128); // TODO configurable backlog + + self.watcher = new process.IOWatcher(self.fd, true, false, function () { + var peerInfo; + while (self.fd) { + peerInfo = accept(self.fd); + if (peerInfo === null) return; + self.emit("connection", peerInfo); + } + }); + + self.watcher.start(); +}; + +Server.prototype.close = function () { + var self = this; + if (!self.fd) throw new Error("Not running"); + self.watcher.stop(); + close(self.fd); + this.watcher = null; + this.fd = null; +}; + +/////////////////////////////////////////////////////// + +var sys = require("sys"); +var server = new Server(function () { + sys.puts("connection"); + server.close(); +}); +server.listen(8000);