// Copyright 2009 Ryan Dahl <ry@tinyclouds.org>
#include <node_io_watcher.h>
+#include <node.h>
+#include <v8.h>
+
#include <assert.h>
namespace node {
using namespace v8;
Persistent<FunctionTemplate> IOWatcher::constructor_template;
+Persistent<String> callback_symbol;
void IOWatcher::Initialize(Handle<Object> target) {
HandleScope scope;
Local<FunctionTemplate> t = FunctionTemplate::New(IOWatcher::New);
constructor_template = Persistent<FunctionTemplate>::New(t);
- constructor_template->InstanceTemplate()->SetInternalFieldCount(2);
+ constructor_template->InstanceTemplate()->SetInternalFieldCount(1);
constructor_template->SetClassName(String::NewSymbol("IOWatcher"));
NODE_SET_PROTOTYPE_METHOD(constructor_template, "start", IOWatcher::Start);
NODE_SET_PROTOTYPE_METHOD(constructor_template, "stop", IOWatcher::Stop);
+ NODE_SET_PROTOTYPE_METHOD(constructor_template, "set", IOWatcher::Set);
target->Set(String::NewSymbol("IOWatcher"), constructor_template->GetFunction());
+
+ callback_symbol = NODE_PSYMBOL("callback");
}
assert(w == &io->watcher_);
HandleScope scope;
- Local<Value> callback_v = io->handle_->GetInternalField(1);
+ Local<Value> callback_v = io->handle_->Get(callback_symbol);
assert(callback_v->IsFunction());
Local<Function> callback = Local<Function>::Cast(callback_v);
//
-// var io = new process.IOWatcher(fd, true, true, function (readable, writable) {
+// var io = new process.IOWatcher(function (readable, writable) {
//
// });
+// io.set(fd, true, false);
+// io.start();
//
Handle<Value> IOWatcher::New(const Arguments& args) {
HandleScope scope;
+ if (!args[0]->IsFunction()) {
+ return ThrowException(Exception::TypeError(
+ String::New("First arg should a callback.")));
+ }
+
+ Local<Function> callback = Local<Function>::Cast(args[0]);
+
+ IOWatcher *s = new IOWatcher();
+
+ s->Wrap(args.This());
+
+ s->handle_->Set(callback_symbol, callback);
+
+ return args.This();
+}
+
+
+Handle<Value> IOWatcher::Start(const Arguments& args) {
+ HandleScope scope;
+
+ IOWatcher *io = ObjectWrap::Unwrap<IOWatcher>(args.Holder());
+
+ ev_io_start(EV_DEFAULT_UC_ &io->watcher_);
+
+ io->Ref();
+
+ return Undefined();
+}
+
+Handle<Value> IOWatcher::Set(const Arguments& args) {
+ HandleScope scope;
+
+ IOWatcher *io = ObjectWrap::Unwrap<IOWatcher>(args.Holder());
+
if (!args[0]->IsInt32()) {
return ThrowException(Exception::TypeError(
String::New("First arg should be a file descriptor.")));
if (args[2]->IsTrue()) events |= EV_WRITE;
- if (!args[3]->IsFunction()) {
- return ThrowException(Exception::TypeError(
- String::New("Fourth arg should a callback.")));
- }
-
- Local<Function> callback = Local<Function>::Cast(args[3]);
-
- IOWatcher *s = new IOWatcher(fd, events);
-
- s->Wrap(args.This());
- s->handle_->SetInternalField(1, callback);
-
- return args.This();
-}
-
-
-Handle<Value> IOWatcher::Start(const Arguments& args) {
- HandleScope scope;
-
- IOWatcher *io = ObjectWrap::Unwrap<IOWatcher>(args.Holder());
-
- ev_io_start(EV_DEFAULT_UC_ &io->watcher_);
-
- io->Ref();
+ ev_io_set(&io->watcher_, fd, events);
return Undefined();
}
-
Handle<Value> IOWatcher::Stop(const Arguments& args) {
HandleScope scope;
IOWatcher *io = ObjectWrap::Unwrap<IOWatcher>(args.Holder());
void IOWatcher::Stop () {
if (watcher_.active) {
+ HandleScope scope;
ev_io_stop(EV_DEFAULT_UC_ &watcher_);
Unref();
}
#ifndef NODE_IO_H_
#define NODE_IO_H_
-#include <node.h>
+#include <node_object_wrap.h>
#include <ev.h>
namespace node {
protected:
static v8::Persistent<v8::FunctionTemplate> constructor_template;
- IOWatcher(int fd, int events) : ObjectWrap() {
- ev_io_init(&watcher_, IOWatcher::Callback, fd, events);
+ IOWatcher() : ObjectWrap() {
+ ev_init(&watcher_, IOWatcher::Callback);
watcher_.data = this;
}
static v8::Handle<v8::Value> New(const v8::Arguments& args);
static v8::Handle<v8::Value> Start(const v8::Arguments& args);
static v8::Handle<v8::Value> Stop(const v8::Arguments& args);
+ static v8::Handle<v8::Value> Set(const v8::Arguments& args);
private:
static void Callback(EV_P_ ev_io *watcher, int revents);
FD_ARG(args[0])
- struct sockaddr_storage addr;
- socklen_t len;
+ struct sockaddr_storage address_storage;
+ socklen_t len = sizeof(struct sockaddr_storage);
- int peer = accept(fd, (struct sockaddr*) &addr, &len);
+ int peer_fd = accept(fd, (struct sockaddr*) &address_storage, &len);
- if (peer < 0) {
- if (errno == EAGAIN) return Null();
+ if (peer_fd < 0) {
+ if (errno == EAGAIN) return scope.Close(Null());
return ThrowException(ErrnoException(errno, "accept"));
}
- if (!SetNonBlock(peer)) {
+ if (!SetNonBlock(peer_fd)) {
int fcntl_errno = errno;
- close(peer);
- return ThrowException(ErrnoException(fcntl_errno, "fcntl"));
+ close(peer_fd);
+ return ThrowException(ErrnoException(fcntl_errno, "fcntl", "Cannot make peer non-blocking"));
}
Local<Object> peer_info = Object::New();
- peer_info->Set(fd_symbol, Integer::New(fd));
+ peer_info->Set(fd_symbol, Integer::New(peer_fd));
- if (addr.ss_family == AF_INET6) {
- struct sockaddr_in6 *a = reinterpret_cast<struct sockaddr_in6*>(&addr);
+ if (address_storage.ss_family == AF_INET6) {
+ struct sockaddr_in6 *a = (struct sockaddr_in6*)&address_storage;
char ip[INET6_ADDRSTRLEN];
- inet_ntop(AF_INET6, &a->sin6_addr, ip, INET6_ADDRSTRLEN);
+ inet_ntop(AF_INET6, &(a->sin6_addr), ip, INET6_ADDRSTRLEN);
int port = ntohs(a->sin6_port);
target->Set(String::NewSymbol("EADDRINUSE"), Integer::New(EADDRINUSE));
target->Set(String::NewSymbol("ECONNREFUSED"), Integer::New(ECONNREFUSED));
- errno_symbol = NODE_PSYMBOL("errno");
- syscall_symbol = NODE_PSYMBOL("syscall");
- fd_symbol = NODE_PSYMBOL("fd");
+ errno_symbol = NODE_PSYMBOL("errno");
+ syscall_symbol = NODE_PSYMBOL("syscall");
+ fd_symbol = NODE_PSYMBOL("fd");
remote_address_symbol = NODE_PSYMBOL("remoteAddress");
- remote_port_symbol = NODE_PSYMBOL("remotePort");
+ remote_port_symbol = NODE_PSYMBOL("remotePort");
}
} // namespace node
-var socket = process.socket;
-var bind = process.bind;
-var listen = process.listen;
-var accept = process.accept;
-var close = process.close;
+var debugLevel = 0;
+if ("NODE_DEBUG" in process.ENV) debugLevel = 1;
+function debug (x) {
+ if (debugLevel > 0) {
+ process.stdio.writeError(x + "\n");
+ }
+}
+
+var socket = process.socket;
+var bind = process.bind;
+var listen = process.listen;
+var accept = process.accept;
+var close = process.close;
+var shutdown = process.shutdown;
+
+var Peer = function (peerInfo) {
+ process.EventEmitter.call();
+
+ var self = this;
+
+ self.fd = peerInfo.fd;
+ self.remoteAddress = peerInfo.remoteAddress;
+ self.remotePort = peerInfo.remotePort;
+
+ self.readWatcher = new process.IOWatcher(function () {
+ debug(self.fd + " readable");
+ });
+ self.readWatcher.set(self.fd, true, false);
+ self.readWatcher.start();
+
+ self.writeWatcher = new process.IOWatcher(function () {
+ debug(self.fd + " writable");
+ });
+ self.writeWatcher.set(self.fd, false, true);
+
+ self.readable = true;
+ self.writable = true;
+};
+process.inherits(Peer, process.EventEmitter);
+
+Peer.prototype.close = function () {
+ this.readable = false;
+ this.writable = false;
+
+ this.writeWatcher.stop();
+ this.readWatcher.stop();
+ close(this.fd);
+ debug("close peer " + this.fd);
+ this.fd = null;
+};
+
+
var Server = function (listener) {
var self = this;
if (listener) {
self.addListener("connection", listener);
}
-
+
+ self.watcher = new process.IOWatcher(function (readable, writeable) {
+ debug("readable " + readable);
+ debug("writable " + writeable);
+ while (self.fd) {
+ debug("accept from " + self.fd);
+ var peerInfo = accept(self.fd);
+ debug("accept: " + JSON.stringify(peerInfo));
+ if (!peerInfo) return;
+ var peer = new Peer(peerInfo);
+ self.emit("connection", peer);
+ }
+ });
};
process.inherits(Server, process.EventEmitter);
bind(self.fd, port, host);
listen(self.fd, 128); // TODO configurable backlog
- self.watcher = new process.IOWatcher(self.fd, true, false, function () {
- var peerInfo;
- while (self.fd) {
- peerInfo = accept(self.fd);
- if (peerInfo === null) return;
- self.emit("connection", peerInfo);
- }
- });
-
+ self.watcher.set(self.fd, true, false);
self.watcher.start();
};
if (!self.fd) throw new Error("Not running");
self.watcher.stop();
close(self.fd);
- this.watcher = null;
- this.fd = null;
+ self.fd = null;
};
///////////////////////////////////////////////////////
var sys = require("sys");
-var server = new Server(function () {
- sys.puts("connection");
- server.close();
+var server = new Server(function (peer) {
+ sys.puts("connection (" + peer.fd + "): "
+ + peer.remoteAddress
+ + " port "
+ + peer.remotePort
+ );
+ sys.puts("server fd: " + server.fd);
+ peer.close();
});
server.listen(8000);
+sys.puts("server fd: " + server.fd);