Use streams for stdout and stdin
authorRyan Dahl <ry@tinyclouds.org>
Mon, 15 Mar 2010 22:11:40 +0000 (15:11 -0700)
committerRyan Dahl <ry@tinyclouds.org>
Mon, 15 Mar 2010 22:11:40 +0000 (15:11 -0700)
lib/net.js
lib/repl.js
lib/sys.js
src/node.cc
src/node.js
src/node_stdio.cc
src/node_stdio.h
test/simple/test-net-pingpong.js

index 176b6f2..3a2466d 100644 (file)
@@ -359,12 +359,13 @@ function initSocket (self) {
 function Socket (fd) {
   process.EventEmitter.call(this);
 
-  if (fd) {
+  this.fd = null;
+
+  if (parseInt(fd) >= 0) {
     initSocket(this);
 
     this.fd = fd;
 
-    this.resume();
     this.readable = true;
 
     this._writeWatcher.set(this.fd, false, true);
@@ -615,7 +616,7 @@ function doConnect (socket, port, host) {
     var errno = socketError(socket.fd);
     if (errno == 0) {
       // connection established
-      socket._readWatcher.start();
+      socket.resume();
       socket.readable = true;
       socket.writable = true;
       socket._writeWatcher.callback = socket._doFlush;
@@ -680,7 +681,7 @@ Socket.prototype.pause = function () {
 
 
 Socket.prototype.resume = function () {
-  if (!this.fd) throw new Error('Cannot resume() closed Socket.');
+  if (this.fd === null) throw new Error('Cannot resume() closed Socket.');
   this._readWatcher.set(this.fd, true, false);
   this._readWatcher.start();
 };
@@ -762,16 +763,18 @@ function Server (listener) {
     while (self.fd) {
       var peerInfo = accept(self.fd);
       if (!peerInfo) return;
+
       var s = new Socket(peerInfo.fd);
       s.remoteAddress = peerInfo.remoteAddress;
       s.remotePort = peerInfo.remotePort;
       s.type = self.type;
       s.server = self;
+      s.resume();
+
       self.emit('connection', s);
       // The 'connect' event  probably should be removed for server-side
       // sockets. It's redundent.
       s.emit('connect');
-      timeout.active(s);
     }
   };
 }
index ee7dd9b..1608102 100644 (file)
@@ -12,13 +12,17 @@ exports.scope = {};
 exports.prompt = "node> ";
 // Can overridden with custom print functions, such as `probe` or `eyes.js`
 exports.writer = sys.p;
+
+var stdin;
+
 exports.start = function (prompt) {
   if (prompt !== undefined) {
     exports.prompt = prompt;
   }
 
-  process.stdio.open();
-  process.stdio.addListener("data", readline);
+  stdin = process.openStdin();
+  stdin.setEncoding('utf8');
+  stdin.addListener("data", readline);
   displayPrompt();
 }
 
@@ -96,7 +100,7 @@ function parseREPLKeyword (cmd) {
     displayPrompt();
     return true;
   case ".exit":
-    process.stdio.close();
+    stdin.close();
     return true;
   case ".help":
     sys.puts(".break\tSometimes you get stuck in a place you can't get out... This will get you out.");
index a828e5c..aef8053 100644 (file)
@@ -2,23 +2,23 @@ var events = require('events');
 
 exports.print = function () {
   for (var i = 0, len = arguments.length; i < len; ++i) {
-    process.stdio.write(arguments[i]);
+    process.stdout.write(arguments[i]);
   }
 };
 
 exports.puts = function () {
   for (var i = 0, len = arguments.length; i < len; ++i) {
-    process.stdio.write(arguments[i] + '\n');
+    process.stdout.write(arguments[i] + '\n');
   }
 };
 
 exports.debug = function (x) {
-  process.stdio.writeError("DEBUG: " + x + "\n");
+  process.binding('stdio').writeError("DEBUG: " + x + "\n");
 };
 
 exports.error = function (x) {
   for (var i = 0, len = arguments.length; i < len; ++i) {
-    process.stdio.writeError(arguments[i] + '\n');
+    process.binding('stdio').writeError(arguments[i] + '\n');
   }
 };
 
index c9c1354..608b911 100644 (file)
@@ -1073,7 +1073,16 @@ static Handle<Value> Binding(const Arguments& args) {
 
   Local<Object> exports;
 
-  if (!strcmp(*module_v, "http")) {
+  if (!strcmp(*module_v, "stdio")) {
+    if (binding_cache->Has(module)) {
+      exports = binding_cache->Get(module)->ToObject();
+    } else {
+      exports = Object::New();
+      Stdio::Initialize(exports);
+      binding_cache->Set(module, exports);
+    }
+
+  } else if (!strcmp(*module_v, "http")) {
     if (binding_cache->Has(module)) {
       exports = binding_cache->Get(module)->ToObject();
     } else {
@@ -1258,7 +1267,6 @@ static void Load(int argc, char *argv[]) {
   IOWatcher::Initialize(process);              // io_watcher.cc
   IdleWatcher::Initialize(process);            // idle_watcher.cc
   Timer::Initialize(process);                  // timer.cc
-  Stdio::Initialize(process);                  // stdio.cc
   InitNet2(process);                           // net2.cc
   InitHttpParser(process);                     // http_parser.cc
   ChildProcess::Initialize(process);           // child_process.cc
index d326265..2af9ab8 100644 (file)
@@ -106,10 +106,12 @@ process.createChildProcess = function (file, args, env) {
   return child;
 };
 
+
 process.assert = function (x, msg) {
   if (!(x)) throw new Error(msg || "assertion error");
 };
 
+
 // From jQuery.extend in the jQuery JavaScript Library v1.3.2
 // Copyright (c) 2009 John Resig
 // Dual licensed under the MIT and GPL licenses.
@@ -119,7 +121,7 @@ var mixinMessage;
 process.mixin = function() {
   if (!mixinMessage) {
     mixinMessage = 'deprecation warning: process.mixin will be removed from node-core future releases.\n'
-    process.stdio.writeError(mixinMessage);
+    process.binding('stdio').writeError(mixinMessage);
   }
   // copy reference to target object
   var target = arguments[0] || {}, i = 1, length = arguments.length, deep = false, source;
@@ -338,7 +340,7 @@ if ("NODE_DEBUG" in process.env) debugLevel = 1;
 
 function debug (x) {
   if (debugLevel > 0) {
-    process.stdio.writeError(x + "\n");
+    process.binding('stdio').writeError(x + "\n");
   }
 }
 
@@ -781,6 +783,27 @@ Module.prototype._waitChildrenLoad = function (callback) {
 };
 
 
+var stdout;
+process.__defineGetter__('stdout', function () {
+  if (stdout) return stdout;
+  var net = requireNative('net');
+  stdout = new net.Socket(process.binding('stdio').stdoutFD);
+  return stdout;
+});
+
+var stdin;
+process.openStdin = function () {
+  if (stdin) return stdin;
+  var net = requireNative('net');
+  var fd = process.binding('stdio').openStdin();
+  stdin = new net.Socket(fd);
+  process.stdout.write(stdin.fd + "\n");
+  stdin.resume();
+  stdin.readable = true;
+  return stdin;
+};
+
+
 process.exit = function (code) {
   process.emit("exit");
   process.reallyExit(code);
index 2b5b306..ee1278f 100644 (file)
@@ -8,10 +8,8 @@
 #include <errno.h>
 
 using namespace v8;
-using namespace node;
+namespace node {
 
-static Persistent<Object> stdio;
-static Persistent<Function> emit;
 
 static struct coupling *stdin_coupling = NULL;
 static struct coupling *stdout_coupling = NULL;
@@ -19,33 +17,8 @@ static struct coupling *stdout_coupling = NULL;
 static int stdin_fd = -1;
 static int stdout_fd = -1;
 
-static evcom_reader in;
-static evcom_writer out;
 
-static enum encoding stdin_encoding;
-
-static void
-EmitInput (Local<Value> input)
-{
-  HandleScope scope;
-
-  Local<Value> argv[2] = { String::NewSymbol("data"), input };
-
-  emit->Call(stdio, 2, argv);
-}
-
-static void
-EmitClose (void)
-{
-  HandleScope scope;
-
-  Local<Value> argv[1] = { String::NewSymbol("close") };
-
-  emit->Call(stdio, 1, argv);
-}
-
-
-static inline Local<Value> errno_exception(int errorno) {
+static Local<Value> errno_exception(int errorno) {
   Local<Value> e = Exception::Error(String::NewSymbol(strerror(errorno)));
   Local<Object> obj = e->ToObject();
   obj->Set(String::NewSymbol("errno"), Integer::New(errorno));
@@ -53,7 +26,7 @@ static inline Local<Value> errno_exception(int errorno) {
 }
 
 
-/* STDERR IS ALWAY SYNC */
+/* STDERR IS ALWAY SYNC ALWAYS UTF8 */
 static Handle<Value>
 WriteError (const Arguments& args)
 {
@@ -81,84 +54,8 @@ WriteError (const Arguments& args)
   return Undefined();
 }
 
-static Handle<Value>
-Write (const Arguments& args)
-{
-  HandleScope scope;
-
-  if (args.Length() == 0) {
-    return ThrowException(Exception::Error(String::New("Bad argument")));
-  }
-
-  enum encoding enc = UTF8;
-  if (args.Length() > 1) enc = ParseEncoding(args[1], UTF8);
-
-  ssize_t len = DecodeBytes(args[0], enc);
-
-  if (len < 0) {
-    Local<Value> exception = Exception::TypeError(String::New("Bad argument"));
-    return ThrowException(exception);
-  }
-
-  char buf[len];
-  ssize_t written = DecodeWrite(buf, len, args[0], enc);
-  
-  assert(written == len);
-
-  evcom_writer_write(&out, buf, len);
-
-  return Undefined();
-}
-
-static void
-detach_in (evcom_reader *r)
-{
-  assert(r == &in);
-  HandleScope scope;
-
-  EmitClose();
 
-  evcom_reader_detach(&in);
-
-  if (stdin_coupling) {
-    coupling_destroy(stdin_coupling);
-    stdin_coupling = NULL;
-  }
-
-  stdin_fd = -1;
-}
-
-static void
-detach_out (evcom_writer* w)
-{
-  assert(w == &out);
-
-  evcom_writer_detach(&out);
-  if (stdout_coupling) {
-    coupling_destroy(stdout_coupling);
-    stdout_coupling = NULL;
-  }
-  stdout_fd = -1;
-}
-
-static void
-on_read (evcom_reader *r, const void *buf, size_t len)
-{
-  assert(r == &in);
-  HandleScope scope;
-
-  if (!len) {
-    return;
-  }
-
-  Local<Value> data = Encode(buf, len, stdin_encoding);
-
-  EmitInput(data);
-}
-
-static inline int
-set_nonblock (int fd)
-{
+static inline int SetNonblock(int fd) {
   int flags = fcntl(fd, F_GETFL, 0);
   if (flags == -1) return -1;
 
@@ -168,20 +65,14 @@ set_nonblock (int fd)
   return 0;
 }
 
-static Handle<Value>
-Open (const Arguments& args)
-{
+
+static Handle<Value> OpenStdin(const Arguments& args) {
   HandleScope scope;
 
   if (stdin_fd >= 0) {
     return ThrowException(Exception::Error(String::New("stdin already open")));
   }
 
-  stdin_encoding = UTF8;
-  if (args.Length() > 0) {
-    stdin_encoding = ParseEncoding(args[0]);
-  }
-
   if (isatty(STDIN_FILENO)) {
     // XXX selecting on tty fds wont work in windows.
     // Must ALWAYS make a coupling on shitty platforms.
@@ -190,34 +81,11 @@ Open (const Arguments& args)
     stdin_coupling = coupling_new_pull(STDIN_FILENO);
     stdin_fd = coupling_nonblocking_fd(stdin_coupling);
   }
-  set_nonblock(stdin_fd);
-
-  evcom_reader_init(&in);
-
-  in.on_read = on_read;
-  in.on_close = detach_in;
+  SetNonblock(stdin_fd);
 
-  evcom_reader_set(&in, stdin_fd);
-  evcom_reader_attach(EV_DEFAULT_ &in);
-
-  return Undefined();
+  return scope.Close(Integer::New(stdin_fd));
 }
 
-static Handle<Value>
-Close (const Arguments& args)
-{
-  HandleScope scope;
-
-  assert(stdio == args.Holder());
-
-  if (stdin_fd < 0) {
-    return ThrowException(Exception::Error(String::New("stdin not open")));
-  }
-
-  evcom_reader_close(&in);
-
-  return Undefined();
-}
 
 void Stdio::Flush() {
   if (stdout_fd >= 0) {
@@ -232,27 +100,9 @@ void Stdio::Flush() {
   }
 }
 
-void
-Stdio::Initialize (v8::Handle<v8::Object> target)
-{
-  HandleScope scope;
-
-  Local<Object> stdio_local =
-    EventEmitter::constructor_template->GetFunction()->NewInstance(0, NULL);
-
-  stdio = Persistent<Object>::New(stdio_local);
-
-  NODE_SET_METHOD(stdio, "open", Open);
-  NODE_SET_METHOD(stdio, "write", Write);
-  NODE_SET_METHOD(stdio, "writeError", WriteError);
-  NODE_SET_METHOD(stdio, "close", Close);
 
-  target->Set(String::NewSymbol("stdio"), stdio);
-
-  Local<Value> emit_v = stdio->Get(String::NewSymbol("emit"));
-  assert(emit_v->IsFunction());
-  Local<Function> emit_f = Local<Function>::Cast(emit_v);
-  emit = Persistent<Function>::New(emit_f);
+void Stdio::Initialize(v8::Handle<v8::Object> target) {
+  HandleScope scope;
 
   if (isatty(STDOUT_FILENO)) {
     // XXX selecting on tty fds wont work in windows.
@@ -262,10 +112,13 @@ Stdio::Initialize (v8::Handle<v8::Object> target)
     stdout_coupling = coupling_new_push(STDOUT_FILENO);
     stdout_fd = coupling_nonblocking_fd(stdout_coupling);
   }
-  set_nonblock(stdout_fd);
+  SetNonblock(stdout_fd);
+
+  target->Set(String::NewSymbol("stdoutFD"), Integer::New(stdout_fd));
 
-  evcom_writer_init(&out);
-  out.on_close = detach_out;
-  evcom_writer_set(&out, stdout_fd);
-  evcom_writer_attach(EV_DEFAULT_ &out);
+  NODE_SET_METHOD(target, "writeError", WriteError);
+  NODE_SET_METHOD(target, "openStdin", OpenStdin);
 }
+
+
+}  // namespace node
index 60fa291..4763943 100644 (file)
@@ -2,9 +2,7 @@
 #define node_stdio_h
 
 #include <node.h>
-
 #include <v8.h>
-#include <evcom.h>
 
 namespace node {
 
@@ -14,5 +12,5 @@ public:
   static void Flush ();
 };
 
-} // namespace node
-#endif
+}  // namespace node
+#endif  // node_stdio_h
index 94b9836..ad6410c 100644 (file)
@@ -1,4 +1,5 @@
-process.mixin(require("../common"));
+require("../common");
+
 net = require("net");
 
 process.Buffer.prototype.toString = function () {