sort: parallelize internal sort
authorChen Guo <chenguo4@yahoo.com>
Fri, 9 Jul 2010 07:03:50 +0000 (08:03 +0100)
committerPádraig Brady <P@draigBrady.com>
Tue, 13 Jul 2010 00:44:46 +0000 (01:44 +0100)
This patch is by Gene Auyeung, Chris Dickens, Chen Guo, and Mike
Nichols, based off of a patch by Paul Eggert, Glen Lenker, et. al.,
with a basic heap implementation based off of the GDSL heap,
originally by Nicolas Darnis.

The number of sorts done in parallel is limited to the number
of available processors by default, or can be further restricted
with the --parallel option.

On a dual-die, 8 core Intel Xeon, results show sorting with
8 threads is almost 4 times faster than using a single thread.
Timings when sorting a 96MB file:
THREADS     TIME (s)
1            5.10
2            2.87
4            1.75
8            1.31

Single threaded sorting has also been improved,
especially for cheaper comparison operations:
COMMAND             BEFORE (s)  AFTER (s)
sort                 8.822       8.716
sort -g             10.336      10.222
sort -n              3.077       2.961
LANG=C sort          2.169       2.066

* bootstrap.conf: Add heap, pthread.
* coreutils.texi (sort): Describe the new --parallel option.
* gl/lib/heap.c: New file. Very basic heap implementation.
* gl/lib/heap.h: New file.
* gl/modules/heap: New file.
* src/Makefile.am: Add LIB_PTHREAD.
* src/sort.c: Include heap.h, nproc.h, pthread.h.
(MAX_MERGE): New macro.
(SUBTHREAD_LINES_HEURISTIC, PARALLEL_OPTION): New constants.
(MERGE_END, MERGE_ROOT): New constants.
(struct merge_node): New struct.
(struct merge_node_queue): New struct.
(sortlines temp): Remove declaration.
(usage, long_options, main): New option, --parallel.
(specify_nthreads): New function.
(mergelines): New signature, to emphasize the fact that the HI area
must be part of the destination.  All callers changed.
(sequential_sort): New function, renamed from sortlines. Merge in
the functionality of sortlines_temp.
(compare_nodes): New function.
(lock_node, unlock_node): New functions.
(queue_destroy): New function.
(queue_init): New function.
(queue_insert): New function.
(queue_pop): New function.
(write_unique): New function.
(mergelines_node): New function.
(check_insert): New function.
(update_parent): New function.
(merge_loop): New function.
(sortlines): Rewrite to support and use parallelism, with a new
signature. All callers changed.
(struct thread_args): New struct.
(sortlines_thread): New function.
(sortlines_temp): Remove.
(sort): New argument NTHREADS. All uses changed. Output moved to
mergelines_node.
(main): disable threading if we are sorting at random.
* tests/Makefile.am (TESTS): Add misc/sort-benchmark-random.
* tests/misc/sort-benchmark-random: New file.

Signed-off-by: Pádraig Brady <P@draigBrady.com>
NEWS
bootstrap.conf
doc/coreutils.texi
gl/lib/heap.c [new file with mode: 0644]
gl/lib/heap.h [new file with mode: 0644]
gl/modules/heap [new file with mode: 0644]
src/Makefile.am
src/sort.c
tests/Makefile.am
tests/misc/sort-benchmark-random [new file with mode: 0755]

diff --git a/NEWS b/NEWS
index 82190d9..6ed23fb 100644 (file)
--- a/NEWS
+++ b/NEWS
@@ -30,6 +30,11 @@ GNU coreutils NEWS                                    -*- outline -*-
 
   sort -g now uses long doubles for greater range and precision.
 
+  sort now uses the number of available processors to parallelize
+  the sorting operation.  The number of sorts run concurrently can be
+  limited with the --parallel option or with external process
+  control like taskset for example.
+
   stat no longer accepts the --context (-Z) option.  Initially it was
   merely accepted and ignored, for compatibility.  Starting two years
   ago, with coreutils-7.0, its use evoked a warning.
index 644c18b..fe3974a 100644 (file)
@@ -124,6 +124,7 @@ gnulib_modules="
   hard-locale
   hash
   hash-pjw
+  heap
   host-os
   human
   idcache
@@ -173,6 +174,7 @@ gnulib_modules="
   priv-set
   progname
   propername
+  pthread
   putenv
   quote
   quotearg
index 21cf36d..942978f 100644 (file)
@@ -4068,6 +4068,14 @@ have a large sort or merge that is I/O-bound, you can often improve
 performance by using this option to specify directories on different
 disks and controllers.
 
+@item --parallel=@var{n}
+@opindex --parallel
+@cindex multithreaded sort
+Limit the number of sorts run in parallel to @var{n}. By default,
+@var{n} is set to the number of available processors, and values
+greater than that are reduced to that limit. Also see
+@ref{nproc invocation}.
+
 @item -u
 @itemx --unique
 @opindex -u
@@ -4163,6 +4171,13 @@ sort -n -r
 @end example
 
 @item
+Run no more that 4 sorts concurrently, using a buffer size of 10M.
+
+@example
+sort --parallel=4 -S 10M
+@end example
+
+@item
 Sort alphabetically, omitting the first and second fields
 and the blanks at the start of the third field.
 This uses a single key composed of the characters beginning
