1 /* Copyright (C) 1989, 1990, 1991, 1992, 2000, 2001, 2002, 2003
2 * Free Software Foundation, Inc.
3 * Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008 Colin Watson.
4 * Written for groff by James Clark (jjc@jclark.com)
5 * Heavily adapted and extended for man-db by Colin Watson.
7 * This file is part of libpipeline.
9 * libpipeline is free software; you can redistribute it and/or modify
10 * it under the terms of the GNU General Public License as published by
11 * the Free Software Foundation; either version 2 of the License, or (at
12 * your option) any later version.
14 * libpipeline is distributed in the hope that it will be useful, but
15 * WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 * General Public License for more details.
19 * You should have received a copy of the GNU General Public License
20 * along with libpipeline; if not, write to the Free Software
21 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301
33 #include <sys/select.h>
35 #include <sys/types.h>
44 #include "full-write.h"
45 #include "safe-read.h"
46 #include "safe-write.h"
49 #include "xvasprintf.h"
51 #include "pipeline-private.h"
55 #ifdef USE_SOCKETPAIR_PIPE
57 # include <netinet/in.h>
58 # include <sys/socket.h>
59 # ifdef CORRECT_SOCKETPAIR_MODE
60 # include <sys/stat.h>
68 # ifdef CORRECT_SOCKETPAIR_MODE
69 # define pipe(p) (((socketpair(AF_UNIX,SOCK_STREAM,0,p) < 0) || \
70 (shutdown((p)[1],SHUT_RD) < 0) || (fchmod((p)[1],S_IWUSR) < 0) || \
71 (shutdown((p)[0],SHUT_WR) < 0) || (fchmod((p)[0],S_IRUSR) < 0)) ? -1 : 0)
73 # define pipe(p) (((socketpair(AF_UNIX,SOCK_STREAM,0,p) < 0) || \
74 (shutdown((p)[1],SHUT_RD) < 0) || (shutdown((p)[0],SHUT_WR) < 0)) ? -1 : 0)
78 /* ---------------------------------------------------------------------- */
80 /* Functions to build individual commands. */
82 pipecmd *pipecmd_new (const char *name)
84 pipecmd *cmd = XMALLOC (pipecmd);
85 struct pipecmd_process *cmdp;
88 cmd->tag = PIPECMD_PROCESS;
89 cmd->name = xstrdup (name);
95 cmd->env = xnmalloc (cmd->env_max, sizeof *cmd->env);
97 cmdp = &cmd->u.process;
101 cmdp->argv = xnmalloc (cmdp->argv_max, sizeof *cmdp->argv);
103 /* argv[0] is the basename of the command name. */
104 name_base = base_name (name);
105 pipecmd_arg (cmd, name_base);
111 pipecmd *pipecmd_new_argv (const char *name, va_list argv)
113 pipecmd *cmd = pipecmd_new (name);
114 pipecmd_argv (cmd, argv);
118 pipecmd *pipecmd_new_args (const char *name, ...)
123 va_start (argv, name);
124 cmd = pipecmd_new_argv (name, argv);
130 /* As suggested in the header file, this function (for pipecmd_new_argstr()
131 * and pipecmd_argstr()) is really a wart. If we didn't have to worry about
132 * old configuration files then it wouldn't be necessary. Worse, the
133 * definition for tr in man_db.conf currently contains single-quoting, and
134 * people probably took that as a licence to do similar things, so we're
135 * obliged to worry about quoting as well!
137 * However, we can mitigate this; shell quoting alone is safe though
138 * sometimes confusing, but it's other shell constructs that tend to cause
139 * real security holes. Therefore, rather than punting to 'sh -c' or
140 * whatever, we parse a safe subset manually. Environment variables are not
141 * currently handled because of tricky word splitting issues, but in
142 * principle they could be if there's demand for it.
144 * TODO: Support setting environment variables.
146 static char *argstr_get_word (const char **argstr)
149 const char *litstart = *argstr;
150 enum { NONE, SINGLE, DOUBLE } quotemode = NONE;
155 /* If it's just a literal character, go round again. */
156 if ((quotemode == NONE && !strchr (" \t'\"\\", **argstr)) ||
157 /* nothing is special in '; terminated by ' */
158 (quotemode == SINGLE && **argstr != '\'') ||
159 /* \ is special in "; terminated by " */
160 (quotemode == DOUBLE && !strchr ("\"\\", **argstr))) {
165 /* Within "", \ is only special when followed by $, `, ", or
166 * \ (or <newline> in a real shell, but we don't do that).
168 if (quotemode == DOUBLE && **argstr == '\\' &&
169 !strchr ("$`\"\\", *(*argstr + 1))) {
174 /* Copy any accumulated literal characters. */
175 if (litstart < *argstr) {
176 char *tmp = xstrndup (litstart, *argstr - litstart);
177 out = appendstr (out, tmp, NULL);
184 /* End of word; skip over extra whitespace. */
186 if (!strchr (" \t", **argstr))
191 if (quotemode != NONE)
195 litstart = ++*argstr;
199 if (quotemode != NONE)
203 litstart = ++*argstr;
207 backslashed[0] = *++*argstr;
208 if (!backslashed[0]) {
209 /* Unterminated quoting; give up. */
214 backslashed[1] = '\0';
215 out = appendstr (out, backslashed, NULL);
216 litstart = ++*argstr;
220 assert (!"unexpected state parsing argstr");
224 if (quotemode != NONE) {
225 /* Unterminated quoting; give up. */
231 /* Copy any accumulated literal characters. */
232 if (litstart < *argstr) {
233 char *tmp = xstrndup (litstart, *argstr - litstart);
234 out = appendstr (out, tmp, NULL);
241 pipecmd *pipecmd_new_argstr (const char *argstr)
246 arg = argstr_get_word (&argstr);
249 "badly formed configuration directive: '%s'",
251 if (!strcmp (arg, "exec")) {
252 /* Some old configuration files have "exec command" rather
253 * than "command"; this worked fine when being evaluated by
254 * a shell, but since exec is a shell builtin it doesn't
255 * work when being executed directly. To work around this,
256 * we just drop "exec" if it appears at the start of argstr.
258 arg = argstr_get_word (&argstr);
261 "badly formed configuration directive: '%s'",
264 cmd = pipecmd_new (arg);
267 while ((arg = argstr_get_word (&argstr))) {
268 pipecmd_arg (cmd, arg);
275 pipecmd *pipecmd_new_function (const char *name,
276 pipecmd_function_type *func,
277 pipecmd_function_free_type *free_func,
280 pipecmd *cmd = XMALLOC (pipecmd);
281 struct pipecmd_function *cmdf;
283 cmd->tag = PIPECMD_FUNCTION;
284 cmd->name = xstrdup (name);
286 cmd->discard_err = 0;
290 cmd->env = xnmalloc (cmd->env_max, sizeof *cmd->env);
292 cmdf = &cmd->u.function;
295 cmdf->free_func = free_func;
301 pipecmd *pipecmd_new_sequencev (const char *name, va_list cmdv)
303 pipecmd *cmd = XMALLOC (pipecmd);
304 struct pipecmd_sequence *cmds;
307 cmd->tag = PIPECMD_SEQUENCE;
308 cmd->name = xstrdup (name);
310 cmd->discard_err = 0;
314 cmd->env = xnmalloc (cmd->env_max, sizeof *cmd->env);
316 cmds = &cmd->u.sequence;
319 cmds->commands_max = 4;
320 cmds->commands = xnmalloc (cmds->commands_max, sizeof *cmds->commands);
322 child = va_arg (cmdv, pipecmd *);
324 pipecmd_sequence_command (cmd, child);
325 child = va_arg (cmdv, pipecmd *);
331 pipecmd *pipecmd_new_sequence (const char *name, ...)
336 va_start (cmdv, name);
337 cmd = pipecmd_new_sequencev (name, cmdv);
343 static void passthrough (void *data PIPELINE_ATTR_UNUSED)
347 int r = safe_read (STDIN_FILENO, buffer, 4096);
350 if (full_write (STDOUT_FILENO, buffer,
351 (size_t) r) < (size_t) r)
358 pipecmd *pipecmd_new_passthrough (void)
360 return pipecmd_new_function ("cat", &passthrough, NULL, NULL);
363 pipecmd *pipecmd_dup (pipecmd *cmd)
365 pipecmd *newcmd = XMALLOC (pipecmd);
368 newcmd->tag = cmd->tag;
369 newcmd->name = xstrdup (cmd->name);
370 newcmd->nice = cmd->nice;
371 newcmd->discard_err = cmd->discard_err;
373 newcmd->nenv = cmd->nenv;
374 newcmd->env_max = cmd->env_max;
375 assert (newcmd->nenv <= newcmd->env_max);
376 newcmd->env = xmalloc (newcmd->env_max * sizeof *newcmd->env);
378 for (i = 0; i < cmd->nenv; ++i) {
379 newcmd->env[i].name =
380 cmd->env[i].name ? xstrdup (cmd->env[i].name) : NULL;
381 newcmd->env[i].value =
382 cmd->env[i].value ? xstrdup (cmd->env[i].value) : NULL;
385 switch (newcmd->tag) {
386 case PIPECMD_PROCESS: {
387 struct pipecmd_process *cmdp = &cmd->u.process;
388 struct pipecmd_process *newcmdp = &newcmd->u.process;
390 newcmdp->argc = cmdp->argc;
391 newcmdp->argv_max = cmdp->argv_max;
392 assert (newcmdp->argc < newcmdp->argv_max);
393 newcmdp->argv = xmalloc
394 (newcmdp->argv_max * sizeof *newcmdp->argv);
396 for (i = 0; i < cmdp->argc; ++i)
397 newcmdp->argv[i] = xstrdup (cmdp->argv[i]);
398 newcmdp->argv[cmdp->argc] = NULL;
403 case PIPECMD_FUNCTION: {
404 struct pipecmd_function *cmdf = &cmd->u.function;
405 struct pipecmd_function *newcmdf = &newcmd->u.function;
407 newcmdf->func = cmdf->func;
408 newcmdf->free_func = cmdf->free_func;
409 newcmdf->data = cmdf->data;
414 case PIPECMD_SEQUENCE: {
415 struct pipecmd_sequence *cmds = &cmd->u.sequence;
416 struct pipecmd_sequence *newcmds = &newcmd->u.sequence;
418 newcmds->ncommands = cmds->ncommands;
419 newcmds->commands_max = cmds->commands_max;
420 assert (newcmds->ncommands <= newcmds->commands_max);
421 newcmds->commands = xmalloc
422 (newcmds->commands_max *
423 sizeof *newcmds->commands);
425 for (i = 0; i < cmds->ncommands; ++i)
426 newcmds->commands[i] =
427 pipecmd_dup (cmds->commands[i]);
436 void pipecmd_arg (pipecmd *cmd, const char *arg)
438 struct pipecmd_process *cmdp;
440 assert (cmd->tag == PIPECMD_PROCESS);
441 cmdp = &cmd->u.process;
443 if (cmdp->argc + 1 >= cmdp->argv_max) {
445 cmdp->argv = xrealloc (cmdp->argv,
446 cmdp->argv_max * sizeof *cmdp->argv);
449 cmdp->argv[cmdp->argc++] = xstrdup (arg);
450 assert (cmdp->argc < cmdp->argv_max);
451 cmdp->argv[cmdp->argc] = NULL;
454 void pipecmd_argf (pipecmd *cmd, const char *format, ...)
459 va_start (argv, format);
460 arg = xvasprintf (format, argv);
461 pipecmd_arg (cmd, arg);
466 void pipecmd_argv (pipecmd *cmd, va_list argv)
468 const char *arg = va_arg (argv, const char *);
470 assert (cmd->tag == PIPECMD_PROCESS);
473 pipecmd_arg (cmd, arg);
474 arg = va_arg (argv, const char *);
478 void pipecmd_args (pipecmd *cmd, ...)
482 assert (cmd->tag == PIPECMD_PROCESS);
484 va_start (argv, cmd);
485 pipecmd_argv (cmd, argv);
489 void pipecmd_argstr (pipecmd *cmd, const char *argstr)
493 assert (cmd->tag == PIPECMD_PROCESS);
495 while ((arg = argstr_get_word (&argstr))) {
496 pipecmd_arg (cmd, arg);
501 int pipecmd_get_nargs (pipecmd *cmd)
503 struct pipecmd_process *cmdp;
505 assert (cmd->tag == PIPECMD_PROCESS);
506 cmdp = &cmd->u.process;
511 void pipecmd_nice (pipecmd *cmd, int value)
516 void pipecmd_discard_err (pipecmd *cmd, int discard_err)
518 cmd->discard_err = discard_err;
521 void pipecmd_setenv (pipecmd *cmd, const char *name, const char *value)
523 if (cmd->nenv >= cmd->env_max) {
525 cmd->env = xrealloc (cmd->env,
526 cmd->env_max * sizeof *cmd->env);
529 cmd->env[cmd->nenv].name = xstrdup (name);
530 cmd->env[cmd->nenv].value = xstrdup (value);
534 void pipecmd_unsetenv (pipecmd *cmd, const char *name)
536 if (cmd->nenv >= cmd->env_max) {
538 cmd->env = xrealloc (cmd->env,
539 cmd->env_max * sizeof *cmd->env);
542 cmd->env[cmd->nenv].name = xstrdup (name);
543 cmd->env[cmd->nenv].value = NULL;
547 void pipecmd_clearenv (pipecmd *cmd)
549 if (cmd->nenv >= cmd->env_max) {
551 cmd->env = xrealloc (cmd->env,
552 cmd->env_max * sizeof *cmd->env);
555 cmd->env[cmd->nenv].name = NULL;
556 cmd->env[cmd->nenv].value = NULL;
560 void pipecmd_sequence_command (pipecmd *cmd, pipecmd *child)
562 struct pipecmd_sequence *cmds;
564 assert (cmd->tag == PIPECMD_SEQUENCE);
565 cmds = &cmd->u.sequence;
567 if (cmds->ncommands >= cmds->commands_max) {
568 cmds->commands_max *= 2;
569 cmds->commands = xrealloc
571 cmds->commands_max * sizeof *cmds->commands);
574 cmds->commands[cmds->ncommands++] = child;
577 void pipecmd_dump (pipecmd *cmd, FILE *stream)
581 for (i = 0; i < cmd->nenv; ++i) {
582 if (cmd->env[i].name)
583 fprintf (stream, "%s=%s ",
585 cmd->env[i].value ? cmd->env[i].value
588 fprintf (stream, "env -i ");
592 case PIPECMD_PROCESS: {
593 struct pipecmd_process *cmdp = &cmd->u.process;
595 fputs (cmd->name, stream);
596 for (i = 1; i < cmdp->argc; ++i) {
597 /* TODO: escape_shell()? */
599 fputs (cmdp->argv[i], stream);
605 case PIPECMD_FUNCTION:
606 fputs (cmd->name, stream);
609 case PIPECMD_SEQUENCE: {
610 struct pipecmd_sequence *cmds = &cmd->u.sequence;
613 for (i = 0; i < cmds->ncommands; ++i) {
614 pipecmd_dump (cmds->commands[i], stream);
615 if (i < cmds->ncommands - 1)
616 fputs (" && ", stream);
625 char *pipecmd_tostring (pipecmd *cmd)
630 for (i = 0; i < cmd->nenv; ++i) {
631 if (cmd->env[i].name)
632 out = appendstr (out, cmd->env[i].name, "=",
633 cmd->env[i].value ? cmd->env[i].value
637 out = appendstr (out, "env -i ", NULL);
641 case PIPECMD_PROCESS: {
642 struct pipecmd_process *cmdp = &cmd->u.process;
644 out = appendstr (out, cmd->name, NULL);
645 for (i = 1; i < cmdp->argc; ++i)
646 /* TODO: escape_shell()? */
647 out = appendstr (out, " ", cmdp->argv[i],
653 case PIPECMD_FUNCTION:
654 out = appendstr (out, cmd->name, NULL);
657 case PIPECMD_SEQUENCE: {
658 struct pipecmd_sequence *cmds = &cmd->u.sequence;
660 out = appendstr (out, "(", NULL);
661 for (i = 0; i < cmds->ncommands; ++i) {
662 char *subout = pipecmd_tostring
664 out = appendstr (out, subout, NULL);
666 if (i < cmds->ncommands - 1)
667 out = appendstr (out, " && ", NULL);
669 out = appendstr (out, ")", NULL);
678 /* Children exit with this status if execvp fails. */
679 #define EXEC_FAILED_EXIT_STATUS 0xff
681 /* When called internally during pipeline execution, this is called in the
682 * forked child process, with file descriptors already set up.
684 void pipecmd_exec (pipecmd *cmd)
689 if (nice (cmd->nice) < 0)
690 /* Don't worry too much. */
691 debug ("nice failed: %s\n", strerror (errno));
693 if (cmd->discard_err) {
694 int devnull = open ("/dev/null", O_WRONLY);
701 for (i = 0; i < cmd->nenv; ++i) {
702 if (cmd->env[i].name) {
703 if (cmd->env[i].value)
704 setenv (cmd->env[i].name,
705 cmd->env[i].value, 1);
707 unsetenv (cmd->env[i].name);
713 case PIPECMD_PROCESS: {
714 struct pipecmd_process *cmdp = &cmd->u.process;
715 execvp (cmd->name, cmdp->argv);
719 /* TODO: ideally, could there be a facility
720 * to execute non-blocking functions without
723 case PIPECMD_FUNCTION: {
724 struct pipecmd_function *cmdf = &cmd->u.function;
725 (*cmdf->func) (cmdf->data);
726 /* pacify valgrind et al */
728 (*cmdf->free_func) (cmdf->data);
732 case PIPECMD_SEQUENCE: {
733 struct pipecmd_sequence *cmds = &cmd->u.sequence;
736 /* pipeline_start will have blocked SIGCHLD. We like
737 * it that way. Lose the parent's signal handler,
740 memset (&sa, 0, sizeof sa);
741 sa.sa_handler = SIG_DFL;
742 sigemptyset (&sa.sa_mask);
744 if (sigaction (SIGCHLD, &sa, NULL) == -1)
746 "can't install SIGCHLD handler");
748 for (i = 0; i < cmds->ncommands; ++i) {
749 pipecmd *child = cmds->commands[i];
754 error (FATAL, errno, "fork failed");
756 pipecmd_exec (child);
757 debug ("Started \"%s\", pid %d\n",
760 while (waitpid (pid, &status, 0) < 0) {
763 error (FATAL, errno, "waitpid failed");
766 debug (" \"%s\" (%d) -> %d\n",
767 child->name, pid, status);
769 if (WIFSIGNALED (status)) {
770 int sig = WTERMSIG (status);
776 if (WCOREDUMP (status))
778 "%s: %s (core dumped)",
782 error (0, 0, "%s: %s",
785 } else if (!WIFEXITED (status))
786 error (0, 0, "unexpected status %d",
789 if (child->tag == PIPECMD_FUNCTION) {
790 struct pipecmd_function *cmdf =
797 if (WIFSIGNALED (status)) {
798 raise (WTERMSIG (status));
799 exit (1); /* just to make sure */
800 } else if (status && WIFEXITED (status))
801 exit (WEXITSTATUS (status));
808 error (EXEC_FAILED_EXIT_STATUS, errno, "can't execute %s", cmd->name);
809 /* Never called, but gcc doesn't realise that error with non-zero
810 * status always exits.
812 exit (EXEC_FAILED_EXIT_STATUS);
815 void pipecmd_free (pipecmd *cmd)
824 for (i = 0; i < cmd->nenv; ++i) {
825 free (cmd->env[i].name);
826 free (cmd->env[i].value);
831 case PIPECMD_PROCESS: {
832 struct pipecmd_process *cmdp = &cmd->u.process;
834 for (i = 0; i < cmdp->argc; ++i)
835 free (cmdp->argv[i]);
841 case PIPECMD_FUNCTION:
844 case PIPECMD_SEQUENCE: {
845 struct pipecmd_sequence *cmds = &cmd->u.sequence;
847 for (i = 0; i < cmds->ncommands; ++i)
848 pipecmd_free (cmds->commands[i]);
849 free (cmds->commands);
858 /* ---------------------------------------------------------------------- */
860 /* Functions to build pipelines. */
862 pipeline *pipeline_new (void)
864 pipeline *p = XMALLOC (pipeline);
867 p->commands = xnmalloc (p->commands_max, sizeof *p->commands);
870 p->redirect_in = p->redirect_out = REDIRECT_NONE;
871 p->want_in = p->want_out = 0;
872 p->want_infile = p->want_outfile = NULL;
873 p->infd = p->outfd = -1;
874 p->infile = p->outfile = NULL;
877 p->buflen = p->bufmax = 0;
878 p->line_cache = NULL;
880 p->ignore_signals = 0;
884 pipeline *pipeline_new_commandv (pipecmd *cmd1, va_list cmdv)
886 pipeline *p = pipeline_new ();
887 pipeline_command (p, cmd1);
888 pipeline_commandv (p, cmdv);
892 pipeline *pipeline_new_commands (pipecmd *cmd1, ...)
897 va_start (cmdv, cmd1);
898 p = pipeline_new_commandv (cmd1, cmdv);
904 pipeline *pipeline_new_command_argv (const char *name, va_list argv)
910 cmd = pipecmd_new_argv (name, argv);
911 pipeline_command (p, cmd);
916 pipeline *pipeline_new_command_args (const char *name, ...)
921 va_start (argv, name);
922 p = pipeline_new_command_argv (name, argv);
928 pipeline *pipeline_join (pipeline *p1, pipeline *p2)
930 pipeline *p = XMALLOC (pipeline);
935 assert (!p1->statuses);
936 assert (!p2->statuses);
938 p->ncommands = p1->ncommands + p2->ncommands;
939 p->commands_max = p1->ncommands + p2->ncommands;
940 p->commands = xnmalloc (p->commands_max, sizeof *p->commands);
943 p->redirect_in = p1->redirect_in;
944 p->want_in = p1->want_in;
945 p->want_infile = p1->want_infile;
946 p->redirect_out = p2->redirect_out;
947 p->want_out = p2->want_out;
948 p->want_outfile = p2->want_outfile;
950 p->outfd = p2->outfd;
951 p->infile = p1->infile;
952 p->outfile = p2->outfile;
955 p->buflen = p->bufmax = 0;
956 p->line_cache = NULL;
958 p->ignore_signals = (p1->ignore_signals || p2->ignore_signals);
960 for (i = 0; i < p1->ncommands; ++i)
961 p->commands[i] = pipecmd_dup (p1->commands[i]);
962 for (i = 0; i < p2->ncommands; ++i)
963 p->commands[p1->ncommands + i] = pipecmd_dup (p2->commands[i]);
968 void pipeline_connect (pipeline *source, pipeline *sink, ...)
973 /* We must be in control of output from the source pipeline. If the
974 * source isn't started, we can force this.
977 pipeline_want_out (source, -1);
978 assert (source->redirect_out == REDIRECT_FD);
979 assert (source->want_out < 0);
981 va_start (argv, sink);
982 for (arg = sink; arg; arg = va_arg (argv, pipeline *)) {
983 assert (!arg->pids); /* not started */
984 arg->source = source;
985 pipeline_want_in (arg, -1);
987 /* Zero-command sinks should represent data being passed
988 * straight through from the input to the output.
989 * Unfortunately pipeline_start and pipeline_pump don't
990 * handle this very well between them; a zero-command
991 * pipeline has the write end of its input pipe wrongly
992 * stashed in outfd and then pipeline_pump can't handle it
993 * because it has nowhere to send output. Until this is
994 * fixed, this kludge is necessary.
996 if (arg->ncommands == 0)
997 pipeline_command (arg, pipecmd_new_passthrough ());
1002 void pipeline_command (pipeline *p, pipecmd *cmd)
1004 if (p->ncommands >= p->commands_max) {
1005 p->commands_max *= 2;
1006 p->commands = xrealloc (p->commands,
1007 p->commands_max * sizeof *p->commands);
1010 p->commands[p->ncommands++] = cmd;
1013 void pipeline_command_argv (pipeline *p, const char *name, va_list argv)
1017 cmd = pipecmd_new_argv (name, argv);
1018 pipeline_command (p, cmd);
1021 void pipeline_command_args (pipeline *p, const char *name, ...)
1025 va_start (argv, name);
1026 pipeline_command_argv (p, name, argv);
1030 void pipeline_command_argstr (pipeline *p, const char *argstr)
1032 pipeline_command (p, pipecmd_new_argstr (argstr));
1035 void pipeline_commandv (pipeline *p, va_list cmdv)
1037 pipecmd *cmd = va_arg (cmdv, pipecmd *);
1040 pipeline_command (p, cmd);
1041 cmd = va_arg (cmdv, pipecmd *);
1045 void pipeline_commands (pipeline *p, ...)
1050 pipeline_commandv (p, cmdv);
1054 int pipeline_get_ncommands (pipeline *p)
1056 return p->ncommands;
1059 pipecmd *pipeline_get_command (pipeline *p, int n)
1061 if (n < 0 || n >= p->ncommands)
1063 return p->commands[n];
1066 pipecmd *pipeline_set_command (pipeline *p, int n, pipecmd *cmd)
1069 if (n < 0 || n >= p->ncommands)
1071 prev = p->commands[n];
1072 p->commands[n] = cmd;
1076 pid_t pipeline_get_pid (pipeline *p, int n)
1078 assert (p->pids); /* pipeline started */
1079 if (n < 0 || n >= p->ncommands)
1084 void pipeline_want_in (pipeline *p, int fd)
1086 p->redirect_in = REDIRECT_FD;
1088 p->want_infile = NULL;
1091 void pipeline_want_out (pipeline *p, int fd)
1093 p->redirect_out = REDIRECT_FD;
1095 p->want_outfile = NULL;
1098 void pipeline_want_infile (pipeline *p, const char *file)
1100 p->redirect_in = (file != NULL) ? REDIRECT_FILE_NAME : REDIRECT_NONE;
1102 p->want_infile = file;
1105 void pipeline_want_outfile (pipeline *p, const char *file)
1107 p->redirect_out = (file != NULL) ? REDIRECT_FILE_NAME : REDIRECT_NONE;
1109 p->want_outfile = file;
1112 void pipeline_ignore_signals (pipeline *p, int ignore_signals)
1114 p->ignore_signals = ignore_signals;
1117 FILE *pipeline_get_infile (pipeline *p)
1119 assert (p->pids); /* pipeline started */
1120 assert (p->statuses);
1123 else if (p->infd == -1) {
1124 error (0, 0, "pipeline input not open");
1127 return p->infile = fdopen (p->infd, "w");
1130 FILE *pipeline_get_outfile (pipeline *p)
1132 assert (p->pids); /* pipeline started */
1133 assert (p->statuses);
1136 else if (p->outfd == -1) {
1137 error (0, 0, "pipeline output not open");
1140 return p->outfile = fdopen (p->outfd, "r");
1143 void pipeline_dump (pipeline *p, FILE *stream)
1147 for (i = 0; i < p->ncommands; ++i) {
1148 pipecmd_dump (p->commands[i], stream);
1149 if (i < p->ncommands - 1)
1150 fputs (" | ", stream);
1152 fprintf (stream, " [input: {%d, %s}, output: {%d, %s}]\n",
1153 p->want_in, p->want_infile ? p->want_infile : "NULL",
1154 p->want_out, p->want_outfile ? p->want_outfile : "NULL");
1157 char *pipeline_tostring (pipeline *p)
1162 for (i = 0; i < p->ncommands; ++i) {
1163 char *cmdout = pipecmd_tostring (p->commands[i]);
1164 out = appendstr (out, cmdout, NULL);
1166 if (i < p->ncommands - 1)
1167 out = appendstr (out, " | ", NULL);
1173 void pipeline_free (pipeline *p)
1182 for (i = 0; i < p->ncommands; ++i)
1183 pipecmd_free (p->commands[i]);
1192 free (p->line_cache);
1196 /* ---------------------------------------------------------------------- */
1198 /* Functions to run pipelines and handle signals. */
1200 static pipeline **active_pipelines = NULL;
1201 static int n_active_pipelines = 0, max_active_pipelines = 0;
1203 static int sigchld = 0;
1204 static int queue_sigchld = 0;
1206 static int reap_children (int block)
1216 /* Deal with a SIGCHLD delivery. */
1217 pid = waitpid (-1, &status, WNOHANG);
1220 pid = waitpid (-1, &status, block ? 0 : WNOHANG);
1222 if (pid < 0 && errno == EINTR) {
1229 /* We've run out of children to reap. */
1234 /* Deliver the command status if possible. */
1235 for (i = 0; i < n_active_pipelines; ++i) {
1236 pipeline *p = active_pipelines[i];
1239 if (!p || !p->pids || !p->statuses)
1242 for (j = 0; j < p->ncommands; ++j) {
1243 if (p->pids[j] == pid) {
1244 p->statuses[j] = status;
1245 i = n_active_pipelines;
1250 } while ((sigchld || block == 0) && pid >= 0);
1258 static void pipeline_sigchld (int signum)
1260 /* really an assert, but that's not async-signal-safe */
1261 if (signum == SIGCHLD) {
1264 if (!queue_sigchld) {
1265 int save_errno = errno;
1272 static void pipeline_install_sigchld (void)
1274 struct sigaction act;
1275 static int installed = 0;
1280 memset (&act, 0, sizeof act);
1281 act.sa_handler = &pipeline_sigchld;
1282 sigemptyset (&act.sa_mask);
1283 sigaddset (&act.sa_mask, SIGINT);
1284 sigaddset (&act.sa_mask, SIGTERM);
1285 sigaddset (&act.sa_mask, SIGHUP);
1286 sigaddset (&act.sa_mask, SIGCHLD);
1289 act.sa_flags |= SA_NOCLDSTOP;
1292 act.sa_flags |= SA_RESTART;
1294 if (sigaction (SIGCHLD, &act, NULL) == -1)
1295 error (FATAL, errno, "can't install SIGCHLD handler");
1300 static pipeline_post_fork_fn *post_fork = NULL;
1302 void pipeline_install_post_fork (pipeline_post_fork_fn *fn)
1307 static int ignored_signals = 0;
1308 static struct sigaction osa_sigint, osa_sigquit;
1310 void pipeline_start (pipeline *p)
1313 int last_input = -1;
1317 /* Make sure our SIGCHLD handler is installed. */
1318 pipeline_install_sigchld ();
1320 assert (!p->pids); /* pipeline not started already */
1321 assert (!p->statuses);
1325 debug ("Starting pipeline: ");
1326 pipeline_dump (p, stderr);
1329 /* Flush all pending output so that subprocesses don't inherit it. */
1332 if (p->ignore_signals && !ignored_signals++) {
1333 struct sigaction sa;
1335 /* Ignore SIGINT and SIGQUIT while subprocesses are running,
1336 * just like system().
1338 memset (&sa, 0, sizeof sa);
1339 sa.sa_handler = SIG_IGN;
1340 sigemptyset (&sa.sa_mask);
1342 if (sigaction (SIGINT, &sa, &osa_sigint) < 0)
1343 error (FATAL, errno, "Couldn't ignore SIGINT");
1344 if (sigaction (SIGQUIT, &sa, &osa_sigquit) < 0)
1345 error (FATAL, errno, "Couldn't ignore SIGQUIT");
1348 /* Add to the table of active pipelines, so that signal handlers
1349 * know what to do with exit statuses. Block SIGCHLD so that we can
1353 sigaddset (&set, SIGCHLD);
1354 sigemptyset (&oset);
1355 while (sigprocmask (SIG_BLOCK, &set, &oset) == -1 && errno == EINTR)
1358 /* Grow the table if necessary. */
1359 if (n_active_pipelines >= max_active_pipelines) {
1360 int filled = max_active_pipelines;
1361 if (max_active_pipelines)
1362 max_active_pipelines *= 2;
1364 max_active_pipelines = 4;
1365 /* reduces to xmalloc (...) if active_pipelines == NULL */
1366 active_pipelines = xrealloc
1368 max_active_pipelines * sizeof *active_pipelines);
1369 memset (active_pipelines + filled, 0,
1370 (max_active_pipelines - filled) *
1371 sizeof *active_pipelines);
1374 for (i = 0; i < max_active_pipelines; ++i)
1375 if (!active_pipelines[i]) {
1376 active_pipelines[i] = p;
1379 assert (i < max_active_pipelines);
1380 ++n_active_pipelines;
1382 p->pids = xcalloc (p->ncommands, sizeof *p->pids);
1383 p->statuses = xcalloc (p->ncommands, sizeof *p->statuses);
1385 /* Unblock SIGCHLD. */
1386 while (sigprocmask (SIG_SETMASK, &oset, NULL) == -1 && errno == EINTR)
1389 if (p->redirect_in == REDIRECT_FD && p->want_in < 0) {
1390 if (pipe (infd) < 0)
1391 error (FATAL, errno, "pipe failed");
1392 last_input = infd[0];
1394 } else if (p->redirect_in == REDIRECT_FD)
1395 last_input = p->want_in;
1396 else if (p->redirect_in == REDIRECT_FILE_NAME) {
1397 assert (p->want_infile);
1398 last_input = open (p->want_infile, O_RDONLY);
1400 error (FATAL, errno, "can't open %s", p->want_infile);
1403 for (i = 0; i < p->ncommands; i++) {
1406 int output_read = -1, output_write = -1;
1408 if (i != p->ncommands - 1 ||
1409 (p->redirect_out == REDIRECT_FD && p->want_out < 0)) {
1410 if (pipe (pdes) < 0)
1411 error (FATAL, errno, "pipe failed");
1412 if (i == p->ncommands - 1)
1414 output_read = pdes[0];
1415 output_write = pdes[1];
1416 } else if (i == p->ncommands - 1) {
1417 if (p->redirect_out == REDIRECT_FD)
1418 output_write = p->want_out;
1419 else if (p->redirect_out == REDIRECT_FILE_NAME) {
1420 assert (p->want_outfile);
1421 output_write = open (p->want_outfile,
1422 O_WRONLY | O_CREAT |
1424 if (output_write < 0)
1425 error (FATAL, errno, "can't open %s",
1430 /* Block SIGCHLD so that the signal handler doesn't collect
1431 * the exit status before we've filled in the pids array.
1434 sigaddset (&set, SIGCHLD);
1435 sigemptyset (&oset);
1436 while (sigprocmask (SIG_BLOCK, &set, &oset) == -1 &&
1442 error (FATAL, errno, "fork failed");
1448 /* input, reading side */
1449 if (last_input != -1) {
1450 if (dup2 (last_input, 0) < 0)
1451 error (FATAL, errno, "dup2 failed");
1452 if (close (last_input) < 0)
1453 error (FATAL, errno, "close failed");
1456 /* output, writing side */
1457 if (output_write != -1) {
1458 if (dup2 (output_write, 1) < 0)
1459 error (FATAL, errno, "dup2 failed");
1460 if (close (output_write) < 0)
1461 error (FATAL, errno, "close failed");
1464 /* output, reading side */
1465 if (output_read != -1)
1466 if (close (output_read))
1467 error (FATAL, errno, "close failed");
1469 /* input from first command, writing side; must close
1470 * it in every child because it has to be created
1471 * before forking anything
1474 if (close (p->infd))
1475 error (FATAL, errno, "close failed");
1477 /* inputs and outputs from other active pipelines */
1478 for (j = 0; j < n_active_pipelines; ++j) {
1479 pipeline *active = active_pipelines[j];
1480 if (!active || active == p)
1482 /* ignore failures */
1483 if (active->infd != -1)
1484 close (active->infd);
1485 if (active->outfd != -1)
1486 close (active->outfd);
1489 /* Restore signals. */
1490 if (p->ignore_signals) {
1491 sigaction (SIGINT, &osa_sigint, NULL);
1492 sigaction (SIGQUIT, &osa_sigquit, NULL);
1495 pipecmd_exec (p->commands[i]);
1500 if (last_input != -1) {
1501 if (close (last_input) < 0)
1502 error (FATAL, errno, "close failed");
1504 if (output_write != -1) {
1505 if (close (output_write) < 0)
1506 error (FATAL, errno, "close failed");
1508 if (output_read != -1)
1509 last_input = output_read;
1511 p->statuses[i] = -1;
1513 /* Unblock SIGCHLD. */
1514 while (sigprocmask (SIG_SETMASK, &oset, NULL) == -1 &&
1518 debug ("Started \"%s\", pid %d\n", p->commands[i]->name, pid);
1521 if (p->ncommands == 0)
1522 p->outfd = last_input;
1525 int pipeline_wait_all (pipeline *p, int **statuses, int *n_statuses)
1528 int proc_count = p->ncommands;
1530 int raise_signal = 0;
1534 debug ("Waiting for pipeline: ");
1535 pipeline_dump (p, stderr);
1538 assert (p->pids); /* pipeline started */
1539 assert (p->statuses);
1542 if (fclose (p->infile))
1544 "closing pipeline input stream failed");
1547 } else if (p->infd != -1) {
1548 if (close (p->infd))
1549 error (0, errno, "closing pipeline input failed");
1554 if (fclose (p->outfile)) {
1556 "closing pipeline output stream failed");
1561 } else if (p->outfd != -1) {
1562 if (close (p->outfd)) {
1563 error (0, errno, "closing pipeline output failed");
1569 /* Tell the SIGCHLD handler not to get in our way. */
1572 while (proc_count > 0) {
1575 debug ("Active processes (%d):\n", proc_count);
1577 /* Check for any statuses already collected by SIGCHLD
1578 * handlers or the previous iteration before calling
1579 * reap_children() again.
1581 for (i = 0; i < p->ncommands; ++i) {
1584 if (p->pids[i] == -1)
1587 debug (" \"%s\" (%d) -> %d\n",
1588 p->commands[i]->name, p->pids[i],
1591 if (p->statuses[i] == -1)
1594 status = p->statuses[i];
1597 if (WIFSIGNALED (status)) {
1598 int sig = WTERMSIG (status);
1603 #endif /* SIGPIPE */
1604 /* signals currently blocked,
1607 if (sig == SIGINT || sig == SIGQUIT)
1609 else if (WCOREDUMP (status))
1611 "%s: %s (core dumped)",
1612 p->commands[i]->name,
1615 error (0, 0, "%s: %s",
1616 p->commands[i]->name,
1620 #endif /* SIGPIPE */
1621 } else if (!WIFEXITED (status))
1622 error (0, 0, "unexpected status %d",
1625 if (p->commands[i]->tag == PIPECMD_FUNCTION) {
1626 struct pipecmd_function *cmdf =
1627 &p->commands[i]->u.function;
1628 if (cmdf->free_func)
1629 (*cmdf->free_func) (cmdf->data);
1632 if (i == p->ncommands - 1) {
1633 if (WIFSIGNALED (status))
1634 ret = 128 + WTERMSIG (status);
1635 else if (WEXITSTATUS (status))
1636 ret = WEXITSTATUS (status);
1638 (WIFSIGNALED (status) ||
1639 WEXITSTATUS (status)))
1643 assert (proc_count >= 0);
1644 if (proc_count == 0)
1648 r = reap_children (1);
1650 if (r == -1 && errno == ECHILD)
1651 /* Eh? The pipeline was allegedly still running, so
1652 * we shouldn't have got ECHILD.
1654 error (FATAL, errno, "waitpid failed");
1659 for (i = 0; i < n_active_pipelines; ++i)
1660 if (active_pipelines[i] == p)
1661 active_pipelines[i] = NULL;
1663 if (statuses && n_statuses) {
1664 *statuses = xnmalloc (p->ncommands, sizeof **statuses);
1665 *n_statuses = p->ncommands;
1666 for (i = 0; i < p->ncommands; ++i)
1667 (*statuses)[i] = p->statuses[i];
1675 if (p->ignore_signals && !--ignored_signals) {
1676 /* Restore signals. */
1677 sigaction (SIGINT, &osa_sigint, NULL);
1678 sigaction (SIGQUIT, &osa_sigquit, NULL);
1682 raise (raise_signal);
1687 int pipeline_wait (pipeline *p)
1689 return pipeline_wait_all (p, NULL, NULL);
1692 int pipeline_run (pipeline *p)
1697 status = pipeline_wait (p);
1703 void pipeline_pump (pipeline *p, ...)
1707 pipeline *arg, **pieces;
1709 int *known_source, *blocking_in, *blocking_out,
1710 *dying_source, *waiting, *write_error;
1711 struct sigaction sa, osa_sigpipe;
1713 /* Count pipelines and allocate space for arrays. */
1716 for (arg = p; arg; arg = va_arg (argv, pipeline *))
1719 pieces = xnmalloc (argc, sizeof *pieces);
1720 pos = xnmalloc (argc, sizeof *pos);
1721 known_source = xcalloc (argc, sizeof *known_source);
1722 blocking_in = xcalloc (argc, sizeof *blocking_in);
1723 blocking_out = xcalloc (argc, sizeof *blocking_out);
1724 dying_source = xcalloc (argc, sizeof *dying_source);
1725 waiting = xcalloc (argc, sizeof *waiting);
1726 write_error = xcalloc (argc, sizeof *write_error);
1728 /* Set up arrays of pipelines and their read positions. Start all
1729 * pipelines if necessary.
1732 for (arg = p, i = 0; i < argc; arg = va_arg (argv, pipeline *), ++i) {
1735 if (!pieces[i]->pids)
1736 pipeline_start (pieces[i]);
1738 assert (arg == NULL);
1741 /* All source pipelines must be supplied as arguments. */
1742 for (i = 0; i < argc; ++i) {
1744 if (!pieces[i]->source)
1746 for (j = 0; j < argc; ++j) {
1747 if (pieces[i]->source == pieces[j]) {
1748 known_source[j] = found = 1;
1755 for (i = 0; i < argc; ++i) {
1757 if (pieces[i]->infd != -1) {
1758 flags = fcntl (pieces[i]->infd, F_GETFL);
1759 if (!(flags & O_NONBLOCK)) {
1761 fcntl (pieces[i]->infd, F_SETFL,
1762 flags | O_NONBLOCK);
1765 if (pieces[i]->outfd != -1) {
1766 flags = fcntl (pieces[i]->outfd, F_GETFL);
1767 if (!(flags & O_NONBLOCK)) {
1768 blocking_out[i] = 1;
1769 fcntl (pieces[i]->outfd, F_SETFL,
1770 flags | O_NONBLOCK);
1776 memset (&sa, 0, sizeof sa);
1777 sa.sa_handler = SIG_IGN;
1778 sigemptyset (&sa.sa_mask);
1780 sigaction (SIGPIPE, &sa, &osa_sigpipe);
1784 /* We rely on getting EINTR from select. */
1785 sigaction (SIGCHLD, NULL, &sa);
1786 sa.sa_flags &= ~SA_RESTART;
1787 sigaction (SIGCHLD, &sa, NULL);
1795 /* If a source dies and all data from it has been written to
1796 * all sinks, close the writing end of the pipe to each of
1799 for (i = 0; i < argc; ++i) {
1800 if (!known_source[i] || pieces[i]->outfd != -1 ||
1801 pipeline_peek_size (pieces[i]))
1803 for (j = 0; j < argc; ++j) {
1804 if (pieces[j]->source == pieces[i] &&
1805 pieces[j]->infd != -1) {
1806 if (close (pieces[j]->infd))
1810 pieces[j]->infd = -1;
1815 /* If all sinks on a source have died, close the reading end
1816 * of the pipe from that source.
1818 for (i = 0; i < argc; ++i) {
1820 if (!known_source[i] || pieces[i]->outfd == -1)
1822 for (j = 0; j < argc; ++j) {
1823 if (pieces[j]->source == pieces[i] &&
1824 pieces[j]->infd != -1) {
1831 if (close (pieces[i]->outfd))
1833 "closing pipeline output failed");
1834 pieces[i]->outfd = -1;
1839 for (i = 0; i < argc; ++i) {
1840 /* Input to sink pipeline. */
1841 if (pieces[i]->source && pieces[i]->infd != -1 &&
1843 FD_SET (pieces[i]->infd, &wfds);
1844 if (pieces[i]->infd > maxfd)
1845 maxfd = pieces[i]->infd;
1847 /* Output from source pipeline. */
1848 if (known_source[i] && pieces[i]->outfd != -1) {
1849 FD_SET (pieces[i]->outfd, &rfds);
1850 if (pieces[i]->outfd > maxfd)
1851 maxfd = pieces[i]->outfd;
1855 break; /* nothing meaningful left to do */
1857 ret = select (maxfd + 1, &rfds, &wfds, NULL, NULL);
1858 if (ret < 0 && errno == EINTR) {
1859 /* Did a source or sink pipeline die? */
1860 for (i = 0; i < argc; ++i) {
1861 if (pieces[i]->ncommands == 0)
1863 if (known_source[i] && !dying_source[i] &&
1864 pieces[i]->outfd != -1) {
1865 int last = pieces[i]->ncommands - 1;
1866 assert (pieces[i]->statuses);
1867 if (pieces[i]->statuses[last] != -1) {
1868 debug ("source pipeline %d "
1870 dying_source[i] = 1;
1873 if (pieces[i]->source &&
1874 pieces[i]->infd != -1) {
1875 assert (pieces[i]->statuses);
1876 if (pieces[i]->statuses[0] != -1) {
1877 debug ("sink pipeline %d "
1879 close (pieces[i]->infd);
1880 pieces[i]->infd = -1;
1886 error (FATAL, errno, "select");
1888 /* Read a block of data from each available source pipeline. */
1889 for (i = 0; i < argc; ++i) {
1890 size_t peek_size, len;
1892 if (!known_source[i] || pieces[i]->outfd == -1)
1894 if (!FD_ISSET (pieces[i]->outfd, &rfds))
1897 peek_size = pipeline_peek_size (pieces[i]);
1898 len = peek_size + 4096;
1899 if (!pipeline_peek (pieces[i], &len) ||
1901 /* Error or end-of-file; skip this pipeline
1904 debug ("source pipeline %d returned error "
1906 close (pieces[i]->outfd);
1907 pieces[i]->outfd = -1;
1909 /* This is rather a large hammer. Whenever
1910 * any data is read from any source
1911 * pipeline, we go through and retry all
1912 * sink pipelines, even if they aren't
1913 * receiving data from the source in
1914 * question. This probably results in a few
1915 * more passes around the select() loop, but
1916 * it eliminates some annoyingly fiddly
1919 memset (waiting, 0, argc * sizeof *waiting);
1922 /* Write as much data as we can to each available sink
1925 for (i = 0; i < argc; ++i) {
1931 if (!pieces[i]->source || pieces[i]->infd == -1)
1933 if (!FD_ISSET (pieces[i]->infd, &wfds))
1935 peek_size = pipeline_peek_size (pieces[i]->source);
1936 if (peek_size <= pos[i]) {
1937 /* Disable reading until data is read from a
1938 * source fd or a child process exits, so
1939 * that we neither spin nor block if the
1946 /* peek a block from the source */
1947 block = pipeline_peek (pieces[i]->source, &peek_size);
1948 /* should all already be in the peek cache */
1952 /* write as much of it as will fit to the sink */
1954 w = safe_write (pieces[i]->infd,
1956 peek_size - pos[i]);
1959 if (errno == EAGAIN) {
1963 /* It may be useful for other processes to
1964 * continue even though this one fails, so
1968 write_error[i] = errno;
1969 close (pieces[i]->infd);
1970 pieces[i]->infd = -1;
1976 /* check other sinks on the same source, and update
1977 * the source's read position if earlier data is no
1978 * longer needed by any sink
1980 for (j = 0; j < argc; ++j) {
1981 if (pieces[i]->source != pieces[j]->source ||
1982 pieces[j]->infd == -1)
1984 if (pos[j] < minpos)
1986 /* If the source is dead and all data has
1987 * been written to this sink, close the
1988 * writing end of the pipe to the sink.
1990 if (pieces[j]->source->outfd == -1 &&
1991 pos[j] >= peek_size) {
1992 close (pieces[j]->infd);
1993 pieces[j]->infd = -1;
1997 /* If some data has been written to all sinks,
1998 * discard it from the source's peek cache.
2000 pipeline_peek_skip (pieces[i]->source, minpos);
2001 for (j = 0; j < argc; ++j) {
2002 if (pieces[i]->source == pieces[j]->source)
2010 sigaction (SIGCHLD, NULL, &sa);
2011 sa.sa_flags |= SA_RESTART;
2012 sigaction (SIGCHLD, &sa, NULL);
2016 sigaction (SIGPIPE, &osa_sigpipe, NULL);
2019 for (i = 0; i < argc; ++i) {
2021 if (blocking_in[i] && pieces[i]->infd != -1) {
2022 flags = fcntl (pieces[i]->infd, F_GETFL);
2023 fcntl (pieces[i]->infd, F_SETFL, flags & ~O_NONBLOCK);
2025 if (blocking_out[i] && pieces[i]->outfd != -1) {
2026 flags = fcntl (pieces[i]->outfd, F_GETFL);
2027 fcntl (pieces[i]->outfd, F_SETFL, flags & ~O_NONBLOCK);
2031 for (i = 0; i < argc; ++i) {
2033 error (FATAL, write_error[i], "write to sink %d", i);
2038 free (dying_source);
2039 free (blocking_out);
2041 free (known_source);
2046 /* ---------------------------------------------------------------------- */
2048 /* Functions to read output from pipelines. */
2050 static const char *get_block (pipeline *p, size_t *len, int peek)
2052 size_t readstart = 0, retstart = 0;
2053 size_t space = p->bufmax;
2054 size_t toread = *len;
2057 if (p->buffer && p->peek_offset) {
2058 if (p->peek_offset >= toread) {
2059 /* We've got the whole thing in the peek cache; just
2063 assert (p->peek_offset <= p->buflen);
2064 buffer = p->buffer + p->buflen - p->peek_offset;
2066 p->peek_offset -= toread;
2069 readstart = p->buflen;
2070 retstart = p->buflen - p->peek_offset;
2072 toread -= p->peek_offset;
2076 if (toread > space) {
2078 p->bufmax = readstart + toread;
2081 p->buffer = xrealloc (p->buffer, p->bufmax + 1);
2087 assert (p->outfd != -1);
2088 r = safe_read (p->outfd, p->buffer + readstart, toread);
2091 p->buflen = readstart + r;
2093 p->peek_offset += r;
2094 *len -= (toread - r);
2096 return p->buffer + retstart;
2099 const char *pipeline_read (pipeline *p, size_t *len)
2101 return get_block (p, len, 0);
2104 const char *pipeline_peek (pipeline *p, size_t *len)
2106 return get_block (p, len, 1);
2109 size_t pipeline_peek_size (pipeline *p)
2113 return p->peek_offset;
2116 void pipeline_peek_skip (pipeline *p, size_t len)
2120 assert (len <= p->peek_offset);
2121 p->peek_offset -= len;
2125 /* readline and peekline repeatedly peek larger and larger buffers until
2126 * they find a newline or they fail. readline then adjusts the peek offset.
2129 static const char *get_line (pipeline *p, size_t *outlen)
2131 const size_t block = 4096;
2132 const char *buffer = NULL, *end = NULL;
2135 if (p->line_cache) {
2136 free (p->line_cache);
2137 p->line_cache = NULL;
2143 for (i = 0; ; ++i) {
2144 size_t plen = block * (i + 1);
2146 buffer = get_block (p, &plen, 1);
2147 if (!buffer || plen == 0)
2150 end = memchr (buffer + block * i, '\n', plen);
2151 if (!end && plen < block * (i + 1))
2152 /* end of file, no newline found */
2153 end = buffer + plen - 1;
2159 p->line_cache = xstrndup (buffer, end - buffer + 1);
2161 *outlen = end - buffer + 1;
2162 return p->line_cache;
2167 const char *pipeline_readline (pipeline *p)
2170 const char *buffer = get_line (p, &buflen);
2172 p->peek_offset -= buflen;
2176 const char *pipeline_peekline (pipeline *p)
2178 return get_line (p, NULL);