resetting manifest requested domain to floor
[platform/upstream/libpipeline.git] / lib / pipeline.c
1 /* Copyright (C) 1989, 1990, 1991, 1992, 2000, 2001, 2002, 2003
2  * Free Software Foundation, Inc.
3  * Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008 Colin Watson.
4  *   Written for groff by James Clark (jjc@jclark.com)
5  *   Heavily adapted and extended for man-db by Colin Watson.
6  *
7  * This file is part of libpipeline.
8  *
9  * libpipeline is free software; you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation; either version 2 of the License, or (at
12  * your option) any later version.
13  *
14  * libpipeline is distributed in the hope that it will be useful, but
15  * WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17  * General Public License for more details.
18  *
19  * You should have received a copy of the GNU General Public License
20  * along with libpipeline; if not, write to the Free Software
21  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301
22  * USA.
23  */
24
25 #ifdef HAVE_CONFIG_H
26 #  include "config.h"
27 #endif
28
29 #include <stdio.h>
30 #include <stdlib.h>
31 #include <signal.h>
32 #include <errno.h>
33 #include <sys/select.h>
34 #include <sys/time.h>
35 #include <sys/types.h>
36 #include <fcntl.h>
37 #include <unistd.h>
38 #include <stdarg.h>
39 #include <assert.h>
40 #include <string.h>
41 #include <sys/wait.h>
42
43 #include "dirname.h"
44 #include "full-write.h"
45 #include "safe-read.h"
46 #include "safe-write.h"
47 #include "xalloc.h"
48 #include "xstrndup.h"
49 #include "xvasprintf.h"
50
51 #include "pipeline-private.h"
52 #include "error.h"
53 #include "pipeline.h"
54
#ifdef USE_SOCKETPAIR_PIPE
#   include <netdb.h>
#   include <netinet/in.h>
#   include <sys/socket.h>
#   ifdef CORRECT_SOCKETPAIR_MODE
#       include <sys/stat.h>
#   endif
#   ifndef SHUT_RD
#       define SHUT_RD          0
#   endif
#   ifndef SHUT_WR
#       define SHUT_WR          1
#   endif
/* With CORRECT_SOCKETPAIR_MODE each end also has its mode adjusted via
 * fchmod(); otherwise that step expands to a constant "did not fail".
 */
#   ifdef CORRECT_SOCKETPAIR_MODE
#       define SOCKETPAIR_CHMOD_FAILED(fd, mode) (fchmod ((fd), (mode)) < 0)
#   else
#       define SOCKETPAIR_CHMOD_FAILED(fd, mode) 0
#   endif
/* Replacement for pipe(): a Unix-domain socket pair where p[1] has its
 * read side shut down and p[0] has its write side shut down.  Evaluates
 * to -1 as soon as any step fails, and to 0 otherwise.
 */
#   define pipe(p) \
        ((socketpair (AF_UNIX, SOCK_STREAM, 0, (p)) < 0 || \
          shutdown ((p)[1], SHUT_RD) < 0 || \
          SOCKETPAIR_CHMOD_FAILED ((p)[1], S_IWUSR) || \
          shutdown ((p)[0], SHUT_WR) < 0 || \
          SOCKETPAIR_CHMOD_FAILED ((p)[0], S_IRUSR)) ? -1 : 0)
#endif
77
78 /* ---------------------------------------------------------------------- */
79
80 /* Functions to build individual commands. */
81
82 pipecmd *pipecmd_new (const char *name)
83 {
84         pipecmd *cmd = XMALLOC (pipecmd);
85         struct pipecmd_process *cmdp;
86         char *name_base;
87
88         cmd->tag = PIPECMD_PROCESS;
89         cmd->name = xstrdup (name);
90         cmd->nice = 0;
91         cmd->discard_err = 0;
92
93         cmd->nenv = 0;
94         cmd->env_max = 4;
95         cmd->env = xnmalloc (cmd->env_max, sizeof *cmd->env);
96
97         cmdp = &cmd->u.process;
98
99         cmdp->argc = 0;
100         cmdp->argv_max = 4;
101         cmdp->argv = xnmalloc (cmdp->argv_max, sizeof *cmdp->argv);
102
103         /* argv[0] is the basename of the command name. */
104         name_base = base_name (name);
105         pipecmd_arg (cmd, name_base);
106         free (name_base);
107
108         return cmd;
109 }
110
111 pipecmd *pipecmd_new_argv (const char *name, va_list argv)
112 {
113         pipecmd *cmd = pipecmd_new (name);
114         pipecmd_argv (cmd, argv);
115         return cmd;
116 }
117
118 pipecmd *pipecmd_new_args (const char *name, ...)
119 {
120         va_list argv;
121         pipecmd *cmd;
122
123         va_start (argv, name);
124         cmd = pipecmd_new_argv (name, argv);
125         va_end (argv);
126
127         return cmd;
128 }
129
/* Word splitter behind pipecmd_new_argstr() and pipecmd_argstr().
 *
 * This exists only for the sake of old configuration files, some of which
 * (following the tr definition in man_db.conf) use shell-style quoting.
 * Rather than handing the string to 'sh -c', which would open the door to
 * the shell constructs that cause real security holes, a safe subset of
 * shell quoting is parsed by hand: single quotes, double quotes and
 * backslash escapes. Environment variables are not expanded because of
 * tricky word splitting issues.
 *
 * TODO: Support setting environment variables.
 *
 * Extracts one word starting at *ARGSTR and advances *ARGSTR past it and
 * past any whitespace that follows. Returns a newly allocated string, or
 * NULL when no characters were collected or the quoting is unterminated.
 */