diff --git a/gl/lib/heap.c b/gl/lib/heap.c
new file mode 100644 (file)
index 0000000..a37224f
--- /dev/null
@@ -0,0 +1,159 @@
+/* Barebones heap implementation supporting only insert and pop.
+
+   Copyright (C) 2010 Free Software Foundation, Inc.
+
+   This program is free software: you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; either version 3 of the License, or
+   (at your option) any later version.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program.  If not, see <http://www.gnu.org/licenses/>.  */
+
+/* Full implementation: GDSL (http://gna.org/projects/gdsl/) by Nicolas
+   Darnis <ndarnis@free.fr>. */
+
+#include <config.h>
+
+#include "heap.h"
+#include "stdlib--.h"
+#include "xalloc.h"
+
+static int heap_default_compare (const void *, const void *);
+static size_t heapify_down (void **, size_t, size_t,
+                            int (*)(const void *, const void *));
+static void heapify_up (void **, size_t,
+                        int (*)(const void *, const void *));
+
+
+/* Allocate memory for the heap. */
+
+struct heap *
+heap_alloc (int (*compare)(const void *, const void *), size_t n_reserve)
+{
+  struct heap *heap;
+  void *xmalloc_ret = xmalloc (sizeof *heap);
+  heap = (struct heap *) xmalloc_ret;
+  if (!heap)
+    return NULL;
+
+  if (n_reserve <= 0)
+    n_reserve = 1;
+
+  xmalloc_ret = xmalloc (n_reserve * sizeof *(heap->array));
+  heap->array = (void **) xmalloc_ret;
+  if (!heap->array)
+    {
+      free (heap);
+      return NULL;
+    }
+
+  heap->array[0] = NULL;
+  heap->capacity = n_reserve;
+  heap->count = 0;
+  heap->compare = compare ? compare : heap_default_compare;
+
+  return heap;
+}
+
+
+static int
+heap_default_compare (const void *a, const void *b)
+{
+  return 0;
+}
+
+
+void
+heap_free (struct heap *heap)
+{
+  free (heap->array);
+  free (heap);
+}
+
+/* Insert element into heap. */
+
+int
+heap_insert (struct heap *heap, void *item)
+{
+  if (heap->capacity - 1 <= heap->count)
+    {
+      size_t new_size = (2 + heap->count) * sizeof *(heap->array);
+      void *realloc_ret = xrealloc (heap->array, new_size);
+      heap->array = (void **) realloc_ret;
+      heap->capacity = (2 + heap->count);
+
+      if (!heap->array)
+        return -1;
+    }
+
+  heap->array[++heap->count] = item;
+  heapify_up (heap->array, heap->count, heap->compare);
+
+  return 0;
+}
+
+/* Pop top element off heap. */
+
+void *
+heap_remove_top (struct heap *heap)
+{
+  if (heap->count == 0)
+    return NULL;
+
+  void *top = heap->array[1];
+  heap->array[1] = heap->array[heap->count--];
+  heapify_down (heap->array, heap->count, 1, heap->compare);
+
+  return top;
+}
+
+/* Move element down into appropriate position in heap. */
+
+static size_t
+heapify_down (void **array, size_t count, size_t initial,
+              int (*compare)(const void *, const void *))
+{
+  void *element = array[initial];
+
+  size_t parent = initial;
+  while (parent <= count / 2)
+    {
+      size_t child = 2 * parent;
+
+      if (child < count && compare (array[child], array[child+1]) < 0)
+        child++;
+
+      if (compare (array[child], element) <= 0)
+        break;
+
+      array[parent] = array[child];
+      parent = child;
+    }
+
+  array[parent] = element;
+  return parent;
+}
+
+/* Move element up into appropriate position in heap. */
+
+static void
+heapify_up (void **array, size_t count,
+            int (*compare)(const void *, const void *))
+{
+  size_t k = count;
+  void *new_element = array[k];
+
+  while (k != 1 && compare (array[k/2], new_element) <= 0)
+    {
+      array[k] = array[k/2];
+      k /= 2;
+    }
+
+  array[k] = new_element;
+}
diff --git a/gl/lib/heap.h b/gl/lib/heap.h
new file mode 100644 (file)
index 0000000..0ea516a
--- /dev/null
@@ -0,0 +1,34 @@
+/* Barebones heap implementation supporting only insert and pop.
+
+   Copyright (C) 2010 Free Software Foundation, Inc.
+
+   This program is free software: you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; either version 3 of the License, or
+   (at your option) any later version.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program.  If not, see <http://www.gnu.org/licenses/>.  */
+
+/* Full implementation: GDSL (http://gna.org/projects/gdsl/) by Nicolas
+   Darnis <ndarnis@free.fr>. Adapted by Gene Auyeung. */
+
+#include <stddef.h>
+
+struct heap
+{
+  void **array;     /* array[0] is not used */
+  size_t capacity;  /* Array size */
+  size_t count;     /* Used as index to last element. Also is num of items. */
+  int (*compare)(const void *, const void *);
+};
+
+struct heap *heap_alloc (int (*)(const void *, const void *), size_t);
+void heap_free (struct heap *);
+int heap_insert (struct heap *heap, void *item);
+void *heap_remove_top (struct heap *heap);
diff --git a/gl/modules/heap b/gl/modules/heap
new file mode 100644 (file)
index 0000000..cd97e29
--- /dev/null
@@ -0,0 +1,24 @@
+Description:
+Binary heap with minimal number of methods. Used in sort.
+
+Files:
+lib/heap.c
+lib/heap.h
+
+Depends-on:
+stdlib-safer
+xalloc
+
+configure.ac:
+
+Makefile.am:
+lib_SOURCES += heap.c heap.h
+
+Include:
+"heap.h"
+
+License
+GPL
+
+Maintainer:
+Gene Auyeung
index 0630a06..2e0e320 100644 (file)
@@ -397,6 +397,9 @@ uname_LDADD += $(GETHOSTNAME_LIB)
 # for strsignal
 kill_LDADD += $(LIBTHREAD)
 
