/* sort - sort lines of text (with all kinds of options).
- Copyright (C) 1988, 1991-2006 Free Software Foundation, Inc.
+ Copyright (C) 1988, 1991-2007 Free Software Foundation, Inc.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
#include <getopt.h>
#include <sys/types.h>
+#include <sys/wait.h>
#include <signal.h>
#include "system.h"
#include "error.h"
+#include "findprog.h"
#include "hard-locale.h"
+#include "hash.h"
#include "inttostr.h"
#include "md5.h"
#include "physmem.h"
present. */
#ifndef SA_NOCLDSTOP
# define SA_NOCLDSTOP 0
-# define sigprocmask(How, Set, Oset) /* empty */
+/* No sigprocmask. Always 'return' zero. */
+# define sigprocmask(How, Set, Oset) (0)
# define sigset_t int
# if ! HAVE_SIGINTERRUPT
# define siginterrupt(sig, flag) /* empty */
SORT_FAILURE = 2
};
+enum
+ {
+ /* The number of times we should try to fork a compression process
+ (we retry if the fork call fails). We don't _need_ to compress
+ temp files, this is just to reduce disk access, so this number
+ can be small. */
+ MAX_FORK_TRIES_COMPRESS = 2,
+
+ /* The number of times we should try to fork a decompression process.
+ If we can't fork a decompression process, we can't sort, so this
+ number should be big. */
+ MAX_FORK_TRIES_DECOMPRESS = 8
+ };
+
/* The representation of the decimal point in the current locale. */
static int decimal_point;
/* List of key field comparisons to be tried. */
static struct keyfield *keylist;
+/* Program used to (de)compress temp files. Must accept -d. */
+static const char *compress_program;
+
static void sortlines_temp (struct line *, size_t, struct line *);
/* Report MESSAGE for FILE, then clean up and exit.
/* The set of signals that are caught. */
static sigset_t caught_signals;
+/* Critical section status. */
+struct cs_status
+{
+ bool valid;
+ sigset_t sigs;
+};
+
+/* Enter a critical section. */
+static struct cs_status
+cs_enter (void)
+{
+ struct cs_status status;
+ status.valid = (sigprocmask (SIG_BLOCK, &caught_signals, &status.sigs) == 0);
+ return status;
+}
+
+/* Leave a critical section. */
+static void
+cs_leave (struct cs_status status)
+{
+ if (status.valid)
+ {
+ /* Ignore failure when restoring the signal mask. */
+ sigprocmask (SIG_SETMASK, &status.sigs, NULL);
+ }
+}
+
/* The list of temporary files. */
struct tempnode
{
struct tempnode *volatile next;
+ pid_t pid; /* If compressed, the pid of compressor, else zero */
char name[1]; /* Actual size is 1 + file name length. */
};
static struct tempnode *volatile temphead;
static struct tempnode *volatile *temptail = &temphead;
+struct sortfile
+{
+ char *name;
+ pid_t pid; /* If compressed, the pid of compressor, else zero */
+};
+
+/* A table where we store compression process states. We clean up all
+ processes in a timely manner so as not to exhaust system resources,
+ so we store the info on whether the process is still running, or has
+ been reaped here. */
+static Hash_table *proctab;
+
+enum { INIT_PROCTAB_SIZE = 47 };
+
+enum procstate { ALIVE, ZOMBIE };
+
+/* A proctab entry. The COUNT field is there in case we fork a new
+ compression process that has the same PID as an old zombie process
+ that is still in the table (because the process to decompress the
+ temp file it was associated with hasn't started yet). */
+struct procnode
+{
+ pid_t pid;
+ enum procstate state;
+ size_t count;
+};
+
+static size_t
+proctab_hasher (const void *entry, size_t tabsize)
+{
+ const struct procnode *node = entry;
+ return node->pid % tabsize;
+}
+
+static bool
+proctab_comparator (const void *e1, const void *e2)
+{
+ const struct procnode *n1 = e1, *n2 = e2;
+ return n1->pid == n2->pid;
+}
+
+/* The total number of forked processes (compressors and decompressors)
+ that have not been reaped yet. */
+static size_t nprocs;
+
+/* The number of child processes we'll allow before we try to reap some. */
+enum { MAX_PROCS_BEFORE_REAP = 2 };
+
+/* If 0 < PID, wait for the child process with that PID to exit.
+ If PID is -1, clean up a random child process which has finished and
+ return the process ID of that child. If PID is -1 and no processes
+ have quit yet, return 0 without waiting. */
+
+static pid_t
+reap (pid_t pid)
+{
+ int status;
+ pid_t cpid = waitpid (pid, &status, pid < 0 ? WNOHANG : 0);
+
+ if (cpid < 0)
+ error (SORT_FAILURE, errno, _("waiting for %s [-d]"),
+ compress_program);
+ else if (0 < cpid)
+ {
+ if (! WIFEXITED (status) || WEXITSTATUS (status))
+ error (SORT_FAILURE, 0, _("%s [-d] terminated abnormally"),
+ compress_program);
+ --nprocs;
+ }
+
+ return cpid;
+}
+
+/* Add the PID of a running compression process to proctab, or update
+ the entry COUNT and STATE fields if it's already there. This also
+ creates the table for us the first time it's called. */
+
+static void
+register_proc (pid_t pid)
+{
+ struct procnode test, *node;
+
+ if (! proctab)
+ {
+ proctab = hash_initialize (INIT_PROCTAB_SIZE, NULL,
+ proctab_hasher,
+ proctab_comparator,
+ free);
+ if (! proctab)
+ xalloc_die ();
+ }
+
+ test.pid = pid;
+ node = hash_lookup (proctab, &test);
+ if (node)
+ {
+ node->state = ALIVE;
+ ++node->count;
+ }
+ else
+ {
+ node = xmalloc (sizeof *node);
+ node->pid = pid;
+ node->state = ALIVE;
+ node->count = 1;
+ hash_insert (proctab, node);
+ }
+}
+
+/* This is called when we reap a random process. We don't know
+ whether we have reaped a compression process or a decompression
+ process until we look in the table. If there's an ALIVE entry for
+ it, then we have reaped a compression process, so change the state
+ to ZOMBIE. Otherwise, it's a decompression processes, so ignore it. */
+
+static void
+update_proc (pid_t pid)
+{
+ struct procnode test, *node;
+
+ test.pid = pid;
+ node = hash_lookup (proctab, &test);
+ if (node)
+ node->state = ZOMBIE;
+}
+
+/* This is for when we need to wait for a compression process to exit.
+ If it has a ZOMBIE entry in the table then it's already dead and has
+ been reaped. Note that if there's an ALIVE entry for it, it still may
+ already have died and been reaped if a second process was created with
+ the same PID. This is probably exceedingly rare, but to be on the safe
+ side we will have to wait for any compression process with this PID. */
+
+static void
+wait_proc (pid_t pid)
+{
+ struct procnode test, *node;
+
+ test.pid = pid;
+ node = hash_lookup (proctab, &test);
+ if (node->state == ALIVE)
+ reap (pid);
+
+ node->state = ZOMBIE;
+ if (! --node->count)
+ {
+ hash_delete (proctab, node);
+ free (node);
+ }
+}
+
+/* Keep reaping finished children as long as there are more to reap.
+ This doesn't block waiting for any of them, it only reaps those
+ that are already dead. */
+
+static void
+reap_some (void)
+{
+ pid_t pid;
+
+ while (0 < nprocs && (pid = reap (-1)))
+ update_proc (pid);
+}
+
/* Clean up any remaining temporary files. */
static void
{
/* Clean up any remaining temporary files in a critical section so
that a signal handler does not try to clean them too. */
- sigset_t oldset;
- sigprocmask (SIG_BLOCK, &caught_signals, &oldset);
+ struct cs_status cs = cs_enter ();
cleanup ();
- sigprocmask (SIG_SETMASK, &oldset, NULL);
+ cs_leave (cs);
}
close_stdout ();
}
-/* Create a new temporary file, returning its newly allocated name.
- Store into *PFP a stream open for writing. */
+/* Create a new temporary file, returning its newly allocated tempnode.
+ Store into *PFD the file descriptor open for writing. */
-static char *
-create_temp_file (FILE **pfp)
+static struct tempnode *
+create_temp_file (int *pfd)
{
static char const slashbase[] = "/sortXXXXXX";
static size_t temp_dir_index;
- sigset_t oldset;
int fd;
int saved_errno;
char const *temp_dir = temp_dirs[temp_dir_index];
struct tempnode *node =
xmalloc (offsetof (struct tempnode, name) + len + sizeof slashbase);
char *file = node->name;
+ struct cs_status cs;
memcpy (file, temp_dir, len);
memcpy (file + len, slashbase, sizeof slashbase);
node->next = NULL;
+ node->pid = 0;
if (++temp_dir_index == temp_dir_count)
temp_dir_index = 0;
/* Create the temporary file in a critical section, to avoid races. */
- sigprocmask (SIG_BLOCK, &caught_signals, &oldset);
+ cs = cs_enter ();
fd = mkstemp (file);
if (0 <= fd)
{
temptail = &node->next;
}
saved_errno = errno;
- sigprocmask (SIG_SETMASK, &oldset, NULL);
+ cs_leave (cs);
errno = saved_errno;
- if (fd < 0 || (*pfp = fdopen (fd, "w")) == NULL)
+ if (fd < 0)
die (_("cannot create temporary file"), file);
- return file;
+ *pfd = fd;
+ return node;
}
/* Return a stream for FILE, opened with mode HOW. A null FILE means
}
static void
+dup2_or_die (int oldfd, int newfd)
+{
+ if (dup2 (oldfd, newfd) < 0)
+ error (SORT_FAILURE, errno, _("dup2 failed"));
+}
+
+/* Fork a child process for piping to and do common cleanup. The
+ TRIES parameter tells us how many times to try to fork before
+ giving up. Return the PID of the child or -1 if fork failed. */
+
+static pid_t
+pipe_fork (int pipefds[2], size_t tries)
+{
+#if HAVE_WORKING_FORK
+ struct tempnode *saved_temphead;
+ int saved_errno;
+ unsigned int wait_retry = 1;
+ pid_t pid IF_LINT (= -1);
+ struct cs_status cs;
+
+ if (pipe (pipefds) < 0)
+ return -1;
+
+ while (tries--)
+ {
+ /* This is so the child process won't delete our temp files
+ if it receives a signal before exec-ing. */
+ cs = cs_enter ();
+ saved_temphead = temphead;
+ temphead = NULL;
+
+ pid = fork ();
+ saved_errno = errno;
+ if (pid)
+ temphead = saved_temphead;
+
+ cs_leave (cs);
+ errno = saved_errno;
+
+ if (0 <= pid || errno != EAGAIN)
+ break;
+ else
+ {
+ sleep (wait_retry);
+ wait_retry *= 2;
+ reap_some ();
+ }
+ }
+
+ if (pid < 0)
+ {
+ close (pipefds[0]);
+ close (pipefds[1]);
+ }
+ else if (pid == 0)
+ {
+ close (STDIN_FILENO);
+ close (STDOUT_FILENO);
+ }
+ else
+ ++nprocs;
+
+ return pid;
+
+#else /* ! HAVE_WORKING_FORK */
+ return -1;
+#endif
+}
+
+/* Create a temporary file and start a compression program to filter output
+ to that file. Set *PFP to the file handle and if *PPID is non-NULL,
+ set it to the PID of the newly-created process. */
+
+static char *
+create_temp (FILE **pfp, pid_t *ppid)
+{
+ static bool compress_program_known;
+ int tempfd;
+ struct tempnode *node = create_temp_file (&tempfd);
+ char *name = node->name;
+
+ if (! compress_program_known)
+ {
+ compress_program = getenv ("GNUSORT_COMPRESSOR");
+ if (compress_program == NULL)
+ {
+ static const char *default_program = "gzip";
+ const char *path_program = find_in_path (default_program);
+
+ if (path_program != default_program)
+ {
+ if (access (path_program, X_OK) == 0)
+ compress_program = path_program;
+ else
+ free ((char *) path_program);
+ }
+ }
+ else if (*compress_program == '\0')
+ compress_program = NULL;
+
+ compress_program_known = true;
+ }
+
+ if (compress_program)
+ {
+ int pipefds[2];
+
+ node->pid = pipe_fork (pipefds, MAX_FORK_TRIES_COMPRESS);
+ if (0 < node->pid)
+ {
+ close (tempfd);
+ close (pipefds[0]);
+ tempfd = pipefds[1];
+
+ register_proc (node->pid);
+ }
+ else if (node->pid == 0)
+ {
+ close (pipefds[1]);
+ dup2_or_die (tempfd, STDOUT_FILENO);
+ close (tempfd);
+ dup2_or_die (pipefds[0], STDIN_FILENO);
+ close (pipefds[0]);
+
+ if (execlp (compress_program, compress_program,
+ (char *) NULL) < 0)
+ error (SORT_FAILURE, errno, _("couldn't execute %s"),
+ compress_program);
+ }
+ else
+ node->pid = 0;
+ }
+
+ *pfp = fdopen (tempfd, "w");
+ if (! *pfp)
+ die (_("couldn't create temporary file"), name);
+
+ if (ppid)
+ *ppid = node->pid;
+
+ return name;
+}
+
+/* Open a compressed temp file and start a decompression process through
+ which to filter the input. PID must be the valid processes ID of the
+ process used to compress the file. */
+
+static FILE *
+open_temp (const char *name, pid_t pid)
+{
+ int tempfd, pipefds[2];
+ pid_t child_pid;
+ FILE *fp;
+
+ wait_proc (pid);
+
+ tempfd = open (name, O_RDONLY);
+ if (tempfd < 0)
+ die (_("couldn't open temporary file"), name);
+
+ child_pid = pipe_fork (pipefds, MAX_FORK_TRIES_DECOMPRESS);
+ if (0 < child_pid)
+ {
+ close (tempfd);
+ close (pipefds[1]);
+ }
+ else if (child_pid == 0)
+ {
+ close (pipefds[0]);
+ dup2_or_die (tempfd, STDIN_FILENO);
+ close (tempfd);
+ dup2_or_die (pipefds[1], STDOUT_FILENO);
+ close (pipefds[1]);
+
+ if (execlp (compress_program, compress_program,
+ "-d", (char *) NULL) < 0)
+ error (SORT_FAILURE, errno, _("couldn't execute %s -d"),
+ compress_program);
+ }
+ else
+ error (SORT_FAILURE, errno, _("couldn't create process for %s -d"),
+ compress_program);
+
+ fp = fdopen (pipefds[0], "r");
+ if (! fp)
+ die (_("couldn't create temporary file"), name);
+
+ return fp;
+}
+
+static void
write_bytes (const char *buf, size_t n_bytes, FILE *fp, const char *output_file)
{
if (fwrite (buf, 1, n_bytes, fp) != n_bytes)
struct tempnode *volatile *pnode;
struct tempnode *node;
struct tempnode *next;
- sigset_t oldset;
int unlink_status;
int unlink_errno = 0;
+ struct cs_status cs;
for (pnode = &temphead; (node = *pnode)->name != name; pnode = &node->next)
continue;
/* Unlink the temporary file in a critical section to avoid races. */
next = node->next;
- sigprocmask (SIG_BLOCK, &caught_signals, &oldset);
+ cs = cs_enter ();
unlink_status = unlink (name);
unlink_errno = errno;
*pnode = next;
- sigprocmask (SIG_SETMASK, &oldset, NULL);
+ cs_leave (cs);
if (unlink_status != 0)
error (0, unlink_errno, _("warning: cannot remove: %s"), name);
file has not been opened yet (or written to, if standard output). */
static void
-mergefps (char **files, size_t ntemps, size_t nfiles,
+mergefps (struct sortfile *files, size_t ntemps, size_t nfiles,
FILE *ofp, char const *output_file)
{
FILE *fps[NMERGE]; /* Input streams for each file. */
/* Read initial lines from each input file. */
for (i = 0; i < nfiles; )
{
- fps[i] = xfopen (files[i], "r");
+ fps[i] = (files[i].pid
+ ? open_temp (files[i].name, files[i].pid)
+ : xfopen (files[i].name, "r"));
initbuf (&buffer[i], sizeof (struct line),
MAX (merge_buffer_size, sort_size / nfiles));
- if (fillbuf (&buffer[i], fps[i], files[i]))
+ if (fillbuf (&buffer[i], fps[i], files[i].name))
{
struct line const *linelim = buffer_linelim (&buffer[i]);
cur[i] = linelim - 1;
else
{
/* fps[i] is empty; eliminate it from future consideration. */
- xfclose (fps[i], files[i]);
+ xfclose (fps[i], files[i].name);
if (i < ntemps)
{
ntemps--;
- zaptemp (files[i]);
+ zaptemp (files[i].name);
}
free (buffer[i].buf);
--nfiles;
cur[ord[0]] = smallest - 1;
else
{
- if (fillbuf (&buffer[ord[0]], fps[ord[0]], files[ord[0]]))
+ if (fillbuf (&buffer[ord[0]], fps[ord[0]], files[ord[0]].name))
{
struct line const *linelim = buffer_linelim (&buffer[ord[0]]);
cur[ord[0]] = linelim - 1;
if (ord[i] > ord[0])
--ord[i];
--nfiles;
- xfclose (fps[ord[0]], files[ord[0]]);
+ xfclose (fps[ord[0]], files[ord[0]].name);
if (ord[0] < ntemps)
{
ntemps--;
- zaptemp (files[ord[0]]);
+ zaptemp (files[ord[0]].name);
}
free (buffer[ord[0]].buf);
for (i = ord[0]; i < nfiles; ++i)
ord[j] = ord[j + 1];
ord[count_of_smaller_lines] = ord0;
}
+
+ /* Free up some resources every once in a while. */
+ if (MAX_PROCS_BEFORE_REAP < nprocs)
+ reap_some ();
}
if (unique && savedline)
common cases. */
static size_t
-avoid_trashing_input (char **files, size_t ntemps, size_t nfiles,
- char const *outfile)
+avoid_trashing_input (struct sortfile *files, size_t ntemps,
+ size_t nfiles, char const *outfile)
{
size_t i;
bool got_outstat = false;
for (i = ntemps; i < nfiles; i++)
{
- bool is_stdin = STREQ (files[i], "-");
+ bool is_stdin = STREQ (files[i].name, "-");
bool same;
struct stat instat;
- if (outfile && STREQ (outfile, files[i]) && !is_stdin)
+ if (outfile && STREQ (outfile, files[i].name) && !is_stdin)
same = true;
else
{
same = (((is_stdin
? fstat (STDIN_FILENO, &instat)
- : stat (files[i], &instat))
+ : stat (files[i].name, &instat))
== 0)
&& SAME_INODE (instat, outstat));
}
if (same)
{
FILE *tftp;
- char *temp = create_temp_file (&tftp);
- mergefps (&files[i], 0, nfiles - i, tftp, temp);
- files[i] = temp;
+ pid_t pid;
+ char *temp = create_temp (&tftp, &pid);
+ mergefps (&files[i],0, nfiles - i, tftp, temp);
+ files[i].name = temp;
+ files[i].pid = pid;
return i + 1;
}
}
OUTPUT_FILE; a null OUTPUT_FILE stands for standard output. */
static void
-merge (char **files, size_t ntemps, size_t nfiles, char const *output_file)
+merge (struct sortfile *files, size_t ntemps, size_t nfiles,
+ char const *output_file)
{
while (NMERGE < nfiles)
{
for (out = in = 0; out < nfiles / NMERGE; out++, in += NMERGE)
{
FILE *tfp;
- char *temp = create_temp_file (&tfp);
+ pid_t pid;
+ char *temp = create_temp (&tfp, &pid);
size_t nt = MIN (ntemps, NMERGE);
ntemps -= nt;
mergefps (&files[in], nt, NMERGE, tfp, temp);
- files[out] = temp;
+ files[out].name = temp;
+ files[out].pid = pid;
}
remainder = nfiles - in;
files as possible, to avoid needless I/O. */
size_t nshortmerge = remainder - cheap_slots + 1;
FILE *tfp;
- char *temp = create_temp_file (&tfp);
+ pid_t pid;
+ char *temp = create_temp (&tfp, &pid);
size_t nt = MIN (ntemps, nshortmerge);
ntemps -= nt;
mergefps (&files[in], nt, nshortmerge, tfp, temp);
- files[out++] = temp;
+ files[out].name = temp;
+ files[out++].pid = pid;
in += nshortmerge;
}
else
{
++ntemps;
- temp_output = create_temp_file (&tfp);
+ temp_output = create_temp (&tfp, NULL);
}
do
xfclose (tfp, temp_output);
+ /* Free up some resources every once in a while. */
+ if (MAX_PROCS_BEFORE_REAP < nprocs)
+ reap_some ();
+
if (output_file_created)
goto finish;
}
{
size_t i;
struct tempnode *node = temphead;
- char **tempfiles = xnmalloc (ntemps, sizeof *tempfiles);
+ struct sortfile *tempfiles = xnmalloc (ntemps, sizeof *tempfiles);
for (i = 0; node; i++)
{
- tempfiles[i] = node->name;
+ tempfiles[i].name = node->name;
+ tempfiles[i].pid = node->pid;
node = node->next;
}
merge (tempfiles, ntemps, ntemps, output_file);
}
if (mergeonly)
- merge (files, 0, nfiles, outfile);
+ {
+ struct sortfile *sortfiles = xcalloc (nfiles, sizeof *sortfiles);
+ size_t i;
+
+ for (i = 0; i < nfiles; ++i)
+ sortfiles[i].name = files[i];
+
+ merge (sortfiles, 0, nfiles, outfile);
+ }
else
sort (files, nfiles, outfile);