bugfix: Properly exit a process.
authorRyan <ry@tinyclouds.org>
Wed, 24 Jun 2009 11:44:12 +0000 (13:44 +0200)
committerRyan <ry@tinyclouds.org>
Wed, 24 Jun 2009 11:44:12 +0000 (13:44 +0200)
This requires that onExit() is not called immediately upon receiving a
SIGCHLD. There could still be data in the pipez. So, instead just set a
flag and invoke the pipe watchers.

Sometimes one will not receive an EOF from pipes because the process was
killed by a SIGTERM, or something. If SIGCHLD has been recved but we are
getting EAGAIN, the pipez need to be closed too.

src/process.cc
src/process.h
test/mjsunit/test-process-buffering.js [new file with mode: 0644]

index d43c550..65e2c2f 100644 (file)
@@ -16,6 +16,12 @@ using namespace node;
 #define ON_EXIT_SYMBOL    String::NewSymbol("onExit")
 #define PID_SYMBOL        String::NewSymbol("pid")
 
+/* defines for the parent side */
+#define STDOUT_CLOSED     (stdout_pipe_[0] < 0)
+#define STDERR_CLOSED     (stderr_pipe_[0] < 0)
+#define STDIN_CLOSED      (stdin_pipe_[1] < 0)
+
+
 Persistent<FunctionTemplate> Process::constructor_template;
 
 void
@@ -119,11 +125,7 @@ Process::Write (const Arguments& args)
 
   } else return ThrowException(String::New("Bad argument"));
 
-  if (process->Write(buf) != 0) {
-    return ThrowException(String::New("Pipe already closed"));
-  }
-
-  return Undefined();  
+  return process->Write(buf) == 0 ? True() : False();
 }
 
 Handle<Value>
@@ -149,12 +151,7 @@ Process::Close (const Arguments& args)
   HandleScope scope;
   Process *process = NODE_UNWRAP(Process, args.Holder());
   assert(process);
-
-  if (process->Close() != 0) {
-    return ThrowException(String::New("Pipe already closed."));
-  }
-
-  return Undefined();  
+  return process->Close() == 0 ? True() : False();
 }
 
 Process::Process (Handle<Object> handle)
@@ -169,7 +166,7 @@ Process::Process (Handle<Object> handle)
   ev_init(&stdin_watcher_, Process::OnWritable);
   stdin_watcher_.data = this;
 
-  ev_init(&child_watcher_, Process::OnExit);
+  ev_init(&child_watcher_, Process::OnCHLD);
   child_watcher_.data = this;
 
   stdout_pipe_[0] = -1;
@@ -180,6 +177,8 @@ Process::Process (Handle<Object> handle)
   stdin_pipe_[1] = -1;
 
   got_close_ = false;
+  got_chld_ = false;
+  exit_code_ = 0;
 
   pid_ = 0; 
 
@@ -342,7 +341,18 @@ Process::OnOutput (EV_P_ ev_io *watcher, int revents)
     r = read(fd, buf, buf_size);
 
     if (r < 0) {
-      if (errno != EAGAIN) perror("IPC pipe read error");
+      if (errno != EAGAIN) {
+        perror("IPC pipe read error");
+      } else {
+        if (process->got_chld_) {
+          close(fd);
+          if (is_stdout) {
+            process->stdout_pipe_[0] = -1;
+          } else {
+            process->stderr_pipe_[0] = -1;
+          }
+        }
+      }
       break;
     }
 
@@ -364,9 +374,16 @@ Process::OnOutput (EV_P_ ev_io *watcher, int revents)
 
     if (r == 0) {
       ev_io_stop(EV_DEFAULT_UC_ watcher);
+      close(fd);
+      if (is_stdout) {
+        process->stdout_pipe_[0] = -1;
+      } else {
+        process->stderr_pipe_[0] = -1;
+      }
       break;
     }
   }
+  process->MaybeShutdown();
 }
 
 void 
@@ -387,7 +404,13 @@ Process::OnWritable (EV_P_ ev_io *watcher, int revents)
                 , to_write->len - to_write->written
                 );
     if (sent < 0) {
-      if (errno == EAGAIN) break;
+      if (errno == EAGAIN) {
+        if (process->got_chld_) {
+          close(process->stdin_pipe_[1]);
+          process->stdin_pipe_[1] = -1;
+        }
+        break;
+      }
       perror("IPC pipe write error");
       break;
     }
@@ -410,7 +433,7 @@ Process::OnWritable (EV_P_ ev_io *watcher, int revents)
 }
 
 void 