+# for pthread
+sort_LDADD += $(LIB_PTHREAD)
+
 $(PROGRAMS): ../lib/libcoreutils.a
 
 # Get the release year from ../lib/version-etc.c.
index ff8a97a..5ea1b34 100644 (file)
@@ -23,6 +23,7 @@
 #include <config.h>
 
 #include <getopt.h>
+#include <pthread.h>
 #include <sys/types.h>
 #include <sys/wait.h>
 #include <signal.h>
 #include "filevercmp.h"
 #include "hard-locale.h"
 #include "hash.h"
+#include "heap.h"
 #include "ignore-value.h"
 #include "md5.h"
 #include "mbswidth.h"
+#include "nproc.h"
 #include "physmem.h"
 #include "posixver.h"
 #include "quote.h"
@@ -93,6 +96,18 @@ struct rlimit { size_t rlim_cur; };
 # define DEFAULT_TMPDIR "/tmp"
 #endif
 
+/* Maximum number of lines to merge every time a NODE is taken from
+   the MERGE_QUEUE.  Node is at LEVEL in the binary merge tree,
+   and is responsible for merging TOTAL lines. */
+#define MAX_MERGE(total, level) ((total) / ((2 << level) * (2 << level)) + 1)
+
+/* Heuristic value for the number of lines for which it is worth
+   creating a subthread, during an internal merge sort, on a machine
+   that has processors galore.  Currently this number is just a guess.
+   This value must be at least 4.  We don't know of any machine where
+   this number has any practical effect.  */
+enum { SUBTHREAD_LINES_HEURISTIC = 4 };
+
 /* Exit statuses.  */
 enum
   {
@@ -119,6 +134,15 @@ enum
     MAX_FORK_TRIES_DECOMPRESS = 9
   };
 
+enum
+  {
+    /* Level of the end-of-merge node, one level above the root. */
+    MERGE_END = 0,
+
+    /* Level of the root node in merge tree. */
+    MERGE_ROOT = 1
+  };
+
 /* The representation of the decimal point in the current locale.  */
 static int decimal_point;
 
@@ -196,6 +220,31 @@ struct month
   int val;
 };
 
+/* Binary merge tree node. */
+struct merge_node
+{
+  struct line *lo;              /* Lines to merge from LO child node. */
+  struct line *hi;              /* Lines to merge from HI child ndoe. */
+  struct line *end_lo;          /* End of available lines from LO. */
+  struct line *end_hi;          /* End of available lines from HI. */
+  struct line **dest;           /* Pointer to destination of merge. */
+  size_t nlo;                   /* Total Lines remaining from LO. */
+  size_t nhi;                   /* Total lines remaining from HI. */
+  size_t level;                 /* Level in merge tree. */
+  struct merge_node *parent;    /* Parent node. */
+  bool queued;                  /* Node is already in heap. */
+  pthread_spinlock_t *lock;     /* Lock for node operations. */
+};
+
+/* Priority queue of merge nodes. */
+struct merge_node_queue
+{
+  struct heap *priority_queue;  /* Priority queue of merge tree nodes. */
+  pthread_mutex_t mutex;        /* Lock for queue operations. */
+  pthread_cond_t cond;          /* Conditional wait for empty queue to populate
+                                   when popping. */
+};
+
 /* FIXME: None of these tables work with multibyte character sets.
    Also, there are many other bugs when handling multibyte characters.
    One way to fix this is to rewrite `sort' to use wide characters
@@ -300,8 +349,6 @@ static bool debug;
    number are present, temp files will be used. */
 static unsigned int nmerge = NMERGE_DEFAULT;
 
-static void sortlines_temp (struct line *, size_t, struct line *);
-
 /* Report MESSAGE for FILE, then clean up and exit.
    If FILE is null, it represents standard output.  */
 
@@ -398,6 +445,7 @@ Other options:\n\
   -t, --field-separator=SEP  use SEP instead of non-blank to blank transition\n\
   -T, --temporary-directory=DIR  use DIR for temporaries, not $TMPDIR or %s;\n\
                               multiple options specify multiple directories\n\
