New function to sort tasks pushed into a threadpool. (#324479, Martyn
authorMatthias Clasen <mclasen@redhat.com>
Wed, 21 Dec 2005 04:45:56 +0000 (04:45 +0000)
committerMatthias Clasen <matthiasc@src.gnome.org>
Wed, 21 Dec 2005 04:45:56 +0000 (04:45 +0000)
2005-12-20  Matthias Clasen  <mclasen@redhat.com>

* glib/glib.symbols:
* glib/gthreadpool.h:
* glib/gthreadpool.c (g_thread_pool_set_sort_function): New function
to sort tasks pushed into a threadpool.  (#324479, Martyn Russell)

* tests/threadpool-test.c: Test this.

ChangeLog
ChangeLog.pre-2-10
ChangeLog.pre-2-12
docs/reference/ChangeLog
docs/reference/glib/glib-sections.txt
glib/glib.symbols
glib/gthreadpool.c
glib/gthreadpool.h
tests/threadpool-test.c

index a17931884b426fd41b3456b6445ed42a9f07e225..cb65ab53899056cca4e3fb8bcaf11c10b784255e 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,12 @@
+2005-12-20  Matthias Clasen  <mclasen@redhat.com>
+
+       * glib/glib.symbols: 
+       * glib/gthreadpool.h: 
+       * glib/gthreadpool.c (g_thread_pool_set_sort_function): New function
+       to sort tasks pushed into a threadpool.  (#324479, Martyn Russell)
+
+       * tests/threadpool-test.c: Test this.
+
 Tue Dec 20 18:14:14 2005  Tim Janik  <timj@imendio.com>
 
        * glib/gslice.[hc]: added mem_error() and mem_assert() to test and 
index a17931884b426fd41b3456b6445ed42a9f07e225..cb65ab53899056cca4e3fb8bcaf11c10b784255e 100644 (file)
@@ -1,3 +1,12 @@
+2005-12-20  Matthias Clasen  <mclasen@redhat.com>
+
+       * glib/glib.symbols: 
+       * glib/gthreadpool.h: 
+       * glib/gthreadpool.c (g_thread_pool_set_sort_function): New function
+       to sort tasks pushed into a threadpool.  (#324479, Martyn Russell)
+
+       * tests/threadpool-test.c: Test this.
+
 Tue Dec 20 18:14:14 2005  Tim Janik  <timj@imendio.com>
 
        * glib/gslice.[hc]: added mem_error() and mem_assert() to test and 
index a17931884b426fd41b3456b6445ed42a9f07e225..cb65ab53899056cca4e3fb8bcaf11c10b784255e 100644 (file)
@@ -1,3 +1,12 @@
+2005-12-20  Matthias Clasen  <mclasen@redhat.com>
+
+       * glib/glib.symbols: 
+       * glib/gthreadpool.h: 
+       * glib/gthreadpool.c (g_thread_pool_set_sort_function): New function
+       to sort tasks pushed into a threadpool.  (#324479, Martyn Russell)
+
+       * tests/threadpool-test.c: Test this.
+
 Tue Dec 20 18:14:14 2005  Tim Janik  <timj@imendio.com>
 
        * glib/gslice.[hc]: added mem_error() and mem_assert() to test and 
index 48085407c1dd759371635d1b8d5b81aaaf4cbaa3..8c555fa9811aa7a28eb7f53176fd6fd69e41a43a 100644 (file)
@@ -1,3 +1,7 @@
+2005-12-20  Matthias Clasen  <mclasen@redhat.com>
+
+       * glib/glib-sections.txt: add g_thread_pool_set_sort_function.
+       
 2005-12-19  Matthias Clasen  <mclasen@redhat.com>
 
        * glib/tmpl/*.sgml: Update versioned deprecation
index 05e15dcc24a52d630102ef9fda351b89839fac99..b2c27850f2f368d976d55abd3845c89cacae56b9 100644 (file)
@@ -637,6 +637,7 @@ g_thread_pool_set_max_unused_threads
 g_thread_pool_get_max_unused_threads
 g_thread_pool_get_num_unused_threads
 g_thread_pool_stop_unused_threads
+g_thread_pool_set_sort_function
 </SECTION>
 
 <SECTION>
index 42aaa2a1fec43d0c8b8ee802b9921ded75af29bb..fb651292259c4c52d2ca844e6875308cdc5861cf 100644 (file)
@@ -1094,6 +1094,7 @@ g_thread_pool_set_max_threads
 g_thread_pool_set_max_unused_threads
 g_thread_pool_stop_unused_threads
 g_thread_pool_unprocessed
+g_thread_pool_set_sort_function
 #endif
 #endif
 
index d39974e13afff682208618a0720b9d475113fdc9..5aa05f7a8501284f79c6c96994a1a4b485a6a31c 100644 (file)
@@ -41,6 +41,8 @@ struct _GRealThreadPool
   gboolean running;
   gboolean immediate;
   gboolean waiting;
+  GCompareDataFunc sort_func;
+  gpointer sort_user_data;
 };
 
 /* The following is just an address to mark the stop order for a
@@ -57,15 +59,33 @@ G_LOCK_DEFINE_STATIC (unused_threads);
 static GMutex *inform_mutex = NULL;
 static GCond *inform_cond = NULL;
 
-static void     g_thread_pool_free_internal (GRealThreadPool* pool);
-static gpointer g_thread_pool_thread_proxy (gpointer data);
-static void     g_thread_pool_start_thread (GRealThreadPool* pool, 
-                                           GError **error);
-static void     g_thread_pool_wakeup_and_stop_all (GRealThreadPool* pool);
+
+static void     g_thread_pool_queue_push_unlocked (GRealThreadPool  *pool,
+                                                  gpointer          data);
+static void     g_thread_pool_free_internal       (GRealThreadPool  *pool);
+static gpointer g_thread_pool_thread_proxy        (gpointer          data);
+static void     g_thread_pool_start_thread        (GRealThreadPool  *pool,
+                                                  GError          **error);
+static void     g_thread_pool_wakeup_and_stop_all (GRealThreadPool  *pool);
+
 
 #define g_thread_should_run(pool, len) \
   ((pool)->running || (!(pool)->immediate && (len) > 0))
 
+
+static void
+g_thread_pool_queue_push_unlocked (GRealThreadPool *pool,
+                                  gpointer         data)
+{
+  if (pool->sort_func) 
+    g_async_queue_push_sorted_unlocked (pool->queue, 
+                                       data,
+                                       pool->sort_func, 
+                                       pool->sort_user_data);
+  else
+    g_async_queue_push_unlocked (pool->queue, data);
+}
+
 static gpointer 
 g_thread_pool_thread_proxy (gpointer data)
 {
@@ -105,10 +125,10 @@ g_thread_pool_thread_proxy (gpointer data)
                 * the global pool and just hand the data further to
                 * the next one waiting in the queue */
                {
-                 g_async_queue_push_unlocked (pool->queue, task);
+                 g_thread_pool_queue_push_unlocked (pool, task);
                  goto_global_pool = TRUE;
                }
-             else if (pool->running || !pool->immediate)
+             else if (pool->running || !pool->immediate)
                {
                  g_async_queue_unlock (pool->queue);
                  pool->pool.func (task, pool->pool.user_data);
@@ -293,6 +313,8 @@ g_thread_pool_new (GFunc            func,
   retval->max_threads = max_threads;
   retval->num_threads = 0;
   retval->running = TRUE;
+  retval->sort_func = NULL;
+  retval->sort_user_data = NULL;
 
   G_LOCK (init);
   
@@ -365,7 +387,7 @@ g_thread_pool_push (GThreadPool     *pool,
     /* No thread is waiting in the queue */
     g_thread_pool_start_thread (real, error);
 
-  g_async_queue_push_unlocked (real->queue, data);
+  g_thread_pool_queue_push_unlocked (real, data);
   g_async_queue_unlock (real->queue);
 }
 
@@ -599,7 +621,7 @@ g_thread_pool_wakeup_and_stop_all (GRealThreadPool* pool)
 
   pool->immediate = TRUE; 
   for (i = 0; i < pool->num_threads; i++)
-    g_async_queue_push_unlocked (pool->queue, GUINT_TO_POINTER (1));
+    g_thread_pool_queue_push_unlocked (pool, GUINT_TO_POINTER (1));
 }
 
 /**
@@ -684,5 +706,44 @@ void g_thread_pool_stop_unused_threads (void)
   g_thread_pool_set_max_unused_threads (oldval);
 }
 
+/**
+ * g_thread_pool_set_sort_function:
+ * @pool: a #GThreadPool
+ * @func: the #GCompareDataFunc used to sort the list of tasks. 
+ *     This function is passed two tasks. It should return
+ *     0 if the order in which they are handled does not matter, 
+ *     a negative value if the first task should be processed before
+ *     the second or a positive value if the second task should be 
+ *     processed first.
+ * @user_data: user data passed to @func.
+ *
+ * Sets the function used to sort the list of tasks. This allows the
+ * tasks to be processed by a priority determined by @func, and not
+ * just in the order in which they were added to the pool.
+ *
+ * Since: 2.10
+ **/
+void g_thread_pool_set_sort_function (GThreadPool      *pool,
+                                     GCompareDataFunc  func,
+                                     gpointer          user_data)
+{ 
+  GRealThreadPool *real = (GRealThreadPool*) pool;
+
+  g_return_if_fail (real);
+  g_return_if_fail (real->running);
+
+  g_async_queue_lock (real->queue);
+
+  real->sort_func = func;
+  real->sort_user_data = user_data;
+  
+  if (func) 
+    g_async_queue_sort_unlocked (real->queue, 
+                                real->sort_func,
+                                real->sort_user_data);
+
+  g_async_queue_unlock (real->queue);
+}
+
 #define __G_THREADPOOL_C__
 #include "galiasdef.c"
index ed1e81ba350c34709bd493c388e5b500615fc676..e0bb146426675e713ce0ccd8511f57351cd213e7 100644 (file)
@@ -96,6 +96,11 @@ guint           g_thread_pool_get_num_unused_threads (void);
 /* Stop all currently unused threads, but leave the limit untouched */
 void            g_thread_pool_stop_unused_threads    (void);
 
+/* Set sort function for priority threading */
+void            g_thread_pool_set_sort_function      (GThreadPool      *pool,
+                                                     GCompareDataFunc  func,
+                                                     gpointer          user_data);
+
 G_END_DECLS
 
 #endif /* __G_THREADPOOL_H__ */
index 95993e8f52796c4e5b7c5edc01ccfd16220e717c..0168561fa2e74e1700eb9438d09fce8ff03d689a 100644 (file)
@@ -1,51 +1,76 @@
 #undef G_DISABLE_ASSERT
 #undef G_LOG_DOMAIN
 
+#include <config.h>
+
 #include <glib.h>
 
+#define d(x) x
+
 #define RUNS 100
 
-G_LOCK_DEFINE_STATIC (thread_counter);
-gulong abs_thread_counter;
-gulong running_thread_counter;
-gulong leftover_task_counter;
+#define WAIT                5    /* seconds */
+#define MAX_THREADS         10
+#define MAX_UNUSED_THREADS  2
+
+G_LOCK_DEFINE_STATIC (thread_counter_pools);
+
+static gulong abs_thread_counter = 0;
+static gulong running_thread_counter = 0;
+static gulong leftover_task_counter = 0;
+
+G_LOCK_DEFINE_STATIC (last_thread);
+
+static guint last_thread_id = 0;
+
+G_LOCK_DEFINE_STATIC (thread_counter_sort);
+
+static gulong sort_thread_counter = 0;
+
 
-void
-thread_pool_func (gpointer a, gpointer b)
+static GMainLoop *main_loop = NULL;
+
+
+static void
+test_thread_pools_entry_func (gpointer data, gpointer user_data)
 {
-  G_LOCK (thread_counter);
+  guint id = 0;
+
+  id = GPOINTER_TO_UINT (data);
+
+  d(g_print ("[pool] ---> [%3.3d] entered thread\n", id));
+
+  G_LOCK (thread_counter_pools);
   abs_thread_counter++;
   running_thread_counter++;
-  G_UNLOCK (thread_counter);
+  G_UNLOCK (thread_counter_pools);
 
   g_usleep (g_random_int_range (0, 4000));
 
-  G_LOCK (thread_counter);
+  G_LOCK (thread_counter_pools);
   running_thread_counter--;
   leftover_task_counter--;
-  G_UNLOCK (thread_counter);
+
+  g_print ("[pool] ---> [%3.3d] exiting thread (abs count:%ld, running count:%ld, left over:%ld)\n", 
+          id, abs_thread_counter, running_thread_counter, leftover_task_counter); 
+  G_UNLOCK (thread_counter_pools);
 }
 
-int 
-main (int   argc,
-      char *argv[])
+static void
+test_thread_pools (void)
 {
-  /* Only run the test, if threads are enabled and a default thread
-     implementation is available */
-#if defined(G_THREADS_ENABLED) && ! defined(G_THREADS_IMPL_NONE)
   GThreadPool *pool1, *pool2, *pool3;
   guint i;
-  g_thread_init (NULL);
   
-  pool1 = g_thread_pool_new (thread_pool_func, NULL, 3, FALSE, NULL);
-  pool2 = g_thread_pool_new (thread_pool_func, NULL, 5, TRUE, NULL);
-  pool3 = g_thread_pool_new (thread_pool_func, NULL, 7, TRUE, NULL);
+  pool1 = g_thread_pool_new ((GFunc)test_thread_pools_entry_func, NULL, 3, FALSE, NULL);
+  pool2 = g_thread_pool_new ((GFunc)test_thread_pools_entry_func, NULL, 5, TRUE, NULL);
+  pool3 = g_thread_pool_new ((GFunc)test_thread_pools_entry_func, NULL, 7, TRUE, NULL);
 
   for (i = 0; i < RUNS; i++)
     {
-      g_thread_pool_push (pool1, GUINT_TO_POINTER (1), NULL);
-      g_thread_pool_push (pool2, GUINT_TO_POINTER (1), NULL);
-      g_thread_pool_push (pool3, GUINT_TO_POINTER (1), NULL);
+      g_thread_pool_push (pool1, GUINT_TO_POINTER (i), NULL);
+      g_thread_pool_push (pool2, GUINT_TO_POINTER (i), NULL);
+      g_thread_pool_push (pool3, GUINT_TO_POINTER (i), NULL);
       leftover_task_counter += 3;
     } 
   
@@ -55,6 +80,164 @@ main (int   argc,
 
   g_assert (RUNS * 3 == abs_thread_counter + leftover_task_counter);
   g_assert (running_thread_counter == 0);  
+}
+
+static gint
+test_thread_sort_compare_func (gconstpointer a, gconstpointer b, gpointer user_data)
+{
+  guint32 id1, id2;
+
+  id1 = GPOINTER_TO_UINT (a);
+  id2 = GPOINTER_TO_UINT (b);
+
+  return (id1 > id2 ? +1 : id1 == id2 ? 0 : -1); 
+}
+
+static void
+test_thread_sort_entry_func (gpointer data, gpointer user_data)
+{
+  guint thread_id;
+  gboolean is_sorted;
+
+  G_LOCK (last_thread);
+
+  thread_id = GPOINTER_TO_UINT (data);
+  is_sorted = GPOINTER_TO_INT (user_data);
+
+  d(g_print ("%s ---> entered thread:%2.2d, last thread:%2.2d\n", 
+            is_sorted ? "[  sorted]" : "[unsorted]", thread_id, last_thread_id));
+
+  if (is_sorted) {
+    static gboolean last_failed = FALSE;
+
+    if (last_thread_id > thread_id) {
+      if (last_failed) {
+       g_assert (last_thread_id <= thread_id);  
+      }
+
+      /* here we remember one fail and if it concurrently fails, it
+        can not be sorted. the last thread id might be < this thread
+        id if something is added to the queue since threads were
+        created */
+      last_failed = TRUE;
+    } else {
+      last_failed = FALSE;
+    }
+
+    last_thread_id = thread_id;
+  }
+
+  G_UNLOCK (last_thread);
+
+  g_usleep (WAIT * 1000);
+}
+
+static void
+test_thread_sort (gboolean sort)
+{
+  GThreadPool *pool;
+  guint limit = 20;
+  gint i;
+
+  pool = g_thread_pool_new (test_thread_sort_entry_func, 
+                           GINT_TO_POINTER (sort), 
+                           MAX_THREADS,
+                           FALSE,
+                           NULL);
+
+  g_thread_pool_set_max_unused_threads (MAX_UNUSED_THREADS); 
+
+  if (sort) {
+    g_thread_pool_set_sort_function (pool, 
+                                    test_thread_sort_compare_func,
+                                    GUINT_TO_POINTER (69));
+  }
+  
+  for (i = 0; i < limit; i++) {
+    guint id;
+
+    id = g_random_int_range (1, limit*2);
+    g_thread_pool_push (pool, GUINT_TO_POINTER (id), NULL);
+  }
+
+  g_assert (g_thread_pool_get_num_threads (pool) == g_thread_pool_get_max_threads (pool));
+}
+
+static gboolean
+test_check_start_and_stop (gpointer user_data)
+{
+  static guint test_number = 0;
+  static gboolean run_next = FALSE;
+  gboolean continue_timeout = TRUE;
+  gboolean quit = TRUE;
+
+  if (test_number == 0) {
+    run_next = TRUE;
+    d(g_print ("***** RUNNING TEST %2.2d *****\n", test_number)); 
+  }
+   
+  if (run_next) {
+    test_number++;
+
+    switch (test_number) {
+    case 1:
+      test_thread_pools ();   
+      break;
+    case 2:
+      test_thread_sort (FALSE);  
+      break;
+    case 3:
+      test_thread_sort (TRUE);  
+      break;
+    default:
+      d(g_print ("***** END OF TESTS *****\n")); 
+      g_main_loop_quit (main_loop);
+      continue_timeout = FALSE;
+      break;
+    }
+
+    run_next = FALSE;
+    return TRUE;
+  }
+
+  if (test_number == 1) {
+    G_LOCK (thread_counter_pools); 
+    quit &= running_thread_counter <= 0;
+    d(g_print ("***** POOL RUNNING THREAD COUNT:%ld\n", 
+              running_thread_counter)); 
+    G_UNLOCK (thread_counter_pools); 
+  }
+
+  if (test_number == 2 || test_number == 3) {
+    G_LOCK (thread_counter_sort);
+    quit &= sort_thread_counter <= 0;
+    d(g_print ("***** POOL SORT THREAD COUNT:%ld\n", 
+              sort_thread_counter)); 
+    G_UNLOCK (thread_counter_sort); 
+  }
+
+  if (quit) {
+    run_next = TRUE;
+  }
+
+  return continue_timeout;
+}
+
+int 
+main (int argc, char *argv[])
+{
+  /* Only run the test, if threads are enabled and a default thread
+     implementation is available */
+
+#if defined(G_THREADS_ENABLED) && ! defined(G_THREADS_IMPL_NONE)
+  g_thread_init (NULL);
+
+  d(g_print ("Starting... (in one second)\n"));
+  g_timeout_add (1000, test_check_start_and_stop, NULL); 
+  
+  main_loop = g_main_loop_new (NULL, FALSE);
+  g_main_loop_run (main_loop);
 #endif
+
   return 0;
 }