From fdf46a65c92efc6de21c20c2d65d6883f269d0e6 Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Mon, 15 Mar 2010 15:11:40 -0700 Subject: [PATCH] Use streams for stdout and stdin --- lib/net.js | 13 +-- lib/repl.js | 10 ++- lib/sys.js | 8 +- src/node.cc | 12 ++- src/node.js | 27 +++++- src/node_stdio.cc | 183 ++++----------------------------------- src/node_stdio.h | 6 +- test/simple/test-net-pingpong.js | 3 +- 8 files changed, 76 insertions(+), 186 deletions(-) diff --git a/lib/net.js b/lib/net.js index 176b6f2..3a2466d 100644 --- a/lib/net.js +++ b/lib/net.js @@ -359,12 +359,13 @@ function initSocket (self) { function Socket (fd) { process.EventEmitter.call(this); - if (fd) { + this.fd = null; + + if (parseInt(fd) >= 0) { initSocket(this); this.fd = fd; - this.resume(); this.readable = true; this._writeWatcher.set(this.fd, false, true); @@ -615,7 +616,7 @@ function doConnect (socket, port, host) { var errno = socketError(socket.fd); if (errno == 0) { // connection established - socket._readWatcher.start(); + socket.resume(); socket.readable = true; socket.writable = true; socket._writeWatcher.callback = socket._doFlush; @@ -680,7 +681,7 @@ Socket.prototype.pause = function () { Socket.prototype.resume = function () { - if (!this.fd) throw new Error('Cannot resume() closed Socket.'); + if (this.fd === null) throw new Error('Cannot resume() closed Socket.'); this._readWatcher.set(this.fd, true, false); this._readWatcher.start(); }; @@ -762,16 +763,18 @@ function Server (listener) { while (self.fd) { var peerInfo = accept(self.fd); if (!peerInfo) return; + var s = new Socket(peerInfo.fd); s.remoteAddress = peerInfo.remoteAddress; s.remotePort = peerInfo.remotePort; s.type = self.type; s.server = self; + s.resume(); + self.emit('connection', s); // The 'connect' event probably should be removed for server-side // sockets. It's redundent. s.emit('connect'); - timeout.active(s); } }; } diff --git a/lib/repl.js b/lib/repl.js index ee7dd9b..1608102 100644 --- a/lib/repl.js +++ b/lib/repl.js @@ -12,13 +12,17 @@ exports.scope = {}; exports.prompt = "node> "; // Can overridden with custom print functions, such as `probe` or `eyes.js` exports.writer = sys.p; + +var stdin; + exports.start = function (prompt) { if (prompt !== undefined) { exports.prompt = prompt; } - process.stdio.open(); - process.stdio.addListener("data", readline); + stdin = process.openStdin(); + stdin.setEncoding('utf8'); + stdin.addListener("data", readline); displayPrompt(); } @@ -96,7 +100,7 @@ function parseREPLKeyword (cmd) { displayPrompt(); return true; case ".exit": - process.stdio.close(); + stdin.close(); return true; case ".help": sys.puts(".break\tSometimes you get stuck in a place you can't get out... This will get you out."); diff --git a/lib/sys.js b/lib/sys.js index a828e5c..aef8053 100644 --- a/lib/sys.js +++ b/lib/sys.js @@ -2,23 +2,23 @@ var events = require('events'); exports.print = function () { for (var i = 0, len = arguments.length; i < len; ++i) { - process.stdio.write(arguments[i]); + process.stdout.write(arguments[i]); } }; exports.puts = function () { for (var i = 0, len = arguments.length; i < len; ++i) { - process.stdio.write(arguments[i] + '\n'); + process.stdout.write(arguments[i] + '\n'); } }; exports.debug = function (x) { - process.stdio.writeError("DEBUG: " + x + "\n"); + process.binding('stdio').writeError("DEBUG: " + x + "\n"); }; exports.error = function (x) { for (var i = 0, len = arguments.length; i < len; ++i) { - process.stdio.writeError(arguments[i] + '\n'); + process.binding('stdio').writeError(arguments[i] + '\n'); } }; diff --git a/src/node.cc b/src/node.cc index c9c1354..608b911 100644 --- a/src/node.cc +++ b/src/node.cc @@ -1073,7 +1073,16 @@ static Handle Binding(const Arguments& args) { Local exports; - if (!strcmp(*module_v, "http")) { + if (!strcmp(*module_v, "stdio")) { + if (binding_cache->Has(module)) { + exports = binding_cache->Get(module)->ToObject(); + } else { + exports = Object::New(); + Stdio::Initialize(exports); + binding_cache->Set(module, exports); + } + + } else if (!strcmp(*module_v, "http")) { if (binding_cache->Has(module)) { exports = binding_cache->Get(module)->ToObject(); } else { @@ -1258,7 +1267,6 @@ static void Load(int argc, char *argv[]) { IOWatcher::Initialize(process); // io_watcher.cc IdleWatcher::Initialize(process); // idle_watcher.cc Timer::Initialize(process); // timer.cc - Stdio::Initialize(process); // stdio.cc InitNet2(process); // net2.cc InitHttpParser(process); // http_parser.cc ChildProcess::Initialize(process); // child_process.cc diff --git a/src/node.js b/src/node.js index d326265..2af9ab8 100644 --- a/src/node.js +++ b/src/node.js @@ -106,10 +106,12 @@ process.createChildProcess = function (file, args, env) { return child; }; + process.assert = function (x, msg) { if (!(x)) throw new Error(msg || "assertion error"); }; + // From jQuery.extend in the jQuery JavaScript Library v1.3.2 // Copyright (c) 2009 John Resig // Dual licensed under the MIT and GPL licenses. @@ -119,7 +121,7 @@ var mixinMessage; process.mixin = function() { if (!mixinMessage) { mixinMessage = 'deprecation warning: process.mixin will be removed from node-core future releases.\n' - process.stdio.writeError(mixinMessage); + process.binding('stdio').writeError(mixinMessage); } // copy reference to target object var target = arguments[0] || {}, i = 1, length = arguments.length, deep = false, source; @@ -338,7 +340,7 @@ if ("NODE_DEBUG" in process.env) debugLevel = 1; function debug (x) { if (debugLevel > 0) { - process.stdio.writeError(x + "\n"); + process.binding('stdio').writeError(x + "\n"); } } @@ -781,6 +783,27 @@ Module.prototype._waitChildrenLoad = function (callback) { }; +var stdout; +process.__defineGetter__('stdout', function () { + if (stdout) return stdout; + var net = requireNative('net'); + stdout = new net.Socket(process.binding('stdio').stdoutFD); + return stdout; +}); + +var stdin; +process.openStdin = function () { + if (stdin) return stdin; + var net = requireNative('net'); + var fd = process.binding('stdio').openStdin(); + stdin = new net.Socket(fd); + process.stdout.write(stdin.fd + "\n"); + stdin.resume(); + stdin.readable = true; + return stdin; +}; + + process.exit = function (code) { process.emit("exit"); process.reallyExit(code); diff --git a/src/node_stdio.cc b/src/node_stdio.cc index 2b5b306..ee1278f 100644 --- a/src/node_stdio.cc +++ b/src/node_stdio.cc @@ -8,10 +8,8 @@ #include using namespace v8; -using namespace node; +namespace node { -static Persistent stdio; -static Persistent emit; static struct coupling *stdin_coupling = NULL; static struct coupling *stdout_coupling = NULL; @@ -19,33 +17,8 @@ static struct coupling *stdout_coupling = NULL; static int stdin_fd = -1; static int stdout_fd = -1; -static evcom_reader in; -static evcom_writer out; -static enum encoding stdin_encoding; - -static void -EmitInput (Local input) -{ - HandleScope scope; - - Local argv[2] = { String::NewSymbol("data"), input }; - - emit->Call(stdio, 2, argv); -} - -static void -EmitClose (void) -{ - HandleScope scope; - - Local argv[1] = { String::NewSymbol("close") }; - - emit->Call(stdio, 1, argv); -} - - -static inline Local errno_exception(int errorno) { +static Local errno_exception(int errorno) { Local e = Exception::Error(String::NewSymbol(strerror(errorno))); Local obj = e->ToObject(); obj->Set(String::NewSymbol("errno"), Integer::New(errorno)); @@ -53,7 +26,7 @@ static inline Local errno_exception(int errorno) { } -/* STDERR IS ALWAY SYNC */ +/* STDERR IS ALWAY SYNC ALWAYS UTF8 */ static Handle WriteError (const Arguments& args) { @@ -81,84 +54,8 @@ WriteError (const Arguments& args) return Undefined(); } -static Handle -Write (const Arguments& args) -{ - HandleScope scope; - - if (args.Length() == 0) { - return ThrowException(Exception::Error(String::New("Bad argument"))); - } - - enum encoding enc = UTF8; - if (args.Length() > 1) enc = ParseEncoding(args[1], UTF8); - - ssize_t len = DecodeBytes(args[0], enc); - - if (len < 0) { - Local exception = Exception::TypeError(String::New("Bad argument")); - return ThrowException(exception); - } - - char buf[len]; - ssize_t written = DecodeWrite(buf, len, args[0], enc); - - assert(written == len); - - evcom_writer_write(&out, buf, len); - - return Undefined(); -} - -static void -detach_in (evcom_reader *r) -{ - assert(r == &in); - HandleScope scope; - - EmitClose(); - evcom_reader_detach(&in); - - if (stdin_coupling) { - coupling_destroy(stdin_coupling); - stdin_coupling = NULL; - } - - stdin_fd = -1; -} - -static void -detach_out (evcom_writer* w) -{ - assert(w == &out); - - evcom_writer_detach(&out); - if (stdout_coupling) { - coupling_destroy(stdout_coupling); - stdout_coupling = NULL; - } - stdout_fd = -1; -} - -static void -on_read (evcom_reader *r, const void *buf, size_t len) -{ - assert(r == &in); - HandleScope scope; - - if (!len) { - return; - } - - Local data = Encode(buf, len, stdin_encoding); - - EmitInput(data); -} - -static inline int -set_nonblock (int fd) -{ +static inline int SetNonblock(int fd) { int flags = fcntl(fd, F_GETFL, 0); if (flags == -1) return -1; @@ -168,20 +65,14 @@ set_nonblock (int fd) return 0; } -static Handle -Open (const Arguments& args) -{ + +static Handle OpenStdin(const Arguments& args) { HandleScope scope; if (stdin_fd >= 0) { return ThrowException(Exception::Error(String::New("stdin already open"))); } - stdin_encoding = UTF8; - if (args.Length() > 0) { - stdin_encoding = ParseEncoding(args[0]); - } - if (isatty(STDIN_FILENO)) { // XXX selecting on tty fds wont work in windows. // Must ALWAYS make a coupling on shitty platforms. @@ -190,34 +81,11 @@ Open (const Arguments& args) stdin_coupling = coupling_new_pull(STDIN_FILENO); stdin_fd = coupling_nonblocking_fd(stdin_coupling); } - set_nonblock(stdin_fd); - - evcom_reader_init(&in); - - in.on_read = on_read; - in.on_close = detach_in; + SetNonblock(stdin_fd); - evcom_reader_set(&in, stdin_fd); - evcom_reader_attach(EV_DEFAULT_ &in); - - return Undefined(); + return scope.Close(Integer::New(stdin_fd)); } -static Handle -Close (const Arguments& args) -{ - HandleScope scope; - - assert(stdio == args.Holder()); - - if (stdin_fd < 0) { - return ThrowException(Exception::Error(String::New("stdin not open"))); - } - - evcom_reader_close(&in); - - return Undefined(); -} void Stdio::Flush() { if (stdout_fd >= 0) { @@ -232,27 +100,9 @@ void Stdio::Flush() { } } -void -Stdio::Initialize (v8::Handle target) -{ - HandleScope scope; - - Local stdio_local = - EventEmitter::constructor_template->GetFunction()->NewInstance(0, NULL); - - stdio = Persistent::New(stdio_local); - - NODE_SET_METHOD(stdio, "open", Open); - NODE_SET_METHOD(stdio, "write", Write); - NODE_SET_METHOD(stdio, "writeError", WriteError); - NODE_SET_METHOD(stdio, "close", Close); - target->Set(String::NewSymbol("stdio"), stdio); - - Local emit_v = stdio->Get(String::NewSymbol("emit")); - assert(emit_v->IsFunction()); - Local emit_f = Local::Cast(emit_v); - emit = Persistent::New(emit_f); +void Stdio::Initialize(v8::Handle target) { + HandleScope scope; if (isatty(STDOUT_FILENO)) { // XXX selecting on tty fds wont work in windows. @@ -262,10 +112,13 @@ Stdio::Initialize (v8::Handle target) stdout_coupling = coupling_new_push(STDOUT_FILENO); stdout_fd = coupling_nonblocking_fd(stdout_coupling); } - set_nonblock(stdout_fd); + SetNonblock(stdout_fd); + + target->Set(String::NewSymbol("stdoutFD"), Integer::New(stdout_fd)); - evcom_writer_init(&out); - out.on_close = detach_out; - evcom_writer_set(&out, stdout_fd); - evcom_writer_attach(EV_DEFAULT_ &out); + NODE_SET_METHOD(target, "writeError", WriteError); + NODE_SET_METHOD(target, "openStdin", OpenStdin); } + + +} // namespace node diff --git a/src/node_stdio.h b/src/node_stdio.h index 60fa291..4763943 100644 --- a/src/node_stdio.h +++ b/src/node_stdio.h @@ -2,9 +2,7 @@ #define node_stdio_h #include - #include -#include namespace node { @@ -14,5 +12,5 @@ public: static void Flush (); }; -} // namespace node -#endif +} // namespace node +#endif // node_stdio_h diff --git a/test/simple/test-net-pingpong.js b/test/simple/test-net-pingpong.js index 94b9836..ad6410c 100644 --- a/test/simple/test-net-pingpong.js +++ b/test/simple/test-net-pingpong.js @@ -1,4 +1,5 @@ -process.mixin(require("../common")); +require("../common"); + net = require("net"); process.Buffer.prototype.toString = function () { -- 2.7.4