+      --parallel=N          limit the number of sorts run concurrently to N\n\
   -u, --unique              with -c, check for strict ordering;\n\
                               without -c, output only the first of an equal run\n\
 "), DEFAULT_TMPDIR);
@@ -442,7 +490,8 @@ enum
   FILES0_FROM_OPTION,
   NMERGE_OPTION,
   RANDOM_SOURCE_OPTION,
-  SORT_OPTION
+  SORT_OPTION,
+  PARALLEL_OPTION
 };
 
 static char const short_options[] = "-bcCdfghik:mMno:rRsS:t:T:uVy:z";
@@ -476,6 +525,7 @@ static struct option const long_options[] =
   {"temporary-directory", required_argument, NULL, 'T'},
   {"unique", no_argument, NULL, 'u'},
   {"zero-terminated", no_argument, NULL, 'z'},
+  {"parallel", required_argument, NULL, PARALLEL_OPTION},
   {GETOPT_HELP_OPTION_DECL},
   {GETOPT_VERSION_OPTION_DECL},
   {NULL, 0, NULL, 0},
@@ -1333,6 +1383,22 @@ specify_sort_size (int oi, char c, char const *s)
   xstrtol_fatal (e, oi, c, long_options, s);
 }
 
+/* Specify the number of threads to spawn during internal sort.  */
+static unsigned long int
+specify_nthreads (int oi, char c, char const *s)
+{
+  unsigned long int nthreads;
+  enum strtol_error e = xstrtoul (s, NULL, 10, &nthreads, "");
+  if (e == LONGINT_OVERFLOW)
+    return ULONG_MAX;
+  if (e != LONGINT_OK)
+    xstrtol_fatal (e, oi, c, long_options, s);
+  if (nthreads == 0)
+    error (SORT_FAILURE, 0, _("number in parallel must be nonzero"));
+  return nthreads;
+}
+
+
 /* Return the default sort size.  */
 static size_t
 default_sort_size (void)
@@ -2951,25 +3017,28 @@ mergefiles (struct sortfile *files, size_t ntemps, size_t nfiles,
   return nopened;
 }
 
-/* Merge into T the two sorted arrays of lines LO (with NLO members)
-   and HI (with NHI members).  T, LO, and HI point just past their
-   respective arrays, and the arrays are in reverse order.  NLO and
-   NHI must be positive, and HI - NHI must equal T - (NLO + NHI).  */
+/* Merge into T (of size NLINES) the two sorted arrays of lines
+   LO (with NLINES / 2 members), and
+   T - (NLINES / 2) (with NLINES - NLINES / 2 members).
+   T and LO point just past their respective arrays, and the arrays
+   are in reverse order.  NLINES must be at least 2.  */
 
 static inline void