static char *argstr_get_word (const char **argstr)
{
        enum { NONE, SINGLE, DOUBLE } quotemode = NONE;
        const char *cur = *argstr;
        const char *litstart = cur;
        char *out = NULL;

        while (*cur) {
                int literal;

                /* Decide whether this character is ordinary in the
                 * current quoting mode.
                 */
                switch (quotemode) {
                        case NONE:
                                literal = !strchr (" \t'\"\\", *cur);
                                break;
                        case SINGLE:
                                /* nothing is special in '; ended by ' */
                                literal = (*cur != '\'');
                                break;
                        default:
                                /* \ is special in "; ended by " */
                                literal = !strchr ("\"\\", *cur);
                                break;
                }

                /* Within "", \ is only special when followed by $, `, ",
                 * or \ (a real shell also honours <newline>; we don't).
                 */
                if (!literal && quotemode == DOUBLE && *cur == '\\' &&
                    !strchr ("$`\"\\", cur[1]))
                        literal = 1;

                if (literal) {
                        ++cur;
                        continue;
                }

                /* Flush the run of literal characters seen so far. */
                if (litstart < cur) {
                        char *chunk = xstrndup (litstart, cur - litstart);
                        out = appendstr (out, chunk, NULL);
                        free (chunk);
                }

                switch (*cur) {
                        case ' ':
                        case '\t':
                                /* End of word; swallow the whitespace run. */
                                do
                                        ++cur;
                                while (*cur && strchr (" \t", *cur));
                                *argstr = cur;
                                return out;

                        case '\'':
                                quotemode = (quotemode == NONE) ? SINGLE
                                                                : NONE;
                                litstart = ++cur;
                                break;

                        case '"':
                                quotemode = (quotemode == NONE) ? DOUBLE
                                                                : NONE;
                                litstart = ++cur;
                                break;

                        case '\\': {
                                char escaped[2];

                                ++cur;
                                if (!*cur) {
                                        /* Unterminated quoting; give up. */
                                        free (out);
                                        *argstr = cur;
                                        return NULL;
                                }
                                escaped[0] = *cur;
                                escaped[1] = '\0';
                                out = appendstr (out, escaped, NULL);
                                litstart = ++cur;
                                break;
                        }

                        default:
                                assert (!"unexpected state parsing argstr");
                }
        }

        *argstr = cur;

        if (quotemode != NONE) {
                /* Unterminated quoting; give up. */
                free (out);
                return NULL;
        }

        /* Flush whatever literal text remains at the end of the string. */
        if (litstart < cur) {
                char *chunk = xstrndup (litstart, cur - litstart);
                out = appendstr (out, chunk, NULL);
                free (chunk);
        }

        return out;
}
240
241 pipecmd *pipecmd_new_argstr (const char *argstr)
242 {
243         pipecmd *cmd;
244         char *arg;
245
246         arg = argstr_get_word (&argstr);
247         if (!arg)
248                 error (FATAL, 0,
249                        "badly formed configuration directive: '%s'",
250                        argstr);
251         if (!strcmp (arg, "exec")) {
252                 /* Some old configuration files have "exec command" rather
253                  * than "command"; this worked fine when being evaluated by
254                  * a shell, but since exec is a shell builtin it doesn't
255                  * work when being executed directly. To work around this,
256                  * we just drop "exec" if it appears at the start of argstr.
257                  */
258                 arg = argstr_get_word (&argstr);
259                 if (!arg)
260                         error (FATAL, 0,
261                                "badly formed configuration directive: '%s'",
262                                argstr);
263         }
264         cmd = pipecmd_new (arg);
265         free (arg);
266
267         while ((arg = argstr_get_word (&argstr))) {
268                 pipecmd_arg (cmd, arg);
269                 free (arg);
270         }
271
272         return cmd;
273 }
274
275 pipecmd *pipecmd_new_function (const char *name,
276                                pipecmd_function_type *func,
277                                pipecmd_function_free_type *free_func,
278                                void *data)
279 {
280         pipecmd *cmd = XMALLOC (pipecmd);
281         struct pipecmd_function *cmdf;
282
283         cmd->tag = PIPECMD_FUNCTION;
284         cmd->name = xstrdup (name);
285         cmd->nice = 0;
286         cmd->discard_err = 0;
287
288         cmd->nenv = 0;
289         cmd->env_max = 4;
290         cmd->env = xnmalloc (cmd->env_max, sizeof *cmd->env);
291
292         cmdf = &cmd->u.function;
293
294         cmdf->func = func;
295         cmdf->free_func = free_func;
296         cmdf->data = data;
297
298         return cmd;
299 }
300
301 pipecmd *pipecmd_new_sequencev (const char *name, va_list cmdv)
302 {
303         pipecmd *cmd = XMALLOC (pipecmd);
304         struct pipecmd_sequence *cmds;
305         pipecmd *child;
306
307         cmd->tag = PIPECMD_SEQUENCE;
308         cmd->name = xstrdup (name);
309         cmd->nice = 0;
310         cmd->discard_err = 0;
311
312         cmd->nenv = 0;
313         cmd->env_max = 4;
314         cmd->env = xnmalloc (cmd->env_max, sizeof *cmd->env);
315
316         cmds = &cmd->u.sequence;
317
318         cmds->ncommands = 0;
319         cmds->commands_max = 4;
320         cmds->commands = xnmalloc (cmds->commands_max, sizeof *cmds->commands);
321
322         child = va_arg (cmdv, pipecmd *);
323         while (child) {
324                 pipecmd_sequence_command (cmd, child);
325                 child = va_arg (cmdv, pipecmd *);
326         }
327
328         return cmd;
329 }
330
331 pipecmd *pipecmd_new_sequence (const char *name, ...)
332 {
333         va_list cmdv;
334         pipecmd *cmd;
335
336         va_start (cmdv, name);
337         cmd = pipecmd_new_sequencev (name, cmdv);
338         va_end (cmdv);
339
340         return cmd;
341 }
342
343 static void passthrough (void *data PIPELINE_ATTR_UNUSED)
344 {
345         for (;;) {
346                 char buffer[4096];
347                 int r = safe_read (STDIN_FILENO, buffer, 4096);
348                 if (r <= 0)
349                         break;
350                 if (full_write (STDOUT_FILENO, buffer,
351                                 (size_t) r) < (size_t) r)
352                         break;
353         }
354
355         return;
356 }
357
358 pipecmd *pipecmd_new_passthrough (void)
359 {
360         return pipecmd_new_function ("cat", &passthrough, NULL, NULL);
361 }
362
363 pipecmd *pipecmd_dup (pipecmd *cmd)
364 {
365         pipecmd *newcmd = XMALLOC (pipecmd);
366         int i;
367
368         newcmd->tag = cmd->tag;
369         newcmd->name = xstrdup (cmd->name);
370         newcmd->nice = cmd->nice;
371         newcmd->discard_err = cmd->discard_err;
372
373         newcmd->nenv = cmd->nenv;
374         newcmd->env_max = cmd->env_max;
375         assert (newcmd->nenv <= newcmd->env_max);
376         newcmd->env = xmalloc (newcmd->env_max * sizeof *newcmd->env);
377
378         for (i = 0; i < cmd->nenv; ++i) {
379                 newcmd->env[i].name =
380                         cmd->env[i].name ? xstrdup (cmd->env[i].name) : NULL;
381                 newcmd->env[i].value =
382                         cmd->env[i].value ? xstrdup (cmd->env[i].value) : NULL;
383         }
384
385         switch (newcmd->tag) {
386                 case PIPECMD_PROCESS: {
387                         struct pipecmd_process *cmdp = &cmd->u.process;
388                         struct pipecmd_process *newcmdp = &newcmd->u.process;
389
390                         newcmdp->argc = cmdp->argc;
391                         newcmdp->argv_max = cmdp->argv_max;
392                         assert (newcmdp->argc < newcmdp->argv_max);
393                         newcmdp->argv = xmalloc
394                                 (newcmdp->argv_max * sizeof *newcmdp->argv);
395
396                         for (i = 0; i < cmdp->argc; ++i)
397                                 newcmdp->argv[i] = xstrdup (cmdp->argv[i]);
398                         newcmdp->argv[cmdp->argc] = NULL;
399
400                         break;
401                 }
402
403                 case PIPECMD_FUNCTION: {
404                         struct pipecmd_function *cmdf = &cmd->u.function;
405                         struct pipecmd_function *newcmdf = &newcmd->u.function;
406
407                         newcmdf->func = cmdf->func;
408                         newcmdf->free_func = cmdf->free_func;
409                         newcmdf->data = cmdf->data;
410
411                         break;
412                 }
413
414                 case PIPECMD_SEQUENCE: {
415                         struct pipecmd_sequence *cmds = &cmd->u.sequence;
416                         struct pipecmd_sequence *newcmds = &newcmd->u.sequence;
417
418                         newcmds->ncommands = cmds->ncommands;
419                         newcmds->commands_max = cmds->commands_max;
420                         assert (newcmds->ncommands <= newcmds->commands_max);
421                         newcmds->commands = xmalloc
422                                 (newcmds->commands_max *
423                                  sizeof *newcmds->commands);
424
425                         for (i = 0; i < cmds->ncommands; ++i)
426                                 newcmds->commands[i] =
427                                         pipecmd_dup (cmds->commands[i]);
428
429                         break;
430                 }
431         }
432
433         return newcmd;
434 }
435
436 void pipecmd_arg (pipecmd *cmd, const char *arg)
437 {
438         struct pipecmd_process *cmdp;
439
440         assert (cmd->tag == PIPECMD_PROCESS);
441         cmdp = &cmd->u.process;
442
443         if (cmdp->argc + 1 >= cmdp->argv_max) {
444                 cmdp->argv_max *= 2;
445                 cmdp->argv = xrealloc (cmdp->argv,
446                                        cmdp->argv_max * sizeof *cmdp->argv);
447         }
448
449         cmdp->argv[cmdp->argc++] = xstrdup (arg);
450         assert (cmdp->argc < cmdp->argv_max);
451         cmdp->argv[cmdp->argc] = NULL;
452 }
453
454 void pipecmd_argf (pipecmd *cmd, const char *format, ...)
455 {
456         va_list argv;
457         char *arg;
458
459         va_start (argv, format);
460         arg = xvasprintf (format, argv);
461         pipecmd_arg (cmd, arg);
462         free (arg);
463         va_end (argv);
464 }
465
466 void pipecmd_argv (pipecmd *cmd, va_list argv)
467 {
468         const char *arg = va_arg (argv, const char *);
469
470         assert (cmd->tag == PIPECMD_PROCESS);
471
472         while (arg) {
473                 pipecmd_arg (cmd, arg);
474                 arg = va_arg (argv, const char *);
475         }
476 }
477
478 void pipecmd_args (pipecmd *cmd, ...)
479 {
480         va_list argv;
481
482         assert (cmd->tag == PIPECMD_PROCESS);
483
484         va_start (argv, cmd);
485         pipecmd_argv (cmd, argv);
486         va_end (argv);
487 }
488
489 void pipecmd_argstr (pipecmd *cmd, const char *argstr)
490 {
491         char *arg;
492
493         assert (cmd->tag == PIPECMD_PROCESS);
494
495         while ((arg = argstr_get_word (&argstr))) {
496                 pipecmd_arg (cmd, arg);
497                 free (arg);
498         }
499 }
500
501 int pipecmd_get_nargs (pipecmd *cmd)
502 {
503         struct pipecmd_process *cmdp;
504
505         assert (cmd->tag == PIPECMD_PROCESS);
506         cmdp = &cmd->u.process;
507
508         return cmdp->argc;
509 }
510
511 void pipecmd_nice (pipecmd *cmd, int value)
512 {
513         cmd->nice = value;
514 }
515
516 void pipecmd_discard_err (pipecmd *cmd, int discard_err)
517 {
518         cmd->discard_err = discard_err;
519 }
520
521 void pipecmd_setenv (pipecmd *cmd, const char *name, const char *value)
522 {
523         if (cmd->nenv >= cmd->env_max) {
524                 cmd->env_max *= 2;
525                 cmd->env = xrealloc (cmd->env,
526                                      cmd->env_max * sizeof *cmd->env);
527         }
528
529         cmd->env[cmd->nenv].name = xstrdup (name);
530         cmd->env[cmd->nenv].value = xstrdup (value);
531         ++cmd->nenv;
532 }
533
534 void pipecmd_unsetenv (pipecmd *cmd, const char *name)
535 {
536         if (cmd->nenv >= cmd->env_max) {
537                 cmd->env_max *= 2;
538                 cmd->env = xrealloc (cmd->env,
539                                      cmd->env_max * sizeof *cmd->env);
540         }
541
542         cmd->env[cmd->nenv].name = xstrdup (name);
543         cmd->env[cmd->nenv].value = NULL;
544         ++cmd->nenv;
545 }
546
547 void pipecmd_clearenv (pipecmd *cmd)
548 {
549         if (cmd->nenv >= cmd->env_max) {
550                 cmd->env_max *= 2;
551                 cmd->env = xrealloc (cmd->env,
552                                      cmd->env_max * sizeof *cmd->env);
553         }
554
555         cmd->env[cmd->nenv].name = NULL;
556         cmd->env[cmd->nenv].value = NULL;
557         ++cmd->nenv;
558 }
559
560 void pipecmd_sequence_command (pipecmd *cmd, pipecmd *child)
561 {
562         struct pipecmd_sequence *cmds;
563
564         assert (cmd->tag == PIPECMD_SEQUENCE);
565         cmds = &cmd->u.sequence;
566
567         if (cmds->ncommands >= cmds->commands_max) {
568                 cmds->commands_max *= 2;
569                 cmds->commands = xrealloc
570                         (cmds->commands,
571                          cmds->commands_max * sizeof *cmds->commands);
572         }
573
574         cmds->commands[cmds->ncommands++] = child;
575 }
576
577 void pipecmd_dump (pipecmd *cmd, FILE *stream)
578 {
579         int i;
580
581         for (i = 0; i < cmd->nenv; ++i) {
582                 if (cmd->env[i].name)
583                         fprintf (stream, "%s=%s ",
584                                  cmd->env[i].name,
585                                  cmd->env[i].value ? cmd->env[i].value
586                                                    : "<unset>");
587                 else
588                         fprintf (stream, "env -i ");
589         }
590
591         switch (cmd->tag) {
592                 case PIPECMD_PROCESS: {
593                         struct pipecmd_process *cmdp = &cmd->u.process;
594
595                         fputs (cmd->name, stream);
596                         for (i = 1; i < cmdp->argc; ++i) {
597                                 /* TODO: escape_shell()? */
598                                 putc (' ', stream);
599                                 fputs (cmdp->argv[i], stream);
600                         }
601
602                         break;
603                 }
604
605                 case PIPECMD_FUNCTION:
606                         fputs (cmd->name, stream);
607                         break;
608
609                 case PIPECMD_SEQUENCE: {
610                         struct pipecmd_sequence *cmds = &cmd->u.sequence;
611
612                         putc ('(', stream);
613                         for (i = 0; i < cmds->ncommands; ++i) {
614                                 pipecmd_dump (cmds->commands[i], stream);
615                                 if (i < cmds->ncommands - 1)
616                                         fputs (" && ", stream);
617                         }
618                         putc (')', stream);
619
620                         break;
621                 }
622         }
623 }
624
625 char *pipecmd_tostring (pipecmd *cmd)
626 {
627         char *out = NULL;
628         int i;
629
630         for (i = 0; i < cmd->nenv; ++i) {
631                 if (cmd->env[i].name)
632                         out = appendstr (out, cmd->env[i].name, "=",
633                                          cmd->env[i].value ? cmd->env[i].value
634                                                            : "<unset>",
635                                          " ", NULL);
636                 else
637                         out = appendstr (out, "env -i ", NULL);
638         }
639
640         switch (cmd->tag) {
641                 case PIPECMD_PROCESS: {
642                         struct pipecmd_process *cmdp = &cmd->u.process;
643
644                         out = appendstr (out, cmd->name, NULL);
645                         for (i = 1; i < cmdp->argc; ++i)
646                                 /* TODO: escape_shell()? */
647                                 out = appendstr (out, " ", cmdp->argv[i],
648                                                  NULL);
649
650                         break;
651                 }
652
653                 case PIPECMD_FUNCTION:
654                         out = appendstr (out, cmd->name, NULL);
655                         break;
656
657                 case PIPECMD_SEQUENCE: {
658                         struct pipecmd_sequence *cmds = &cmd->u.sequence;
659
660                         out = appendstr (out, "(", NULL);
661                         for (i = 0; i < cmds->ncommands; ++i) {
662                                 char *subout = pipecmd_tostring
663                                         (cmds->commands[i]);
664                                 out = appendstr (out, subout, NULL);
665                                 free (subout);
666                                 if (i < cmds->ncommands - 1)
667                                         out = appendstr (out, " && ", NULL);
668                         }
669                         out = appendstr (out, ")", NULL);
670
671                         break;
672                 }
673         }
674
675         return out;
676 }
677
678 /* Children exit with this status if execvp fails. */
679 #define EXEC_FAILED_EXIT_STATUS 0xff
680
681 /* When called internally during pipeline execution, this is called in the
682  * forked child process, with file descriptors already set up.
683  */
684 void pipecmd_exec (pipecmd *cmd)
685 {
686         int i;
687
688         if (cmd->nice)
689                 if (nice (cmd->nice) < 0)
690                         /* Don't worry too much. */
691                         debug ("nice failed: %s\n", strerror (errno));
692
693         if (cmd->discard_err) {
694                 int devnull = open ("/dev/null", O_WRONLY);
695                 if (devnull != -1) {
696                         dup2 (devnull, 2);
697                         close (devnull);
698                 }
699         }
700
701         for (i = 0; i < cmd->nenv; ++i) {
702                 if (cmd->env[i].name) {
703                         if (cmd->env[i].value)
704                                 setenv (cmd->env[i].name,
705                                         cmd->env[i].value, 1);
706                         else
707                                 unsetenv (cmd->env[i].name);
708                 } else
709                         clearenv ();
710         }
711
712         switch (cmd->tag) {
713                 case PIPECMD_PROCESS: {
714                         struct pipecmd_process *cmdp = &cmd->u.process;
715                         execvp (cmd->name, cmdp->argv);
716                         break;
717                 }
718
719                 /* TODO: ideally, could there be a facility
720                  * to execute non-blocking functions without
721                  * needing to fork?
722                  */
723                 case PIPECMD_FUNCTION: {
724                         struct pipecmd_function *cmdf = &cmd->u.function;
725                         (*cmdf->func) (cmdf->data);
726                         /* pacify valgrind et al */
727                         if (cmdf->free_func)
728                                 (*cmdf->free_func) (cmdf->data);
729                         exit (0);
730                 }
731
732                 case PIPECMD_SEQUENCE: {
733                         struct pipecmd_sequence *cmds = &cmd->u.sequence;
734                         struct sigaction sa;
735
736                         /* pipeline_start will have blocked SIGCHLD. We like
737                          * it that way. Lose the parent's signal handler,
738                          * though.
739                          */
740                         memset (&sa, 0, sizeof sa);
741                         sa.sa_handler = SIG_DFL;
742                         sigemptyset (&sa.sa_mask);
743                         sa.sa_flags = 0;
744                         if (sigaction (SIGCHLD, &sa, NULL) == -1)
745                                 error (FATAL, errno,
746                                        "can't install SIGCHLD handler");
747
748                         for (i = 0; i < cmds->ncommands; ++i) {
749                                 pipecmd *child = cmds->commands[i];
750                                 pid_t pid = fork ();
751                                 int status;
752
753                                 if (pid < 0)
754                                         error (FATAL, errno, "fork failed");
755                                 if (pid == 0)
756                                         pipecmd_exec (child);
757                                 debug ("Started \"%s\", pid %d\n",
758                                        child->name, pid);
759
760                                 while (waitpid (pid, &status, 0) < 0) {
761                                         if (errno == EINTR)
762                                                 continue;
763                                         error (FATAL, errno, "waitpid failed");
764                                 }
765
766                                 debug ("  \"%s\" (%d) -> %d\n",
767                                        child->name, pid, status);
768
769                                 if (WIFSIGNALED (status)) {
770                                         int sig = WTERMSIG (status);
771 #ifdef SIGPIPE
772                                         if (sig == SIGPIPE)
773                                                 status = 0;
774                                         else
775 #endif /* SIGPIPE */
776                                         if (WCOREDUMP (status))
777                                                 error (0, 0,
778                                                        "%s: %s (core dumped)",
779                                                        child->name,
780                                                        strsignal (sig));
781                                         else
782                                                 error (0, 0, "%s: %s",
783                                                        child->name,
784                                                        strsignal (sig));
785                                 } else if (!WIFEXITED (status))
786                                         error (0, 0, "unexpected status %d",
787                                                status);
788
789                                 if (child->tag == PIPECMD_FUNCTION) {
790                                         struct pipecmd_function *cmdf =
791                                                 &child->u.function;
792                                         if (cmdf->free_func)
793                                                 (*cmdf->free_func)
794                                                         (cmdf->data);
795                                 }
796
797                                 if (WIFSIGNALED (status)) {
798                                         raise (WTERMSIG (status));
799                                         exit (1); /* just to make sure */
800                                 } else if (status && WIFEXITED (status))
801                                         exit (WEXITSTATUS (status));
802                         }
803
804                         exit (0);
805                 }
806         }
807
808         error (EXEC_FAILED_EXIT_STATUS, errno, "can't execute %s", cmd->name);
809         /* Never called, but gcc doesn't realise that error with non-zero
810          * status always exits.
811          */
812         exit (EXEC_FAILED_EXIT_STATUS);
813 }
814
815 void pipecmd_free (pipecmd *cmd)
816 {
817         int i;
818
819         if (!cmd)
820                 return;
821
822         free (cmd->name);
823
824         for (i = 0; i < cmd->nenv; ++i) {
825                 free (cmd->env[i].name);
826                 free (cmd->env[i].value);
827         }
828         free (cmd->env);
829
830         switch (cmd->tag) {
831                 case PIPECMD_PROCESS: {
832                         struct pipecmd_process *cmdp = &cmd->u.process;
833
834                         for (i = 0; i < cmdp->argc; ++i)
835                                 free (cmdp->argv[i]);
836                         free (cmdp->argv);
837
838                         break;
839                 }
840
841                 case PIPECMD_FUNCTION:
842                         break;
843
844                 case PIPECMD_SEQUENCE: {
845                         struct pipecmd_sequence *cmds = &cmd->u.sequence;
846
847                         for (i = 0; i < cmds->ncommands; ++i)
848                                 pipecmd_free (cmds->commands[i]);
849                         free (cmds->commands);
850
851                         break;
852                 }
853         }
854
855         free (cmd);
856 }
857
858 /* ---------------------------------------------------------------------- */
859
860 /* Functions to build pipelines. */
861
862 pipeline *pipeline_new (void)
863 {
864         pipeline *p = XMALLOC (pipeline);
865         p->ncommands = 0;
866         p->commands_max = 4;
867         p->commands = xnmalloc (p->commands_max, sizeof *p->commands);
868         p->pids = NULL;
869         p->statuses = NULL;
870         p->redirect_in = p->redirect_out = REDIRECT_NONE;
871         p->want_in = p->want_out = 0;
872         p->want_infile = p->want_outfile = NULL;
873         p->infd = p->outfd = -1;
874         p->infile = p->outfile = NULL;
875         p->source = NULL;
876         p->buffer = NULL;
877         p->buflen = p->bufmax = 0;
878         p->line_cache = NULL;
879         p->peek_offset = 0;
880         p->ignore_signals = 0;
881         return p;
882 }
883
884 pipeline *pipeline_new_commandv (pipecmd *cmd1, va_list cmdv)
885 {
886         pipeline *p = pipeline_new ();
887         pipeline_command (p, cmd1);
888         pipeline_commandv (p, cmdv);
889         return p;
890 }
891
892 pipeline *pipeline_new_commands (pipecmd *cmd1, ...)
893 {
894         va_list cmdv;
895         pipeline *p;
896
897         va_start (cmdv, cmd1);
898         p = pipeline_new_commandv (cmd1, cmdv);
899         va_end (cmdv);
900
901         return p;
902 }
903
904 pipeline *pipeline_new_command_argv (const char *name, va_list argv)
905 {
906         pipeline *p;
907         pipecmd *cmd;
908
909         p = pipeline_new ();
910         cmd = pipecmd_new_argv (name, argv);
911         pipeline_command (p, cmd);
912
913         return p;
914 }
915
916 pipeline *pipeline_new_command_args (const char *name, ...)
917 {
918         va_list argv;
919         pipeline *p;
920
921         va_start (argv, name);
922         p = pipeline_new_command_argv (name, argv);
923         va_end (argv);
924
925         return p;
926 }
927
928 pipeline *pipeline_join (pipeline *p1, pipeline *p2)
929 {
930         pipeline *p = XMALLOC (pipeline);
931         int i;
932
933         assert (!p1->pids);
934         assert (!p2->pids);
935         assert (!p1->statuses);
936         assert (!p2->statuses);
937
938         p->ncommands = p1->ncommands + p2->ncommands;
939         p->commands_max = p1->ncommands + p2->ncommands;
940         p->commands = xnmalloc (p->commands_max, sizeof *p->commands);
941         p->pids = NULL;
942         p->statuses = NULL;
943         p->redirect_in = p1->redirect_in;
944         p->want_in = p1->want_in;
945         p->want_infile = p1->want_infile;
946         p->redirect_out = p2->redirect_out;
947         p->want_out = p2->want_out;
948         p->want_outfile = p2->want_outfile;
949         p->infd = p1->infd;
950         p->outfd = p2->outfd;
951         p->infile = p1->infile;
952         p->outfile = p2->outfile;
953         p->source = NULL;
954         p->buffer = NULL;
955         p->buflen = p->bufmax = 0;
956         p->line_cache = NULL;
957         p->peek_offset = 0;
958         p->ignore_signals = (p1->ignore_signals || p2->ignore_signals);
959
960         for (i = 0; i < p1->ncommands; ++i)
961                 p->commands[i] = pipecmd_dup (p1->commands[i]);
962         for (i = 0; i < p2->ncommands; ++i)
963                 p->commands[p1->ncommands + i] = pipecmd_dup (p2->commands[i]);
964
965         return p;
966 }
967
968 void pipeline_connect (pipeline *source, pipeline *sink, ...)
969 {
970         va_list argv;
971         pipeline *arg;
972
973         /* We must be in control of output from the source pipeline. If the
974          * source isn't started, we can force this.
975          */
976         if (!source->pids)
977                 pipeline_want_out (source, -1);
978         assert (source->redirect_out == REDIRECT_FD);
979         assert (source->want_out < 0);
980
981         va_start (argv, sink);
982         for (arg = sink; arg; arg = va_arg (argv, pipeline *)) {
983                 assert (!arg->pids); /* not started */
984                 arg->source = source;
985                 pipeline_want_in (arg, -1);
986
987                 /* Zero-command sinks should represent data being passed
988                  * straight through from the input to the output.
989                  * Unfortunately pipeline_start and pipeline_pump don't
990                  * handle this very well between them; a zero-command
991                  * pipeline has the write end of its input pipe wrongly
992                  * stashed in outfd and then pipeline_pump can't handle it
993                  * because it has nowhere to send output. Until this is
994                  * fixed, this kludge is necessary.
995                  */
996                 if (arg->ncommands == 0)
997                         pipeline_command (arg, pipecmd_new_passthrough ());
998         }
999         va_end (argv);
1000 }
1001
1002 void pipeline_command (pipeline *p, pipecmd *cmd)
1003 {
1004         if (p->ncommands >= p->commands_max) {
1005                 p->commands_max *= 2;
1006                 p->commands = xrealloc (p->commands,
1007                                         p->commands_max * sizeof *p->commands);
1008         }
1009
1010         p->commands[p->ncommands++] = cmd;
1011 }
1012
1013 void pipeline_command_argv (pipeline *p, const char *name, va_list argv)
1014 {
1015         pipecmd *cmd;
1016
1017         cmd = pipecmd_new_argv (name, argv);
1018         pipeline_command (p, cmd);
1019 }
1020
1021 void pipeline_command_args (pipeline *p, const char *name, ...)
1022 {
1023         va_list argv;
1024
1025         va_start (argv, name);
1026         pipeline_command_argv (p, name, argv);
1027         va_end (argv);
1028 }
1029
1030 void pipeline_command_argstr (pipeline *p, const char *argstr)
1031 {
1032         pipeline_command (p, pipecmd_new_argstr (argstr));
1033 }
1034
1035 void pipeline_commandv (pipeline *p, va_list cmdv)
1036 {
1037         pipecmd *cmd = va_arg (cmdv, pipecmd *);
1038
1039         while (cmd) {
1040                 pipeline_command (p, cmd);
1041                 cmd = va_arg (cmdv, pipecmd *);
1042         }
1043 }
1044
1045 void pipeline_commands (pipeline *p, ...)
1046 {
1047         va_list cmdv;
1048
1049         va_start (cmdv, p);
1050         pipeline_commandv (p, cmdv);
1051         va_end (cmdv);
1052 }
1053
1054 int pipeline_get_ncommands (pipeline *p)
1055 {
1056         return p->ncommands;
1057 }
1058
1059 pipecmd *pipeline_get_command (pipeline *p, int n)
1060 {
1061         if (n < 0 || n >= p->ncommands)
1062                 return NULL;
1063         return p->commands[n];
1064 }
1065
1066 pipecmd *pipeline_set_command (pipeline *p, int n, pipecmd *cmd)
1067 {
1068         pipecmd *prev;
1069         if (n < 0 || n >= p->ncommands)
1070                 return NULL;
1071         prev = p->commands[n];
1072         p->commands[n] = cmd;
1073         return prev;
1074 }
1075
1076 pid_t pipeline_get_pid (pipeline *p, int n)
1077 {
1078         assert (p->pids);       /* pipeline started */
1079         if (n < 0 || n >= p->ncommands)
1080                 return -1;
1081         return p->pids[n];
1082 }
1083
1084 void pipeline_want_in (pipeline *p, int fd)
1085 {
1086         p->redirect_in = REDIRECT_FD;
1087         p->want_in = fd;
1088         p->want_infile = NULL;
1089 }
1090
1091 void pipeline_want_out (pipeline *p, int fd)
1092 {
1093         p->redirect_out = REDIRECT_FD;
1094         p->want_out = fd;
1095         p->want_outfile = NULL;
1096 }
1097
1098 void pipeline_want_infile (pipeline *p, const char *file)
1099 {
1100         p->redirect_in = (file != NULL) ? REDIRECT_FILE_NAME : REDIRECT_NONE;
1101         p->want_in = 0;
1102         p->want_infile = file;
1103 }
1104
1105 void pipeline_want_outfile (pipeline *p, const char *file)
1106 {
1107         p->redirect_out = (file != NULL) ? REDIRECT_FILE_NAME : REDIRECT_NONE;
1108         p->want_out = 0;
1109         p->want_outfile = file;
1110 }
1111
1112 void pipeline_ignore_signals (pipeline *p, int ignore_signals)
1113 {
1114         p->ignore_signals = ignore_signals;
1115 }
1116
1117 FILE *pipeline_get_infile (pipeline *p)
1118 {
1119         assert (p->pids);       /* pipeline started */
1120         assert (p->statuses);
1121         if (p->infile)
1122                 return p->infile;
1123         else if (p->infd == -1) {
1124                 error (0, 0, "pipeline input not open");
1125                 return NULL;
1126         } else
1127                 return p->infile = fdopen (p->infd, "w");
1128 }
1129
1130 FILE *pipeline_get_outfile (pipeline *p)
1131 {
1132         assert (p->pids);       /* pipeline started */
1133         assert (p->statuses);
1134         if (p->outfile)
1135                 return p->outfile;
1136         else if (p->outfd == -1) {
1137                 error (0, 0, "pipeline output not open");
1138                 return NULL;
1139         } else
1140                 return p->outfile = fdopen (p->outfd, "r");
1141 }
1142
1143 void pipeline_dump (pipeline *p, FILE *stream)
1144 {
1145         int i;
1146
1147         for (i = 0; i < p->ncommands; ++i) {
1148                 pipecmd_dump (p->commands[i], stream);
1149                 if (i < p->ncommands - 1)
1150                         fputs (" | ", stream);
1151         }
1152         fprintf (stream, " [input: {%d, %s}, output: {%d, %s}]\n",
1153                  p->want_in, p->want_infile ? p->want_infile : "NULL",
1154                  p->want_out, p->want_outfile ? p->want_outfile : "NULL");
1155 }
1156
1157 char *pipeline_tostring (pipeline *p)
1158 {
1159         char *out = NULL;
1160         int i;
1161
1162         for (i = 0; i < p->ncommands; ++i) {
1163                 char *cmdout = pipecmd_tostring (p->commands[i]);
1164                 out = appendstr (out, cmdout, NULL);
1165                 free (cmdout);
1166                 if (i < p->ncommands - 1)
1167                         out = appendstr (out, " | ", NULL);
1168         }
1169
1170         return out;
1171 }
1172
1173 void pipeline_free (pipeline *p)
1174 {
1175         int i;
1176
1177         if (!p)
1178                 return;
1179         if (p->pids)
1180                 pipeline_wait (p);
1181
1182         for (i = 0; i < p->ncommands; ++i)
1183                 pipecmd_free (p->commands[i]);
1184         free (p->commands);
1185         if (p->pids)
1186                 free (p->pids);
1187         if (p->statuses)
1188                 free (p->statuses);
1189         if (p->buffer)
1190                 free (p->buffer);
1191         if (p->line_cache)
1192                 free (p->line_cache);
1193         free (p);
1194 }
1195
1196 /* ---------------------------------------------------------------------- */
1197
1198 /* Functions to run pipelines and handle signals. */
1199
1200 static pipeline **active_pipelines = NULL;
1201 static int n_active_pipelines = 0, max_active_pipelines = 0;
1202
1203 static int sigchld = 0;
1204 static int queue_sigchld = 0;
1205
1206 static int reap_children (int block)
1207 {
1208         pid_t pid;
1209         int status;
1210         int collected = 0;
1211
1212         do {
1213                 int i;
1214
1215                 if (sigchld) {
1216                         /* Deal with a SIGCHLD delivery. */
1217                         pid = waitpid (-1, &status, WNOHANG);
1218                         --sigchld;
1219                 } else
1220                         pid = waitpid (-1, &status, block ? 0 : WNOHANG);
1221
1222                 if (pid < 0 && errno == EINTR) {
1223                         /* Try again. */
1224                         pid = 0;
1225                         continue;
1226                 }
1227
1228                 if (pid <= 0)
1229                         /* We've run out of children to reap. */
1230                         break;
1231
1232                 ++collected;
1233
1234                 /* Deliver the command status if possible. */
1235                 for (i = 0; i < n_active_pipelines; ++i) {
1236                         pipeline *p = active_pipelines[i];
1237                         int j;
1238
1239                         if (!p || !p->pids || !p->statuses)
1240                                 continue;
1241
1242                         for (j = 0; j < p->ncommands; ++j) {
1243                                 if (p->pids[j] == pid) {
1244                                         p->statuses[j] = status;
1245                                         i = n_active_pipelines;
1246                                         break;
1247                                 }
1248                         }
1249                 }
1250         } while ((sigchld || block == 0) && pid >= 0);
1251
1252         if (collected)
1253                 return collected;
1254         else
1255                 return -1;
1256 }
1257
1258 static void pipeline_sigchld (int signum)
1259 {
1260         /* really an assert, but that's not async-signal-safe */
1261         if (signum == SIGCHLD) {
1262                 ++sigchld;
1263
1264                 if (!queue_sigchld) {
1265                         int save_errno = errno;
1266                         reap_children (0);
1267                         errno = save_errno;
1268                 }
1269         }
1270 }
1271
1272 static void pipeline_install_sigchld (void)
1273 {
1274         struct sigaction act;
1275         static int installed = 0;
1276
1277         if (installed)
1278                 return;
1279
1280         memset (&act, 0, sizeof act);
1281         act.sa_handler = &pipeline_sigchld;
1282         sigemptyset (&act.sa_mask);
1283         sigaddset (&act.sa_mask, SIGINT);
1284         sigaddset (&act.sa_mask, SIGTERM);
1285         sigaddset (&act.sa_mask, SIGHUP);
1286         sigaddset (&act.sa_mask, SIGCHLD);
1287         act.sa_flags = 0;
1288 #ifdef SA_NOCLDSTOP
1289         act.sa_flags |= SA_NOCLDSTOP;
1290 #endif
1291 #ifdef SA_RESTART
1292         act.sa_flags |= SA_RESTART;
1293 #endif
1294         if (sigaction (SIGCHLD, &act, NULL) == -1)
1295                 error (FATAL, errno, "can't install SIGCHLD handler");
1296
1297         installed = 1;
1298 }
1299
1300 static pipeline_post_fork_fn *post_fork = NULL;
1301
1302 void pipeline_install_post_fork (pipeline_post_fork_fn *fn)
1303 {
1304         post_fork = fn;
1305 }
1306
1307 static int ignored_signals = 0;
1308 static struct sigaction osa_sigint, osa_sigquit;
1309
1310 void pipeline_start (pipeline *p)
1311 {
1312         int i, j;
1313         int last_input = -1;
1314         int infd[2];
1315         sigset_t set, oset;
1316
1317         /* Make sure our SIGCHLD handler is installed. */
1318         pipeline_install_sigchld ();
1319
1320         assert (!p->pids);      /* pipeline not started already */
1321         assert (!p->statuses);
1322
1323         init_debug ();
1324         if (debug_level) {
1325                 debug ("Starting pipeline: ");
1326                 pipeline_dump (p, stderr);
1327         }
1328
1329         /* Flush all pending output so that subprocesses don't inherit it. */
1330         fflush (NULL);
1331
1332         if (p->ignore_signals && !ignored_signals++) {
1333                 struct sigaction sa;
1334
1335                 /* Ignore SIGINT and SIGQUIT while subprocesses are running,
1336                  * just like system().
1337                  */
1338                 memset (&sa, 0, sizeof sa);
1339                 sa.sa_handler = SIG_IGN;
1340                 sigemptyset (&sa.sa_mask);
1341                 sa.sa_flags = 0;
1342                 if (sigaction (SIGINT, &sa, &osa_sigint) < 0)
1343                         error (FATAL, errno, "Couldn't ignore SIGINT");
1344                 if (sigaction (SIGQUIT, &sa, &osa_sigquit) < 0)
1345                         error (FATAL, errno, "Couldn't ignore SIGQUIT");
1346         }
1347
1348         /* Add to the table of active pipelines, so that signal handlers
1349          * know what to do with exit statuses. Block SIGCHLD so that we can
1350          * do this safely.
1351          */
1352         sigemptyset (&set);
1353         sigaddset (&set, SIGCHLD);
1354         sigemptyset (&oset);
1355         while (sigprocmask (SIG_BLOCK, &set, &oset) == -1 && errno == EINTR)
1356                 ;
1357
1358         /* Grow the table if necessary. */
1359         if (n_active_pipelines >= max_active_pipelines) {
1360                 int filled = max_active_pipelines;
1361                 if (max_active_pipelines)
1362                         max_active_pipelines *= 2;
1363                 else
1364                         max_active_pipelines = 4;
1365                 /* reduces to xmalloc (...) if active_pipelines == NULL */
1366                 active_pipelines = xrealloc
1367                         (active_pipelines,
1368                          max_active_pipelines * sizeof *active_pipelines);
1369                 memset (active_pipelines + filled, 0,
1370                         (max_active_pipelines - filled) *
1371                                 sizeof *active_pipelines);
1372         }
1373
1374         for (i = 0; i < max_active_pipelines; ++i)
1375                 if (!active_pipelines[i]) {
1376                         active_pipelines[i] = p;
1377                         break;
1378                 }
1379         assert (i < max_active_pipelines);
1380         ++n_active_pipelines;
1381
1382         p->pids = xcalloc (p->ncommands, sizeof *p->pids);
1383         p->statuses = xcalloc (p->ncommands, sizeof *p->statuses);
1384
1385         /* Unblock SIGCHLD. */
1386         while (sigprocmask (SIG_SETMASK, &oset, NULL) == -1 && errno == EINTR)
1387                 ;
1388
1389         if (p->redirect_in == REDIRECT_FD && p->want_in < 0) {
1390                 if (pipe (infd) < 0)
1391                         error (FATAL, errno, "pipe failed");
1392                 last_input = infd[0];
1393                 p->infd = infd[1];
1394         } else if (p->redirect_in == REDIRECT_FD)
1395                 last_input = p->want_in;
1396         else if (p->redirect_in == REDIRECT_FILE_NAME) {
1397                 assert (p->want_infile);
1398                 last_input = open (p->want_infile, O_RDONLY);
1399                 if (last_input < 0)
1400                         error (FATAL, errno, "can't open %s", p->want_infile);
1401         }
1402
1403         for (i = 0; i < p->ncommands; i++) {
1404                 int pdes[2];
1405                 pid_t pid;
1406                 int output_read = -1, output_write = -1;
1407
1408                 if (i != p->ncommands - 1 ||
1409                     (p->redirect_out == REDIRECT_FD && p->want_out < 0)) {
1410                         if (pipe (pdes) < 0)
1411                                 error (FATAL, errno, "pipe failed");
1412                         if (i == p->ncommands - 1)
1413                                 p->outfd = pdes[0];
1414                         output_read = pdes[0];
1415                         output_write = pdes[1];
1416                 } else if (i == p->ncommands - 1) {
1417                         if (p->redirect_out == REDIRECT_FD)
1418                                 output_write = p->want_out;
1419                         else if (p->redirect_out == REDIRECT_FILE_NAME) {
1420                                 assert (p->want_outfile);
1421                                 output_write = open (p->want_outfile,
1422                                                      O_WRONLY | O_CREAT |
1423                                                      O_TRUNC, 0666);
1424                                 if (output_write < 0)
1425                                         error (FATAL, errno, "can't open %s",
1426                                                p->want_outfile);
1427                         }
1428                 }
1429
1430                 /* Block SIGCHLD so that the signal handler doesn't collect
1431                  * the exit status before we've filled in the pids array.
1432                  */
1433                 sigemptyset (&set);
1434                 sigaddset (&set, SIGCHLD);
1435                 sigemptyset (&oset);
1436                 while (sigprocmask (SIG_BLOCK, &set, &oset) == -1 &&
1437                        errno == EINTR)
1438                         ;
1439
1440                 pid = fork ();
1441                 if (pid < 0)
1442                         error (FATAL, errno, "fork failed");
1443                 if (pid == 0) {
1444                         /* child */
1445                         if (post_fork)
1446                                 post_fork ();
1447
1448                         /* input, reading side */
1449                         if (last_input != -1) {
1450                                 if (dup2 (last_input, 0) < 0)
1451                                         error (FATAL, errno, "dup2 failed");
1452                                 if (close (last_input) < 0)
1453                                         error (FATAL, errno, "close failed");
1454                         }
1455
1456                         /* output, writing side */
1457                         if (output_write != -1) {
1458                                 if (dup2 (output_write, 1) < 0)
1459                                         error (FATAL, errno, "dup2 failed");
1460                                 if (close (output_write) < 0)
1461                                         error (FATAL, errno, "close failed");
1462                         }
1463
1464                         /* output, reading side */
1465                         if (output_read != -1)
1466                                 if (close (output_read))
1467                                         error (FATAL, errno, "close failed");
1468
1469                         /* input from first command, writing side; must close
1470                          * it in every child because it has to be created
1471                          * before forking anything
1472                          */
1473                         if (p->infd != -1)
1474                                 if (close (p->infd))
1475                                         error (FATAL, errno, "close failed");
1476
1477                         /* inputs and outputs from other active pipelines */
1478                         for (j = 0; j < n_active_pipelines; ++j) {
1479                                 pipeline *active = active_pipelines[j];
1480                                 if (!active || active == p)
1481                                         continue;
1482                                 /* ignore failures */
1483                                 if (active->infd != -1)
1484                                         close (active->infd);
1485                                 if (active->outfd != -1)
1486                                         close (active->outfd);
1487                         }
1488
1489                         /* Restore signals. */
1490                         if (p->ignore_signals) {
1491                                 sigaction (SIGINT, &osa_sigint, NULL);
1492                                 sigaction (SIGQUIT, &osa_sigquit, NULL);
1493                         }
1494
1495                         pipecmd_exec (p->commands[i]);
1496                         /* never returns */
1497                 }
1498
1499                 /* in the parent */
1500                 if (last_input != -1) {
1501                         if (close (last_input) < 0)
1502                                 error (FATAL, errno, "close failed");
1503                 }
1504                 if (output_write != -1) {
1505                         if (close (output_write) < 0)
1506                                 error (FATAL, errno, "close failed");
1507                 }
1508                 if (output_read != -1)
1509                         last_input = output_read;
1510                 p->pids[i] = pid;
1511                 p->statuses[i] = -1;
1512
1513                 /* Unblock SIGCHLD. */
1514                 while (sigprocmask (SIG_SETMASK, &oset, NULL) == -1 &&
1515                        errno == EINTR)
1516                         ;
1517
1518                 debug ("Started \"%s\", pid %d\n", p->commands[i]->name, pid);
1519         }
1520
1521         if (p->ncommands == 0)
1522                 p->outfd = last_input;
1523 }
1524
1525 int pipeline_wait_all (pipeline *p, int **statuses, int *n_statuses)
1526 {
1527         int ret = 0;
1528         int proc_count = p->ncommands;
1529         int i;
1530         int raise_signal = 0;
1531
1532         init_debug ();
1533         if (debug_level) {
1534                 debug ("Waiting for pipeline: ");
1535                 pipeline_dump (p, stderr);
1536         }
1537
1538         assert (p->pids);       /* pipeline started */
1539         assert (p->statuses);
1540
1541         if (p->infile) {
1542                 if (fclose (p->infile))
1543                         error (0, errno,
1544                                "closing pipeline input stream failed");
1545                 p->infile = NULL;
1546                 p->infd = -1;
1547         } else if (p->infd != -1) {
1548                 if (close (p->infd))
1549                         error (0, errno, "closing pipeline input failed");
1550                 p->infd = -1;
1551         }
1552
1553         if (p->outfile) {
1554                 if (fclose (p->outfile)) {
1555                         error (0, errno,
1556                                "closing pipeline output stream failed");
1557                         ret = 127;
1558                 }
1559                 p->outfile = NULL;
1560                 p->outfd = -1;
1561         } else if (p->outfd != -1) {
1562                 if (close (p->outfd)) {
1563                         error (0, errno, "closing pipeline output failed");
1564                         ret = 127;
1565                 }
1566                 p->outfd = -1;
1567         }
1568
1569         /* Tell the SIGCHLD handler not to get in our way. */
1570         queue_sigchld = 1;
1571
1572         while (proc_count > 0) {
1573                 int r;
1574
1575                 debug ("Active processes (%d):\n", proc_count);
1576
1577                 /* Check for any statuses already collected by SIGCHLD
1578                  * handlers or the previous iteration before calling
1579                  * reap_children() again.
1580                  */
1581                 for (i = 0; i < p->ncommands; ++i) {
1582                         int status;
1583
1584                         if (p->pids[i] == -1)
1585                                 continue;
1586
1587                         debug ("  \"%s\" (%d) -> %d\n",
1588                                p->commands[i]->name, p->pids[i],
1589                                p->statuses[i]);
1590
1591                         if (p->statuses[i] == -1)
1592                                 continue;
1593
1594                         status = p->statuses[i];
1595                         p->pids[i] = -1;
1596                         --proc_count;
1597                         if (WIFSIGNALED (status)) {
1598                                 int sig = WTERMSIG (status);
1599 #ifdef SIGPIPE
1600                                 if (sig == SIGPIPE)
1601                                         status = 0;
1602                                 else {
1603 #endif /* SIGPIPE */
1604                                         /* signals currently blocked,
1605                                          * re-raise later
1606                                          */
1607                                         if (sig == SIGINT || sig == SIGQUIT)
1608                                                 raise_signal = sig;
1609                                         else if (WCOREDUMP (status))
1610                                                 error (0, 0,
1611                                                        "%s: %s (core dumped)",
1612                                                        p->commands[i]->name,
1613                                                        strsignal (sig));
1614                                         else
1615                                                 error (0, 0, "%s: %s",
1616                                                        p->commands[i]->name,
1617                                                        strsignal (sig));
1618 #ifdef SIGPIPE
1619                                 }
1620 #endif /* SIGPIPE */
1621                         } else if (!WIFEXITED (status))
1622                                 error (0, 0, "unexpected status %d",
1623                                        status);
1624
1625                         if (p->commands[i]->tag == PIPECMD_FUNCTION) {
1626                                 struct pipecmd_function *cmdf =
1627                                         &p->commands[i]->u.function;
1628                                 if (cmdf->free_func)
1629                                         (*cmdf->free_func) (cmdf->data);
1630                         }
1631
1632                         if (i == p->ncommands - 1) {
1633                                 if (WIFSIGNALED (status))
1634                                         ret = 128 + WTERMSIG (status);
1635                                 else if (WEXITSTATUS (status))
1636                                         ret = WEXITSTATUS (status);
1637                         } else if (!ret &&
1638                                    (WIFSIGNALED (status) ||
1639                                     WEXITSTATUS (status)))
1640                                 ret = 127;
1641                 }
1642
1643                 assert (proc_count >= 0);
1644                 if (proc_count == 0)
1645                         break;
1646
1647                 errno = 0;
1648                 r = reap_children (1);
1649
1650                 if (r == -1 && errno == ECHILD)
1651                         /* Eh? The pipeline was allegedly still running, so
1652                          * we shouldn't have got ECHILD.
1653                          */
1654                         error (FATAL, errno, "waitpid failed");
1655         }
1656
1657         queue_sigchld = 0;
1658
1659         for (i = 0; i < n_active_pipelines; ++i)
1660                 if (active_pipelines[i] == p)
1661                         active_pipelines[i] = NULL;
1662
1663         if (statuses && n_statuses) {
1664                 *statuses = xnmalloc (p->ncommands, sizeof **statuses);
1665                 *n_statuses = p->ncommands;
1666                 for (i = 0; i < p->ncommands; ++i)
1667                         (*statuses)[i] = p->statuses[i];
1668         }
1669
1670         free (p->pids);
1671         p->pids = NULL;
1672         free (p->statuses);
1673         p->statuses = NULL;
1674
1675         if (p->ignore_signals && !--ignored_signals) {
1676                 /* Restore signals. */
1677                 sigaction (SIGINT, &osa_sigint, NULL);
1678                 sigaction (SIGQUIT, &osa_sigquit, NULL);
1679         }
1680
1681         if (raise_signal)
1682                 raise (raise_signal);
1683
1684         return ret;
1685 }
1686
1687 int pipeline_wait (pipeline *p)
1688 {
1689         return pipeline_wait_all (p, NULL, NULL);
1690 }
1691
1692 int pipeline_run (pipeline *p)
1693 {
1694         int status;
1695
1696         pipeline_start (p);
1697         status = pipeline_wait (p);
1698         pipeline_free (p);
1699
1700         return status;
1701 }
1702
1703 void pipeline_pump (pipeline *p, ...)
1704 {
1705         va_list argv;
1706         int argc, i, j;
1707         pipeline *arg, **pieces;
1708         size_t *pos;
1709         int *known_source, *blocking_in, *blocking_out,
1710             *dying_source, *waiting, *write_error;
1711         struct sigaction sa, osa_sigpipe;
1712
1713         /* Count pipelines and allocate space for arrays. */
1714         va_start (argv, p);
1715         argc = 0;
1716         for (arg = p; arg; arg = va_arg (argv, pipeline *))
1717                 ++argc;
1718         va_end (argv);
1719         pieces = xnmalloc (argc, sizeof *pieces);
1720         pos = xnmalloc (argc, sizeof *pos);
1721         known_source = xcalloc (argc, sizeof *known_source);
1722         blocking_in = xcalloc (argc, sizeof *blocking_in);
1723         blocking_out = xcalloc (argc, sizeof *blocking_out);
1724         dying_source = xcalloc (argc, sizeof *dying_source);
1725         waiting = xcalloc (argc, sizeof *waiting);
1726         write_error = xcalloc (argc, sizeof *write_error);
1727
1728         /* Set up arrays of pipelines and their read positions. Start all
1729          * pipelines if necessary.
1730          */
1731         va_start (argv, p);
1732         for (arg = p, i = 0; i < argc; arg = va_arg (argv, pipeline *), ++i) {
1733                 pieces[i] = arg;
1734                 pos[i] = 0;
1735                 if (!pieces[i]->pids)
1736                         pipeline_start (pieces[i]);
1737         }
1738         assert (arg == NULL);
1739         va_end (argv);
1740
1741         /* All source pipelines must be supplied as arguments. */
1742         for (i = 0; i < argc; ++i) {
1743                 int found = 0;
1744                 if (!pieces[i]->source)
1745                         continue;
1746                 for (j = 0; j < argc; ++j) {
1747                         if (pieces[i]->source == pieces[j]) {
1748                                 known_source[j] = found = 1;
1749                                 break;
1750                         }
1751                 }
1752                 assert (found);
1753         }
1754
1755         for (i = 0; i < argc; ++i) {
1756                 int flags;
1757                 if (pieces[i]->infd != -1) {
1758                         flags = fcntl (pieces[i]->infd, F_GETFL);
1759                         if (!(flags & O_NONBLOCK)) {
1760                                 blocking_in[i] = 1;
1761                                 fcntl (pieces[i]->infd, F_SETFL,
1762                                        flags | O_NONBLOCK);
1763                         }
1764                 }
1765                 if (pieces[i]->outfd != -1) {
1766                         flags = fcntl (pieces[i]->outfd, F_GETFL);
1767                         if (!(flags & O_NONBLOCK)) {
1768                                 blocking_out[i] = 1;
1769                                 fcntl (pieces[i]->outfd, F_SETFL,
1770                                        flags | O_NONBLOCK);
1771                         }
1772                 }
1773         }
1774
1775 #ifdef SIGPIPE
1776         memset (&sa, 0, sizeof sa);
1777         sa.sa_handler = SIG_IGN;
1778         sigemptyset (&sa.sa_mask);
1779         sa.sa_flags = 0;
1780         sigaction (SIGPIPE, &sa, &osa_sigpipe);
1781 #endif
1782
1783 #ifdef SA_RESTART
1784         /* We rely on getting EINTR from select. */
1785         sigaction (SIGCHLD, NULL, &sa);
1786         sa.sa_flags &= ~SA_RESTART;
1787         sigaction (SIGCHLD, &sa, NULL);
1788 #endif
1789
1790         for (;;) {
1791                 fd_set rfds, wfds;
1792                 int maxfd = -1;
1793                 int ret;
1794
1795                 /* If a source dies and all data from it has been written to
1796                  * all sinks, close the writing end of the pipe to each of
1797                  * its sinks.
1798                  */
1799                 for (i = 0; i < argc; ++i) {
1800                         if (!known_source[i] || pieces[i]->outfd != -1 ||
1801                             pipeline_peek_size (pieces[i]))
1802                                 continue;
1803                         for (j = 0; j < argc; ++j) {
1804                                 if (pieces[j]->source == pieces[i] &&
1805                                     pieces[j]->infd != -1) {
1806                                         if (close (pieces[j]->infd))
1807                                                 error (0, errno,
1808                                                        "closing pipeline "
1809                                                        "input failed");
1810                                         pieces[j]->infd = -1;
1811                                 }
1812                         }
1813                 }
1814
1815                 /* If all sinks on a source have died, close the reading end
1816                  * of the pipe from that source.
1817                  */
1818                 for (i = 0; i < argc; ++i) {
1819                         int got_sink = 0;
1820                         if (!known_source[i] || pieces[i]->outfd == -1)
1821                                 continue;
1822                         for (j = 0; j < argc; ++j) {
1823                                 if (pieces[j]->source == pieces[i] &&
1824                                     pieces[j]->infd != -1) {
1825                                         got_sink = 1;
1826                                         break;
1827                                 }
1828                         }
1829                         if (got_sink)
1830                                 continue;
1831                         if (close (pieces[i]->outfd))
1832                                 error (0, errno,
1833                                        "closing pipeline output failed");
1834                         pieces[i]->outfd = -1;
1835                 }
1836
1837                 FD_ZERO (&rfds);
1838                 FD_ZERO (&wfds);
1839                 for (i = 0; i < argc; ++i) {
1840                         /* Input to sink pipeline. */
1841                         if (pieces[i]->source && pieces[i]->infd != -1 &&
1842                             !waiting[i]) {
1843                                 FD_SET (pieces[i]->infd, &wfds);
1844                                 if (pieces[i]->infd > maxfd)
1845                                         maxfd = pieces[i]->infd;
1846                         }
1847                         /* Output from source pipeline. */
1848                         if (known_source[i] && pieces[i]->outfd != -1) {
1849                                 FD_SET (pieces[i]->outfd, &rfds);
1850                                 if (pieces[i]->outfd > maxfd)
1851                                         maxfd = pieces[i]->outfd;
1852                         }
1853                 }
1854                 if (maxfd == -1)
1855                         break; /* nothing meaningful left to do */
1856
1857                 ret = select (maxfd + 1, &rfds, &wfds, NULL, NULL);
1858                 if (ret < 0 && errno == EINTR) {
1859                         /* Did a source or sink pipeline die? */
1860                         for (i = 0; i < argc; ++i) {
1861                                 if (pieces[i]->ncommands == 0)
1862                                         continue;
1863                                 if (known_source[i] && !dying_source[i] &&
1864                                     pieces[i]->outfd != -1) {
1865                                         int last = pieces[i]->ncommands - 1;
1866                                         assert (pieces[i]->statuses);
1867                                         if (pieces[i]->statuses[last] != -1) {
1868                                                 debug ("source pipeline %d "
1869                                                        "died\n", i);
1870                                                 dying_source[i] = 1;
1871                                         }
1872                                 }
1873                                 if (pieces[i]->source &&
1874                                     pieces[i]->infd != -1) {
1875                                         assert (pieces[i]->statuses);
1876                                         if (pieces[i]->statuses[0] != -1) {
1877                                                 debug ("sink pipeline %d "
1878                                                        "died\n", i);
1879                                                 close (pieces[i]->infd);
1880                                                 pieces[i]->infd = -1;
1881                                         }
1882                                 }
1883                         }
1884                         continue;
1885                 } else if (ret < 0)
1886                         error (FATAL, errno, "select");
1887
1888                 /* Read a block of data from each available source pipeline. */
1889                 for (i = 0; i < argc; ++i) {
1890                         size_t peek_size, len;
1891
1892                         if (!known_source[i] || pieces[i]->outfd == -1)
1893                                 continue;
1894                         if (!FD_ISSET (pieces[i]->outfd, &rfds))
1895                                 continue;
1896
1897                         peek_size = pipeline_peek_size (pieces[i]);
1898                         len = peek_size + 4096;
1899                         if (!pipeline_peek (pieces[i], &len) ||
1900                             len == peek_size) {
1901                                 /* Error or end-of-file; skip this pipeline
1902                                  * from now on.
1903                                  */
1904                                 debug ("source pipeline %d returned error "
1905                                        "or EOF\n", i);
1906                                 close (pieces[i]->outfd);
1907                                 pieces[i]->outfd = -1;
1908                         } else
1909                                 /* This is rather a large hammer. Whenever
1910                                  * any data is read from any source
1911                                  * pipeline, we go through and retry all
1912                                  * sink pipelines, even if they aren't
1913                                  * receiving data from the source in
1914                                  * question. This probably results in a few
1915                                  * more passes around the select() loop, but
1916                                  * it eliminates some annoyingly fiddly
1917                                  * bookkeeping.
1918                                  */
1919                                 memset (waiting, 0, argc * sizeof *waiting);
1920                 }
1921
1922                 /* Write as much data as we can to each available sink
1923                  * pipeline.
1924                  */
1925                 for (i = 0; i < argc; ++i) {
1926                         const char *block;
1927                         size_t peek_size;
1928                         ssize_t w;
1929                         size_t minpos;
1930
1931                         if (!pieces[i]->source || pieces[i]->infd == -1)
1932                                 continue;
1933                         if (!FD_ISSET (pieces[i]->infd, &wfds))
1934                                 continue;
1935                         peek_size = pipeline_peek_size (pieces[i]->source);
1936                         if (peek_size <= pos[i]) {
1937                                 /* Disable reading until data is read from a
1938                                  * source fd or a child process exits, so
1939                                  * that we neither spin nor block if the
1940                                  * source is slow.
1941                                  */
1942                                 waiting[i] = 1;
1943                                 continue;
1944                         }
1945
1946                         /* peek a block from the source */
1947                         block = pipeline_peek (pieces[i]->source, &peek_size);
1948                         /* should all already be in the peek cache */
1949                         assert (block);
1950                         assert (peek_size);
1951
1952                         /* write as much of it as will fit to the sink */
1953                         for (;;) {
1954                                 w = safe_write (pieces[i]->infd,
1955                                                 block + pos[i],
1956                                                 peek_size - pos[i]);
1957                                 if (w >= 0)
1958                                         break;
1959                                 if (errno == EAGAIN) {
1960                                         w = 0;
1961                                         break;
1962                                 }
1963                                 /* It may be useful for other processes to
1964                                  * continue even though this one fails, so
1965                                  * don't FATAL yet.
1966                                  */
1967                                 if (errno != EPIPE)
1968                                         write_error[i] = errno;
1969                                 close (pieces[i]->infd);
1970                                 pieces[i]->infd = -1;
1971                                 goto next_sink;
1972                         }
1973                         pos[i] += w;
1974                         minpos = pos[i];
1975
1976                         /* check other sinks on the same source, and update
1977                          * the source's read position if earlier data is no
1978                          * longer needed by any sink
1979                          */
1980                         for (j = 0; j < argc; ++j) {
1981                                 if (pieces[i]->source != pieces[j]->source ||
1982                                     pieces[j]->infd == -1)
1983                                         continue;
1984                                 if (pos[j] < minpos)
1985                                         minpos = pos[j];
1986                                 /* If the source is dead and all data has
1987                                  * been written to this sink, close the
1988                                  * writing end of the pipe to the sink.
1989                                  */
1990                                 if (pieces[j]->source->outfd == -1 &&
1991                                     pos[j] >= peek_size) {
1992                                         close (pieces[j]->infd);
1993                                         pieces[j]->infd = -1;
1994                                 }
1995                         }
1996
1997                         /* If some data has been written to all sinks,
1998                          * discard it from the source's peek cache.
1999                          */
2000                         pipeline_peek_skip (pieces[i]->source, minpos);
2001                         for (j = 0; j < argc; ++j) {
2002                                 if (pieces[i]->source == pieces[j]->source)
2003                                         pos[j] -= minpos;
2004                         }
2005 next_sink:              ;
2006                 }
2007         }
2008
2009 #ifdef SA_RESTART
2010         sigaction (SIGCHLD, NULL, &sa);
2011         sa.sa_flags |= SA_RESTART;
2012         sigaction (SIGCHLD, &sa, NULL);
2013 #endif
2014
2015 #ifdef SIGPIPE
2016         sigaction (SIGPIPE, &osa_sigpipe, NULL);
2017 #endif
2018
2019         for (i = 0; i < argc; ++i) {
2020                 int flags;
2021                 if (blocking_in[i] && pieces[i]->infd != -1) {
2022                         flags = fcntl (pieces[i]->infd, F_GETFL);
2023                         fcntl (pieces[i]->infd, F_SETFL, flags & ~O_NONBLOCK);
2024                 }
2025                 if (blocking_out[i] && pieces[i]->outfd != -1) {
2026                         flags = fcntl (pieces[i]->outfd, F_GETFL);
2027                         fcntl (pieces[i]->outfd, F_SETFL, flags & ~O_NONBLOCK);
2028                 }
2029         }
2030
2031         for (i = 0; i < argc; ++i) {
2032                 if (write_error[i])
2033                         error (FATAL, write_error[i], "write to sink %d", i);
2034         }
2035
2036         free (write_error);
2037         free (waiting);
2038         free (dying_source);
2039         free (blocking_out);
2040         free (blocking_in);
2041         free (known_source);
2042         free (pieces);
2043         free (pos);
2044 }
2045
2046 /* ---------------------------------------------------------------------- */
2047
2048 /* Functions to read output from pipelines. */
2049
2050 static const char *get_block (pipeline *p, size_t *len, int peek)
2051 {
2052         size_t readstart = 0, retstart = 0;
2053         size_t space = p->bufmax;
2054         size_t toread = *len;
2055         ssize_t r;
2056
2057         if (p->buffer && p->peek_offset) {
2058                 if (p->peek_offset >= toread) {
2059                         /* We've got the whole thing in the peek cache; just
2060                          * return it.
2061                          */
2062                         const char *buffer;
2063                         assert (p->peek_offset <= p->buflen);
2064                         buffer = p->buffer + p->buflen - p->peek_offset;
2065                         if (!peek)
2066                                 p->peek_offset -= toread;
2067                         return buffer;
2068                 } else {
2069                         readstart = p->buflen;
2070                         retstart = p->buflen - p->peek_offset;
2071                         space -= p->buflen;
2072                         toread -= p->peek_offset;
2073                 }
2074         }
2075
2076         if (toread > space) {
2077                 if (p->buffer)
2078                         p->bufmax = readstart + toread;
2079                 else
2080                         p->bufmax = toread;
2081                 p->buffer = xrealloc (p->buffer, p->bufmax + 1);
2082         }
2083
2084         if (!peek)
2085                 p->peek_offset = 0;
2086
2087         assert (p->outfd != -1);
2088         r = safe_read (p->outfd, p->buffer + readstart, toread);
2089         if (r == -1)
2090                 return NULL;
2091         p->buflen = readstart + r;
2092         if (peek)
2093                 p->peek_offset += r;
2094         *len -= (toread - r);
2095
2096         return p->buffer + retstart;
2097 }
2098
2099 const char *pipeline_read (pipeline *p, size_t *len)
2100 {
2101         return get_block (p, len, 0);
2102 }
2103
2104 const char *pipeline_peek (pipeline *p, size_t *len)
2105 {
2106         return get_block (p, len, 1);
2107 }
2108
2109 size_t pipeline_peek_size (pipeline *p)
2110 {
2111         if (!p->buffer)
2112                 return 0;
2113         return p->peek_offset;
2114 }
2115
2116 void pipeline_peek_skip (pipeline *p, size_t len)
2117 {
2118         if (len > 0) {
2119                 assert (p->buffer);
2120                 assert (len <= p->peek_offset);
2121                 p->peek_offset -= len;
2122         }
2123 }
2124
2125 /* readline and peekline repeatedly peek larger and larger buffers until
2126  * they find a newline or they fail. readline then adjusts the peek offset.
2127  */
2128
2129 static const char *get_line (pipeline *p, size_t *outlen)
2130 {
2131         const size_t block = 4096;
2132         const char *buffer = NULL, *end = NULL;
2133         int i;
2134
2135         if (p->line_cache) {
2136                 free (p->line_cache);
2137                 p->line_cache = NULL;
2138         }
2139
2140         if (outlen)
2141                 *outlen = 0;
2142
2143         for (i = 0; ; ++i) {
2144                 size_t plen = block * (i + 1);
2145
2146                 buffer = get_block (p, &plen, 1);
2147                 if (!buffer || plen == 0)
2148                         return NULL;
2149
2150                 end = memchr (buffer + block * i, '\n', plen);
2151                 if (!end && plen < block * (i + 1))
2152                         /* end of file, no newline found */
2153                         end = buffer + plen - 1;
2154                 if (end)
2155                         break;
2156         }
2157
2158         if (end) {
2159                 p->line_cache = xstrndup (buffer, end - buffer + 1);
2160                 if (outlen)
2161                         *outlen = end - buffer + 1;
2162                 return p->line_cache;
2163         } else
2164                 return NULL;
2165 }
2166
2167 const char *pipeline_readline (pipeline *p)
2168 {
2169         size_t buflen;
2170         const char *buffer = get_line (p, &buflen);
2171         if (buffer)
2172                 p->peek_offset -= buflen;
2173         return buffer;
2174 }
2175
2176 const char *pipeline_peekline (pipeline *p)
2177 {
2178         return get_line (p, NULL);
2179 }