Bugfix: blocked pumping in stdio coupling
authorRyan Dahl <ry@tinyclouds.org>
Sat, 6 Feb 2010 01:14:14 +0000 (17:14 -0800)
committerRyan Dahl <ry@tinyclouds.org>
Sat, 6 Feb 2010 01:14:14 +0000 (17:14 -0800)
This should fix the test in c05b5d8 by Mikeal Rogers.

deps/coupling/coupling.c
test/mjsunit/fixtures/echo.js
test/mjsunit/test-stdio.js

index 57eca1c..f6b03f3 100644 (file)
@@ -143,98 +143,176 @@ ring_buffer_push (ring_buffer *ring, int fd)
   return r;
 }
 
+/* PULL PUMP
+ *
+ * This is used to read data from a blocking file descriptor and pump it into
+ * a non-blocking pipe (or other non-blocking fd). The algorithm is this:
+ *
+ *   while (true) {
+ *     read(STDIN_FILENO) // blocking
+ *
+ *     while (!ring.empty) {
+ *       write(pipe) // non-blocking
+ *       select(pipe, writable) 
+ *     }
+ *   }
+ *
+ */
 static void
-pump (int is_pull, int pullfd, int pushfd)
+pull_pump (int pullfd, int pushfd)
 {
   int r;
   ring_buffer ring;
-  fd_set readfds, writefds, exceptfds;
+
+  fd_set writefds, exceptfds;
+  FD_ZERO(&exceptfds);
+  FD_ZERO(&writefds);
+  FD_SET(pushfd, &exceptfds);
+  FD_SET(pushfd, &writefds);
 
   ring_buffer_init(&ring);
 
-  int maxfd;
+  while (pullfd >= 0) {
+    /* Blocking read from STDIN_FILENO */
+    r = ring_buffer_pull(&ring, pullfd);
 
-  while (pushfd >= 0 && (pullfd >= 0 || !ring_buffer_empty_p(&ring))) {
-    FD_ZERO(&exceptfds);
-    FD_ZERO(&readfds);
-    FD_ZERO(&writefds);
-    
-    maxfd = -1;
-    
-    if (is_pull) {
-      if (!ring_buffer_empty_p(&ring)) { 
-        maxfd = pushfd;
-        FD_SET(pushfd, &exceptfds);
-        FD_SET(pushfd, &writefds);
-      }
-    } else {
-      if (pullfd >= 0) {
-        if (!ring_buffer_filled_p(&ring)) {
-          maxfd = pullfd;
-          FD_SET(pullfd, &exceptfds);
-          FD_SET(pullfd, &readfds);
+    if (r == 0) {
+      /* eof */
+      close(pullfd);
+      pullfd = -1;
+    } else if (r < 0 && errno != EINTR && errno != EAGAIN) {
+      /* error */
+      perror("pull_pump read()");
+      close(pullfd);
+      pullfd = -1;
+    }
+
+    /* Push all of the data in the ring buffer out. */
+    while (!ring_buffer_empty_p(&ring)) {
+      /* non-blocking write() to the pipe */
+      r = ring_buffer_push(&ring, pushfd);
+
+      if (r < 0 && errno != EAGAIN && errno != EINTR) {
+        if (errno == EPIPE) {
+          /* This happens if someone closes the other end of the pipe.  This
+           * is a normal forced close of STDIN. Hopefully there wasn't data
+           * in the ring buffer. Just close both ends and exit.
+           */
+          close(pushfd);
+          close(pullfd);
+          pushfd = pullfd = -1;
+        } else {
+          perror("pull_pump write()");
+          close(pushfd);
+          close(pullfd);
         }
+        return;
       }
-    }
 
-    if (maxfd >= 0) {
-      r = select(maxfd+1, &readfds, &writefds, &exceptfds, NULL);
+      /* Select for writablity on the pipe end. 
+       * Very rarely will this stick.
+       */
+      r = select(pushfd+1, NULL, &writefds, &exceptfds, NULL);
 
-      if (r < 0 || (pullfd >= 0 && FD_ISSET(pushfd, &exceptfds))) {
+      if (r < 0 || FD_ISSET(pushfd, &exceptfds)) {
         close(pushfd);
         close(pullfd);
         pushfd = pullfd = -1;
         return;
       }
     }
+  }
+  assert(pullfd < 0);
+  assert(ring_buffer_empty_p(&ring));
+  close(pushfd);
+}
+
+/* PUSH PUMP
+ *
+ * This is used to push data out to a blocking file descriptor. It pulls
+ * data from a non-blocking pipe (pullfd) and pushes to STDOUT_FILENO
+ * (pushfd).
+ * When the pipe is closed, then the rest of the data is pushed out and then
+ * STDOUT_FILENO is closed.
+ *
+ * The algorithm looks roughly like this:
+ *
+ *  while (true) {
+ *    r = read(pipe) // nonblocking
+ *
+ *    while (!ring.empty) {
+ *      write(STDOUT_FILENO) // blocking
+ *    }
+ *
+ *    select(pipe, readable);
+ *  }
+ */
+static void
+push_pump (int pullfd, int pushfd)
+{
+  int r;
+  ring_buffer ring;
+
+  fd_set readfds, exceptfds;
+  FD_ZERO(&exceptfds);
+  FD_ZERO(&readfds);
+  FD_SET(pullfd, &exceptfds);
+  FD_SET(pullfd, &readfds);
+
+  ring_buffer_init(&ring);
+
+  /* The pipe is open or there is data left to be pushed out 
+   * NOTE: if pushfd (STDOUT_FILENO) ever errors out, then we just exit the
+   * loop.
+   */
+  while (pullfd >= 0 || !ring_buffer_empty_p(&ring)) {
 
-    if (pullfd >= 0 && FD_ISSET(pullfd, &exceptfds)) {
+    /* Pull from the non-blocking pipe */
+    r = ring_buffer_pull(&ring, pullfd);
+
+    if (r == 0) {
+      /* eof */
+      close(pullfd);
+      pullfd = -1;
+    } else if (r < 0 && errno != EINTR && errno != EAGAIN) {
+      perror("push_pump read()");
       close(pullfd);
       pullfd = -1;
+      return;
     }
 
-    if (pullfd >= 0 && (is_pull || FD_ISSET(pullfd, &readfds))) {
-      r = ring_buffer_pull(&ring, pullfd);
-      if (r == 0) {
-        /* eof */
-        close(pullfd);
-        pullfd = -1;
+    /* Push everything out to STDOUT */
+    while (!ring_buffer_empty_p(&ring)) {
+      /* Blocking write() to pushfd (STDOUT_FILENO) */
+      r = ring_buffer_push(&ring, pushfd);
+
+      /* If there was a problem, just exit the entire function */
 
-      } else if (r < 0) {
-        if (errno != EINTR && errno != EAGAIN) goto error;
+      if (r < 0 && errno != EINTR) {
+        close(pushfd);
+        close(pullfd);
+        pushfd = pullfd = -1;
+        return;
       }
     }
+    
+    if (pullfd >= 0) {
+      /* select for readability on the pullfd */
+      r = select(pullfd+1, &readfds, NULL, &exceptfds, NULL);
 
-    if (!is_pull || FD_ISSET(pushfd, &writefds)) {
-      r = ring_buffer_push(&ring, pushfd);
-      if (r < 0) {
-        switch (errno) {
-          case EINTR:
-          case EAGAIN:
-            continue;
-
-          case EPIPE:
-            /* TODO catch SIGPIPE? */
-            close(pushfd);
-            close(pullfd);
-            pushfd = pullfd = -1;
-            return;
-
-          default:
-            goto error;
-        }
+      if (r < 0 || FD_ISSET(pullfd, &exceptfds)) {
+        close(pushfd);
+        close(pullfd);
+        pushfd = pullfd = -1;
+        return;
       }
     }
   }
-
+  /* If we got here then we got eof on pullfd and pushed all the data out.
+   * so now just close pushfd */
+  assert(pullfd < 0);
+  assert(ring_buffer_empty_p(&ring));
   close(pushfd);
-  close(pullfd);
-  return;
-
-error:
-  close(pushfd);
-  close(pullfd);
-  perror("(coupling) pump");
 }
 
 static inline int
@@ -262,7 +340,11 @@ pump_thread (void *data)
 {
   struct coupling *c = (struct coupling*)data;
 
-  pump(c->is_pull, c->pullfd, c->pushfd);
+  if (c->is_pull) {
+    pull_pump(c->pullfd, c->pushfd);
+  }  else {
+    push_pump(c->pullfd, c->pushfd);
+  }
 
   return NULL;
 }
index 49f435c..0aed0ff 100644 (file)
@@ -1,5 +1,12 @@
 process.mixin(require("../common"));
 process.stdio.open();
+
+print("hello world\r\n");
+
 process.stdio.addListener("data", function (data) {
-  puts(data);
-});
\ No newline at end of file
+  print(data);
+});
+
+process.stdio.addListener("close", function () {
+  process.stdio.close();
+});
index 9570539..e8bc684 100644 (file)
@@ -2,20 +2,34 @@ process.mixin(require("./common"));
 
 var sub = path.join(fixturesDir, 'echo.js');
 
-var result = false;
-var child = process.createChildProcess(path.join(libDir, "../bin/node"), [sub]);
+var gotHelloWorld = false;
+var gotEcho = false;
+
+var child = process.createChildProcess(process.argv[0], [sub]);
+
 child.addListener("error", function (data){
   puts("parent stderr: " + data);
 });
+
 child.addListener("output", function (data){
-  if (data && data[0] == 't') {
-    result = true;
+  if (data) {
+    puts('child said: ' + JSON.stringify(data));
+    if (!gotHelloWorld) {
+      assert.equal("hello world\r\n", data);
+      gotHelloWorld = true;
+      child.write('echo me\r\n');
+    } else {
+      assert.equal("echo me\r\n", data);
+      gotEcho = true;
+      child.close();
+    }
+  } else {
+    puts('child eof');
   }
 });
-setTimeout(function () {
-  child.write('t\r\n');
-}, 100);
-setTimeout(function (){
-  assert.ok(result);
-}, 500)
+
+
+process.addListener('exit', function () {
+  assert.ok(gotHelloWorld);
+  assert.ok(gotEcho);
+});