-mergelines (struct line *t,
-            struct line const *lo, size_t nlo,
-            struct line const *hi, size_t nhi)
+mergelines (struct line *restrict t, size_t nlines,
+            struct line const *restrict lo)
 {
+  size_t nlo = nlines / 2;
+  size_t nhi = nlines - nlo;
+  struct line *hi = t - nlo;
+
   while (true)
     if (compare (lo - 1, hi - 1, false) <= 0)
       {
         *--t = *--lo;
         if (! --nlo)
           {
-            /* HI - NHI equalled T - (NLO + NHI) when this function
-               began.  Therefore HI must equal T now, and there is no
-               need to copy from HI to T.  */
+            /* HI must equal T now, and there is no need to copy from
+               HI to T. */
             return;
           }
       }
@@ -3000,15 +3069,25 @@ mergelines (struct line *t,
    D. A. Bell, Comp J. 1 (1958), 75.  */
 
 static void
-sortlines (struct line *lines, size_t nlines, struct line *temp)
+sequential_sort (struct line *restrict lines, size_t nlines,
+                 struct line *restrict temp, bool to_temp)
 {
   if (nlines == 2)
     {
-      if (0 < compare (&lines[-1], &lines[-2], false))
+      /* Declare `swap' as int, not bool, to work around a bug
+         <http://lists.gnu.org/archive/html/bug-coreutils/2005-10/msg00086.html>
+         in the IBM xlc 6.0.0.0 compiler in 64-bit mode.  */
+      int swap = (0 < compare (&lines[-1], &lines[-2], false));
+      if (to_temp)
         {
-          struct line tmp = lines[-1];
+          temp[-1] = lines[-1 - swap];
+          temp[-2] = lines[-2 + swap];
+        }
+      else if (swap)
+        {
+          temp[-1] = lines[-1];
           lines[-1] = lines[-2];
-          lines[-2] = tmp;
+          lines[-2] = temp[-1];
         }
     }
   else
@@ -3017,46 +3096,386 @@ sortlines (struct line *lines, size_t nlines, struct line *temp)
       size_t nhi = nlines - nlo;
       struct line *lo = lines;
       struct line *hi = lines - nlo;
-      struct line *sorted_lo = temp;
 
-      sortlines (hi, nhi, temp);
+      sequential_sort (hi, nhi, temp - (to_temp ? nlo : 0), to_temp);
       if (1 < nlo)
-        sortlines_temp (lo, nlo, sorted_lo);
+        sequential_sort (lo, nlo, temp, !to_temp);
+      else if (!to_temp)
+        temp[-1] = lo[-1];
+
+      struct line *dest;
+      struct line const *sorted_lo;
+      if (to_temp)
+        {
+          dest = temp;
+          sorted_lo = lines;
+        }
       else
-        sorted_lo[-1] = lo[-1];
+        {
+          dest = lines;
+          sorted_lo = temp;
+        }
+      mergelines (dest, nlines, sorted_lo);
+    }
+}
+
+/* Compare two NODEs for priority. The NODE with the higher (numerically
+   lower) level has priority. If tie, the NODE with the most remaining
+   lines has priority. */
+
+static int
+compare_nodes (const void *a, const void *b)
+{
+  const struct merge_node *nodea = (const struct merge_node *) a;
+  const struct merge_node *nodeb = (const struct merge_node *) b;
+  if (nodea->level == nodeb->level)
+      return (nodea->nlo + nodea->nhi) < (nodeb->nlo + nodeb->nhi);
+  return nodea->level < nodeb->level;
+}
+
+/* Lock a merge tree NODE.
+   Note spin locks were seen to perform better than mutexes
+   as long as the number of threads is limited to the
+   number of processors.  */
 
-      mergelines (lines, sorted_lo, nlo, hi, nhi);
+static inline void
+lock_node (struct merge_node *const restrict node)
+{
+  pthread_spin_lock (node->lock);
+}
+
+/* Unlock a merge tree NODE. */
+
+static inline void
+unlock_node (struct merge_node *const restrict node)
+{
+  pthread_spin_unlock (node->lock);
+}
+
+/* Destroy merge QUEUE. */
+
+static inline void
+queue_destroy (struct merge_node_queue *const restrict queue)
+{
+  heap_free (queue->priority_queue);
+  pthread_cond_destroy (&queue->cond);
+  pthread_mutex_destroy (&queue->mutex);
+}
+
+/* Initialize merge QUEUE, allocating space for a maximum of RESERVE nodes.
+   Though it's highly unlikely all nodes are in the heap at the same time,
+   RESERVE should accommodate all of them. Counting a NULL dummy head for the
+   heap, RESERVE should be 2 * NTHREADS. */
+
+static inline void
+queue_init (struct merge_node_queue *const restrict queue, size_t reserve)
+{
+  queue->priority_queue = (struct heap *) heap_alloc (compare_nodes, reserve);
+  pthread_mutex_init (&queue->mutex, NULL);
+  pthread_cond_init (&queue->cond, NULL);
+}
+
+/* Insert NODE into priority QUEUE. Assume caller either holds lock on NODE
+   or does not need to lock NODE. */
+
+static inline void
+queue_insert (struct merge_node_queue *const restrict queue,
+              struct merge_node *const restrict node)
+{
+  pthread_mutex_lock (&queue->mutex);
+  heap_insert (queue->priority_queue, node);
+  node->queued = true;
+  pthread_mutex_unlock (&queue->mutex);
+  pthread_cond_signal (&queue->cond);
+}
+
+/* Pop NODE off priority QUEUE. Guarantee a non-null, spinlocked NODE. */
+
+static inline struct merge_node *
+queue_pop (struct merge_node_queue *const restrict queue)
+{
+  struct merge_node *node = NULL;
+
+  while (!node)
+    {
+      pthread_mutex_lock (&queue->mutex);
+      if (queue->priority_queue->count)
+        node = (struct merge_node *) heap_remove_top (queue->priority_queue);
+      else
+        {
+          /* Go into conditional wait if no NODE is immediately available.  */
+          pthread_cond_wait (&queue->cond, &queue->mutex);
+        }
+      pthread_mutex_unlock (&queue->mutex);
     }
+  lock_node (node);
+  node->queued = false;
+  return node;
 }
 
-/* Like sortlines (LINES, NLINES, TEMP), except output into TEMP
-   rather than sorting in place.  */
+/* If UNQIUE is set, checks to make sure line isn't a duplicate before
+   outputting. If UNIQUE is not set, output the passed in line. Note that
+   this function does not actually save the line, nor any key information,
+   thus is only appropriate for internal sort. */
 
-static void
-sortlines_temp (struct line *lines, size_t nlines, struct line *temp)
+static inline void
+write_unique (struct line *const restrict line, FILE *tfp,
+              const char *temp_output)
 {
-  if (nlines == 2)
+  static struct line *saved = NULL;
+
+  if (!unique)
+    write_bytes (line, tfp, temp_output);
+  else if (!saved || compare (line, saved, false))
     {
-      /* Declare `swap' as int, not bool, to work around a bug
-         <http://lists.gnu.org/archive/html/bug-coreutils/2005-10/msg00086.html>
-         in the IBM xlc 6.0.0.0 compiler in 64-bit mode.  */
-      int swap = (0 < compare (&lines[-1], &lines[-2], false));
-      temp[-1] = lines[-1 - swap];
-      temp[-2] = lines[-2 + swap];
+      saved = line;
+      write_bytes (line, tfp, temp_output);
+    }
+}
+
+/* Merge the lines currently available to a NODE in the binary
+   merge tree, up to a maximum specified by MAX_MERGE. */
+
+static inline size_t
+mergelines_node (struct merge_node *const restrict node, size_t total_lines,
+                 FILE *tfp, const char *temp_output)
+{
+  struct line *lo_orig = node->lo;
+  struct line *hi_orig = node->hi;
+  size_t to_merge = MAX_MERGE (total_lines, node->level);
+  size_t merged_lo;
+  size_t merged_hi;
+
+  if (node->level > MERGE_ROOT)
+    {
+      /* Merge to destination buffer. */
+      struct line *dest = *node->dest;
+      while (node->lo != node->end_lo && node->hi != node->end_hi && to_merge--)
+        if (compare (node->lo - 1, node->hi - 1, false) <= 0)
+          *--dest = *--node->lo;
+        else
+          *--dest = *--node->hi;
+
+      merged_lo = lo_orig - node->lo;
+      merged_hi = hi_orig - node->hi;
+
+      if (node->nhi == merged_hi)
+        while (node->lo != node->end_lo && to_merge--)
+          *--dest = *--node->lo;
+      else if (node->nlo == merged_lo)
+        while (node->hi != node->end_hi && to_merge--)
+          *--dest = *--node->hi;
     }
   else
     {
-      size_t nlo = nlines / 2;
-      size_t nhi = nlines - nlo;
-      struct line *lo = lines;
-      struct line *hi = lines - nlo;
-      struct line *sorted_hi = temp - nlo;
+      /* Merge directly to output. */
+      while (node->lo != node->end_lo && node->hi != node->end_hi && to_merge--)
+        {
+          if (compare (node->lo - 1, node->hi - 1, false) <= 0)
+            write_unique (--node->lo, tfp, temp_output);
+          else
+            write_unique (--node->hi, tfp, temp_output);
+        }
+
+      merged_lo = lo_orig - node->lo;
+      merged_hi = hi_orig - node->hi;
+
+      if (node->nhi == merged_hi)
+        {
+          while (node->lo != node->end_lo && to_merge--)
+            write_unique (--node->lo, tfp, temp_output);
+        }
+      else if (node->nlo == merged_lo)
+        {
+          while (node->hi != node->end_hi && to_merge--)
+            write_unique (--node->hi, tfp, temp_output);
+        }
+      node->dest -= lo_orig - node->lo + hi_orig - node->hi;
+    }
+
+  /* Update NODE. */
+  merged_lo = lo_orig - node->lo;
+  merged_hi = hi_orig - node->hi;
+  node->nlo -= merged_lo;
+  node->nhi -= merged_hi;
+  return merged_lo + merged_hi;
+}
+
+/* Insert NODE into QUEUE if it passes insertion checks. */
+
+static inline void
+check_insert (struct merge_node *node,
+              struct merge_node_queue *const restrict queue)
+{
+  size_t lo_avail = node->lo - node->end_lo;
+  size_t hi_avail = node->hi - node->end_hi;
+
+  /* Conditions for insertion:
+     1. NODE is not already in heap.
+     2. NODE has available lines from both it's children, OR one child has
+          available lines, but the other has exhausted all its lines. */
+  if ((!node->queued)
+      && ((lo_avail && (hi_avail || !(node->nhi)))
+          || (hi_avail && !(node->nlo))))
+    {
+      queue_insert (queue, node);
+    }
+}
+
+/* Update parent merge tree NODE. */
+
+static inline void
+update_parent (struct merge_node *const restrict node, size_t merged,
+               struct merge_node_queue *const restrict queue)
+{
+  if (node->level > MERGE_ROOT)
+    {
+      lock_node (node->parent);
+      *node->dest -= merged;
+      check_insert (node->parent, queue);
+      unlock_node (node->parent);
+    }
+  else if (node->nlo + node->nhi == 0)
+    {
+      /* If the MERGE_ROOT NODE has finished merging, insert the
+         MERGE_END node.  */
+      queue_insert (queue, node->parent);
+    }
+}
+
+/* Repeatedly pop QUEUE for a NODE with lines to merge, and merge at least
+   some of those lines, until the MERGE_END node is popped. */
+
+static void
+merge_loop (struct merge_node_queue *const restrict queue,
+            const size_t total_lines, FILE *tfp, const char *temp_output)
+{
+  while (1)
+    {
+      struct merge_node *node = queue_pop (queue);
+
+      if (node->level == MERGE_END)
+        {
+          unlock_node (node);
+          /* Reinsert so other threads can pop it. */
+          queue_insert (queue, node);
+          break;
+        }
+      size_t merged_lines = mergelines_node (node, total_lines, tfp,
+                                             temp_output);
+      check_insert (node, queue);
+      update_parent (node, merged_lines, queue);
+
+      unlock_node (node);
+    }
+}
+
+
+static void sortlines (struct line *restrict, struct line *restrict,
+                       unsigned long int, const size_t,
+                       struct merge_node *const restrict, bool,
+                       struct merge_node_queue *const restrict,
+                       FILE *, const char *);
+
+/* Thread arguments for sortlines_thread. */
+
+struct thread_args
+{
+  struct line *lines;
+  struct line *dest;
+  unsigned long int nthreads;
+  const size_t total_lines;
+  struct merge_node *const restrict parent;
+  bool lo_child;
+  struct merge_node_queue *const restrict merge_queue;
+  FILE *tfp;
+  const char *output_temp;
+};
+
+/* Like sortlines, except with a signature acceptable to pthread_create.  */
 
-      sortlines_temp (hi, nhi, sorted_hi);
+static void *
+sortlines_thread (void *data)
+{
+  struct thread_args const *args = data;
+  sortlines (args->lines, args->dest, args->nthreads, args->total_lines,
+             args->parent, args->lo_child, args->merge_queue,
+             args->tfp, args->output_temp);
+  return NULL;
+}
+
+/* There are three phases to the algorithm: node creation, sequential sort,
+   and binary merge.
+
+   During node creation, sortlines recursively visits each node in the
+   binary merge tree and creates a NODE structure corresponding to all the
+   future line merging NODE is responsible for. For each call to
+   sortlines, half the available threads are assigned to each recursive
+   call, until a leaf node having only 1 available thread is reached.
+
+   Each leaf node then performs two sequential sorts, one on each half of
+   the lines it is responsible for. It records in its NODE structure that
+   there are two sorted sublists available to merge from, and inserts its
+   NODE into the priority queue.
+
+   The binary merge phase then begins. Each thread drops into a loop
+   where the thread retrieves a NODE from the priority queue, merges lines
+   available to that NODE, and potentially insert NODE or its parent back
+   into the queue if there are sufficient available lines for them to
+   merge. This continues until all lines at all nodes of the merge tree
+   have been merged. */
+
+static void
+sortlines (struct line *restrict lines, struct line *restrict dest,
+           unsigned long int nthreads, const size_t total_lines,
+           struct merge_node *const restrict parent, bool lo_child,
+           struct merge_node_queue *const restrict merge_queue,
+           FILE *tfp, const char *temp_output)
+{
+  /* Create merge tree NODE. */
+  size_t nlines = (lo_child)? parent->nlo : parent->nhi;
+  size_t nlo = nlines / 2;
+  size_t nhi = nlines - nlo;
+  struct line *lo = dest - total_lines;
+  struct line *hi = lo - nlo;
+  struct line **parent_end = (lo_child)? &parent->end_lo : &parent->end_hi;
+  pthread_spinlock_t lock;
+  pthread_spin_init (&lock, PTHREAD_PROCESS_PRIVATE);
+  struct merge_node node = {lo, hi, lo, hi, parent_end, nlo, nhi,
+                            parent->level + 1, parent, false, &lock};
+
+  /* Calculate thread arguments. */
+  unsigned long int lo_threads = nthreads / 2;
+  unsigned long int hi_threads = nthreads - lo_threads;
+  pthread_t thread;
+  struct thread_args args = {lines, lo, lo_threads, total_lines, &node,
+                             true, merge_queue, tfp, temp_output};
+
+  if (nthreads > 1 && SUBTHREAD_LINES_HEURISTIC <= nlines
+      && pthread_create (&thread, NULL, sortlines_thread, &args) == 0)
+    {
+      sortlines (lines - nlo, hi, hi_threads, total_lines, &node, false,
+                 merge_queue, tfp, temp_output);
+      pthread_join (thread, NULL);
+    }
+  else
+    {
+      /* Nthreads = 1, this is a leaf NODE, or pthread_create failed.
+         Sort with 1 thread. */
+      struct line *temp = lines - total_lines;
+      if (1 < nhi)
+        sequential_sort (lines - nlo, nhi, temp - nlo / 2, false);
       if (1 < nlo)
-        sortlines (lo, nlo, temp);
+        sequential_sort (lines, nlo, temp, false);
 
-      mergelines (temp, lo, nlo, sorted_hi, nhi);
+      /* Update merge NODE. No need to lock yet. */
+      node.lo = lines;
+      node.hi = lines - nlo;
+      node.end_lo = lines - nlo;
+      node.end_hi = lines - nlo - nhi;
+
+      queue_insert (merge_queue, &node);
+      merge_loop (merge_queue, total_lines, tfp, temp_output);
     }
 }
 
@@ -3262,7 +3681,8 @@ merge (struct sortfile *files, size_t ntemps, size_t nfiles,
 /* Sort NFILES FILES onto OUTPUT_FILE. */
 
 static void
-sort (char * const *files, size_t nfiles, char const *output_file)
+sort (char * const *files, size_t nfiles, char const *output_file,
+      unsigned long int nthreads)
 {
   struct buffer buf;
   size_t ntemps = 0;
@@ -3276,8 +3696,22 @@ sort (char * const *files, size_t nfiles, char const *output_file)
       char const *file = *files;
       FILE *fp = xfopen (file, "r");
       FILE *tfp;
-      size_t bytes_per_line = (2 * sizeof (struct line)
-                               - sizeof (struct line) / 2);
+
+      size_t bytes_per_line;
+      if (nthreads > 1)
+        {
+          /* Get log P. */
+          unsigned long int tmp = 1;
+          size_t mult = 1;
+          while (tmp < nthreads)
+            {
+              tmp *= 2;
+              mult++;
+            }
+          bytes_per_line = (mult * sizeof (struct line));
+        }
+      else
+        bytes_per_line = sizeof (struct line) * 3 / 2;
 
       if (! buf.alloc)
         initbuf (&buf, bytes_per_line,
@@ -3289,7 +3723,6 @@ sort (char * const *files, size_t nfiles, char const *output_file)
       while (fillbuf (&buf, fp, file))
         {
           struct line *line;
-          struct line *linebase;
 
           if (buf.eof && nfiles
               && (bytes_per_line + 1
@@ -3303,9 +3736,6 @@ sort (char * const *files, size_t nfiles, char const *output_file)
             }
 
           line = buffer_linelim (&buf);
-          linebase = line - buf.nlines;
-          if (1 < buf.nlines)
-            sortlines (line, buf.nlines, linebase);
           if (buf.eof && !nfiles && !ntemps && !buf.left)
             {
               xfclose (fp, file);
@@ -3318,16 +3748,23 @@ sort (char * const *files, size_t nfiles, char const *output_file)
               ++ntemps;
               temp_output = create_temp (&tfp, NULL);
             }
-
-          do
+          if (1 < buf.nlines)
             {
-              line--;
-              write_bytes (line, tfp, temp_output);
-              if (unique)
-                while (linebase < line && compare (line, line - 1, false) == 0)
-                  line--;
+              struct merge_node_queue merge_queue;
+              queue_init (&merge_queue, 2 * nthreads);
+
+              pthread_spinlock_t lock;
+              pthread_spin_init (&lock, PTHREAD_PROCESS_PRIVATE);
+              struct merge_node node =
+                {NULL, NULL, NULL, NULL, NULL, buf.nlines,
+                 buf.nlines, MERGE_END, NULL, false, &lock};
+
+              sortlines (line, line, nthreads, buf.nlines, &node, true,
+                         &merge_queue, tfp, temp_output);
+              queue_destroy (&merge_queue);
             }
-          while (linebase < line);
+          else
+            write_unique (line - 1, tfp, temp_output);
 
           xfclose (tfp, temp_output);
 
@@ -3547,6 +3984,7 @@ main (int argc, char **argv)
   bool mergeonly = false;
   char *random_source = NULL;
   bool need_random = false;
+  unsigned long int nthreads = 0;
   size_t nfiles = 0;
   bool posixly_correct = (getenv ("POSIXLY_CORRECT") != NULL);
   bool obsolete_usage = (posix2_version () < 200112);
@@ -3882,6 +4320,10 @@ main (int argc, char **argv)
           add_temp_dir (optarg);
           break;
 
+        case PARALLEL_OPTION:
+          nthreads = specify_nthreads (oi, c, optarg);
+          break;
+
         case 'u':
           unique = true;
           break;
@@ -4030,6 +4472,9 @@ main (int argc, char **argv)
 
   if (need_random)
     {
+      /* Threading does not lock the randread_source structure, so
+         downgrade to one thread to avoid race conditions. */
+      nthreads = 1;
       randread_source = randread_new (random_source, MD5_DIGEST_SIZE);
       if (! randread_source)
         die (_("open failed"), random_source);
@@ -4084,7 +4529,15 @@ main (int argc, char **argv)
       IF_LINT (free (sortfiles));
     }
   else
-    sort (files, nfiles, outfile);
+    {
+      /* If NTHREADS > number of cores on the machine, spinlocking
+         could be wasteful.  */
+      unsigned long int np2 = num_processors (NPROC_CURRENT_OVERRIDABLE);
+      if (!nthreads || nthreads > np2)
+        nthreads = np2;
+
+      sort (files, nfiles, outfile, nthreads);
+    }
 
   if (have_read_stdin && fclose (stdin) == EOF)
     die (_("close failed"), "-");
index a993e82..fccd000 100644 (file)
@@ -223,6 +223,7 @@ TESTS =                                             \
   misc/shred-remove                            \
   misc/shuf                                    \
   misc/sort                                    \
+  misc/sort-benchmark-random                   \
   misc/sort-compress                           \
   misc/sort-continue                           \
   misc/sort-debug-keys                         \
diff --git a/tests/misc/sort-benchmark-random b/tests/misc/sort-benchmark-random
new file mode 100755 (executable)
index 0000000..3325381
--- /dev/null
@@ -0,0 +1,52 @@
+#!/bin/sh
+# Benchmark sort on randomly generated data.
+
+# Copyright (C) 2010 Free Software Foundation, Inc.
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+# Written by Glen Lenker.
+
+. "${srcdir=.}/init.sh"; path_prepend_ ../src
+
+very_expensive_
+
+perl -e '
+my $num_lines = 500000;
+my $length = 100;
+
+for (my $i=0; $i < $num_lines; $i++)
+{
+    for (my $j=0; $j < $length; $j++)
+    {
+      printf "%c", 32 + rand(94);
+    }
+    print "\n";
+}' > in || framework_failure
+
+# We need to generate a lot of data for sort to show a noticeable
+# improvement in performance. Sorting it in PERL may take awhile.
+
+perl -e '
+open (FILE, "<in");
+my @list = <FILE>;
+print sort(@list);
+close (FILE);
+' > exp || framework_failure
+
+time sort in > out || fail=1
+
+compare out exp || fail=1
+
+Exit $fail