-Process::OnExit (EV_P_ ev_child *watcher, int revents)
+Process::OnCHLD (EV_P_ ev_child *watcher, int revents)
 {
   ev_child_stop(EV_A_ watcher);
   Process *process = static_cast<Process*>(watcher->data);
@@ -419,24 +442,29 @@ Process::OnExit (EV_P_ ev_child *watcher, int revents)
   assert(process->pid_ == watcher->rpid);
   assert(&process->child_watcher_ == watcher);
 
-  // Call onExit ( watcher->rstatus )
-  HandleScope scope;
-  Handle<Value> callback_v = process->handle_->Get(ON_EXIT_SYMBOL);
+  process->got_chld_ = true;
+  process->exit_code_ = watcher->rstatus;
 
-  if (callback_v->IsFunction()) {
-    Handle<Function> callback = Handle<Function>::Cast(callback_v);
-    TryCatch try_catch;
-    Handle<Value> argv[1] = { Integer::New(watcher->rstatus) };
-    callback->Call(process->handle_, 1, argv);
-    if (try_catch.HasCaught()) FatalException(try_catch);
+  if (process->stdout_pipe_[0] >= 0) {
+    ev_feed_event(&process->stdout_watcher_, EV_READ);
+  }
+
+  if (process->stderr_pipe_[0] >= 0) {
+    ev_feed_event(&process->stderr_watcher_, EV_READ);
   }
-  process->Shutdown();
+
+  if (process->stdin_pipe_[1] >= 0) {
+    ev_io_start(EV_DEFAULT_UC_ &process->stdin_watcher_);
+    ev_feed_event(&process->stdin_watcher_, EV_WRITE);
+  }
+
+  process->MaybeShutdown();
 }
 
 int
 Process::Write (oi_buf *buf)
 {
-  if (stdin_pipe_[1] < 0 || got_close_) return -1;
+  if (STDIN_CLOSED || got_close_ || got_chld_) return -1;
   oi_queue_insert_head(&out_stream_, &buf->queue);
   buf->written = 0;
   ev_io_start(EV_DEFAULT_UC_ &stdin_watcher_);
@@ -444,9 +472,9 @@ Process::Write (oi_buf *buf)
 }
 
 int
-Process::Close ()
+Process::Close (void)
 {
-  if (stdin_pipe_[1] < 0 || got_close_) return -1;
+  if (STDIN_CLOSED || got_close_ || got_chld_) return -1;
   got_close_ = true;
   ev_io_start(EV_DEFAULT_UC_ &stdin_watcher_);
   return 0;
@@ -455,6 +483,26 @@ Process::Close ()
 int
 Process::Kill (int sig)
 {
-  if (pid_ == 0) return -1;
+  if (got_chld_ || pid_ == 0) return -1;
   return kill(pid_, sig);
 }
+
+int
+Process::MaybeShutdown (void)
+{
+  if (STDOUT_CLOSED && STDERR_CLOSED && got_chld_) {
+    // Call onExit
+    HandleScope scope;
+    Handle<Value> callback_v = handle_->Get(ON_EXIT_SYMBOL);
+
+    if (callback_v->IsFunction()) {
+      Handle<Function> callback = Handle<Function>::Cast(callback_v);
+      TryCatch try_catch;
+      Handle<Value> argv[1] = { Integer::New(exit_code_) };
+      callback->Call(handle_, 1, argv);
+      if (try_catch.HasCaught()) FatalException(try_catch);
+    }
+
+    Shutdown();
+  } 
+}
index 38138b8..55891be 100644 (file)
@@ -24,17 +24,19 @@ class Process : ObjectWrap {
   Process(v8::Handle<v8::Object> handle);
   ~Process();
 
-  void Shutdown ();
   int Spawn (const char *command);
   int Write (oi_buf *buf);
-  int Close ();
+  int Close (void);
   int Kill (int sig);
 
  private:
   static void OnOutput (EV_P_ ev_io *watcher, int revents);
   static void OnError (EV_P_ ev_io *watcher, int revents);
   static void OnWritable (EV_P_ ev_io *watcher, int revents);
-  static void OnExit (EV_P_ ev_child *watcher, int revents);
+  static void OnCHLD (EV_P_ ev_child *watcher, int revents);
+
+  int MaybeShutdown (void);
+  void Shutdown (void);
 
   ev_io stdout_watcher_;
   ev_io stderr_watcher_;
@@ -48,6 +50,8 @@ class Process : ObjectWrap {
   pid_t pid_;
 
   bool got_close_;
+  bool got_chld_;
+  int exit_code_;
 
   oi_queue out_stream_;
 };
diff --git a/test/mjsunit/test-process-buffering.js b/test/mjsunit/test-process-buffering.js
new file mode 100644 (file)
index 0000000..74dfca5
--- /dev/null
@@ -0,0 +1,29 @@
+include("mjsunit.js");
+
+var pwd_called = false;
+
+function pwd (callback) {
+  var output = "";
+  var process = new node.Process("pwd");
+  process.onOutput = function (s) {
+    if (s) output += s;
+  };
+  process.onExit = function(c) {
+    assertEquals(0, c);
+    callback(output);
+    pwd_called = true;
+  };
+}
+
+
+function onLoad () {
+  pwd(function (result) {
+    print(result);  
+    assertTrue(result.length > 1);
+    assertEquals("\n", result[result.length-1]);
+  });
+}
+
+function onExit () {
+  assertTrue(pwd_called);
+}