return r;
}
+/* PULL PUMP
+ *
+ * This is used to read data from a blocking file descriptor and pump it into
+ * a non-blocking pipe (or other non-blocking fd). The algorithm is this:
+ *
+ * while (true) {
+ * read(STDIN_FILENO) // blocking
+ *
+ * while (!ring.empty) {
+ * write(pipe) // non-blocking
+ * select(pipe, writable)
+ * }
+ * }
+ *
+ */
static void
-pump (int is_pull, int pullfd, int pushfd)
+pull_pump (int pullfd, int pushfd)
{
int r;
ring_buffer ring;
- fd_set readfds, writefds, exceptfds;
+
+ fd_set writefds, exceptfds;
+ FD_ZERO(&exceptfds);
+ FD_ZERO(&writefds);
+ FD_SET(pushfd, &exceptfds);
+ FD_SET(pushfd, &writefds);
ring_buffer_init(&ring);
- int maxfd;
+ while (pullfd >= 0) {
+ /* Blocking read from STDIN_FILENO */
+ r = ring_buffer_pull(&ring, pullfd);
- while (pushfd >= 0 && (pullfd >= 0 || !ring_buffer_empty_p(&ring))) {
- FD_ZERO(&exceptfds);
- FD_ZERO(&readfds);
- FD_ZERO(&writefds);
-
- maxfd = -1;
-
- if (is_pull) {
- if (!ring_buffer_empty_p(&ring)) {
- maxfd = pushfd;
- FD_SET(pushfd, &exceptfds);
- FD_SET(pushfd, &writefds);
- }
- } else {
- if (pullfd >= 0) {
- if (!ring_buffer_filled_p(&ring)) {
- maxfd = pullfd;
- FD_SET(pullfd, &exceptfds);
- FD_SET(pullfd, &readfds);
+ if (r == 0) {
+ /* eof */
+ close(pullfd);
+ pullfd = -1;
+ } else if (r < 0 && errno != EINTR && errno != EAGAIN) {
+ /* error */
+ perror("pull_pump read()");
+ close(pullfd);
+ pullfd = -1;
+ }
+
+ /* Push all of the data in the ring buffer out. */
+ while (!ring_buffer_empty_p(&ring)) {
+ /* non-blocking write() to the pipe */
+ r = ring_buffer_push(&ring, pushfd);
+
+ if (r < 0 && errno != EAGAIN && errno != EINTR) {
+ if (errno == EPIPE) {
+ /* This happens if someone closes the other end of the pipe. This
+ * is a normal forced close of STDIN. Hopefully there wasn't data
+ * in the ring buffer. Just close both ends and exit.
+ */
+ close(pushfd);
+ close(pullfd);
+ pushfd = pullfd = -1;
+ } else {
+ perror("pull_pump write()");
+ close(pushfd);
+ close(pullfd);
}
+ return;
}
- }
- if (maxfd >= 0) {
- r = select(maxfd+1, &readfds, &writefds, &exceptfds, NULL);
+ /* Select for writablity on the pipe end.
+ * Very rarely will this stick.
+ */
+ r = select(pushfd+1, NULL, &writefds, &exceptfds, NULL);
- if (r < 0 || (pullfd >= 0 && FD_ISSET(pushfd, &exceptfds))) {
+ if (r < 0 || FD_ISSET(pushfd, &exceptfds)) {
close(pushfd);
close(pullfd);
pushfd = pullfd = -1;
return;
}
}
+ }
+ assert(pullfd < 0);
+ assert(ring_buffer_empty_p(&ring));
+ close(pushfd);
+}
+
+/* PUSH PUMP
+ *
+ * This is used to push data out to a blocking file descriptor. It pulls
+ * data from a non-blocking pipe (pullfd) and pushes to STDOUT_FILENO
+ * (pushfd).
+ * When the pipe is closed, then the rest of the data is pushed out and then
+ * STDOUT_FILENO is closed.
+ *
+ * The algorithm looks roughly like this:
+ *
+ * while (true) {
+ * r = read(pipe) // nonblocking
+ *
+ * while (!ring.empty) {
+ * write(STDOUT_FILENO) // blocking
+ * }
+ *
+ * select(pipe, readable);
+ * }
+ */
+static void
+push_pump (int pullfd, int pushfd)
+{
+ int r;
+ ring_buffer ring;
+
+ fd_set readfds, exceptfds;
+ FD_ZERO(&exceptfds);
+ FD_ZERO(&readfds);
+ FD_SET(pullfd, &exceptfds);
+ FD_SET(pullfd, &readfds);
+
+ ring_buffer_init(&ring);
+
+ /* The pipe is open or there is data left to be pushed out
+ * NOTE: if pushfd (STDOUT_FILENO) ever errors out, then we just exit the
+ * loop.
+ */
+ while (pullfd >= 0 || !ring_buffer_empty_p(&ring)) {
- if (pullfd >= 0 && FD_ISSET(pullfd, &exceptfds)) {
+ /* Pull from the non-blocking pipe */
+ r = ring_buffer_pull(&ring, pullfd);
+
+ if (r == 0) {
+ /* eof */
+ close(pullfd);
+ pullfd = -1;
+ } else if (r < 0 && errno != EINTR && errno != EAGAIN) {
+ perror("push_pump read()");
close(pullfd);
pullfd = -1;
+ return;
}
- if (pullfd >= 0 && (is_pull || FD_ISSET(pullfd, &readfds))) {
- r = ring_buffer_pull(&ring, pullfd);
- if (r == 0) {
- /* eof */
- close(pullfd);
- pullfd = -1;
+ /* Push everything out to STDOUT */
+ while (!ring_buffer_empty_p(&ring)) {
+ /* Blocking write() to pushfd (STDOUT_FILENO) */
+ r = ring_buffer_push(&ring, pushfd);
+
+ /* If there was a problem, just exit the entire function */
- } else if (r < 0) {
- if (errno != EINTR && errno != EAGAIN) goto error;
+ if (r < 0 && errno != EINTR) {
+ close(pushfd);
+ close(pullfd);
+ pushfd = pullfd = -1;
+ return;
}
}
+
+ if (pullfd >= 0) {
+ /* select for readability on the pullfd */
+ r = select(pullfd+1, &readfds, NULL, &exceptfds, NULL);
- if (!is_pull || FD_ISSET(pushfd, &writefds)) {
- r = ring_buffer_push(&ring, pushfd);
- if (r < 0) {
- switch (errno) {
- case EINTR:
- case EAGAIN:
- continue;
-
- case EPIPE:
- /* TODO catch SIGPIPE? */
- close(pushfd);
- close(pullfd);
- pushfd = pullfd = -1;
- return;
-
- default:
- goto error;
- }
+ if (r < 0 || FD_ISSET(pullfd, &exceptfds)) {
+ close(pushfd);
+ close(pullfd);
+ pushfd = pullfd = -1;
+ return;
}
}
}
-
+ /* If we got here then we got eof on pullfd and pushed all the data out.
+ * so now just close pushfd */
+ assert(pullfd < 0);
+ assert(ring_buffer_empty_p(&ring));
close(pushfd);
- close(pullfd);
- return;
-
-error:
- close(pushfd);
- close(pullfd);
- perror("(coupling) pump");
}
static inline int
{
struct coupling *c = (struct coupling*)data;
- pump(c->is_pull, c->pullfd, c->pushfd);
+ if (c->is_pull) {
+ pull_pump(c->pullfd, c->pushfd);
+ } else {
+ push_pump(c->pullfd, c->pushfd);
+ }
return NULL;
}