- Added new API g_thread_pool_get_idle_time() and
authorMartyn James Russell <mr@src.gnome.org>
Tue, 3 Jan 2006 15:09:52 +0000 (15:09 +0000)
committerMartyn James Russell <mr@src.gnome.org>
Tue, 3 Jan 2006 15:09:52 +0000 (15:09 +0000)
* docs/reference/glib/glib-sections.txt:
* glib/glib.symbols:
* glib/gthreadpool.[ch]:
- Added new API g_thread_pool_get_idle_time() and
g_thread_pool_set_idle_time(). (#324228).

* tests/threadpool-test.c:
- Updated test case to do thread pool sorting, thread pool with
no sorting and a thread pool with idle thread timeouts.

ChangeLog
ChangeLog.pre-2-10
ChangeLog.pre-2-12
docs/reference/glib/glib-sections.txt
glib/glib.symbols
glib/gthreadpool.c
glib/gthreadpool.h
tests/threadpool-test.c

index 5bbda20..dfa0a53 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,15 @@
+2006-01-03  Martyn Russell  <martyn@imendio.com>
+
+       * docs/reference/glib/glib-sections.txt:
+       * glib/glib.symbols:
+       * glib/gthreadpool.[ch]:
+       - Added new API g_thread_pool_get_idle_time() and
+       g_thread_pool_set_idle_time(). (#324228).
+       
+       * tests/threadpool-test.c: 
+       - Updated test case to do thread pool sorting, thread pool with
+       no sorting and a thread pool with idle thread timeouts.
+       
 2006-01-03  Matthias Clasen  <mclasen@redhat.com>
        
        * glib/gmain.h: Add new functions here, too.
index 5bbda20..dfa0a53 100644 (file)
@@ -1,3 +1,15 @@
+2006-01-03  Martyn Russell  <martyn@imendio.com>
+
+       * docs/reference/glib/glib-sections.txt:
+       * glib/glib.symbols:
+       * glib/gthreadpool.[ch]:
+       - Added new API g_thread_pool_get_idle_time() and
+       g_thread_pool_set_idle_time(). (#324228).
+       
+       * tests/threadpool-test.c: 
+       - Updated test case to do thread pool sorting, thread pool with
+       no sorting and a thread pool with idle thread timeouts.
+       
 2006-01-03  Matthias Clasen  <mclasen@redhat.com>
        
        * glib/gmain.h: Add new functions here, too.
index 5bbda20..dfa0a53 100644 (file)
@@ -1,3 +1,15 @@
+2006-01-03  Martyn Russell  <martyn@imendio.com>
+
+       * docs/reference/glib/glib-sections.txt:
+       * glib/glib.symbols:
+       * glib/gthreadpool.[ch]:
+       - Added new API g_thread_pool_get_idle_time() and
+       g_thread_pool_set_idle_time(). (#324228).
+       
+       * tests/threadpool-test.c: 
+       - Updated test case to do thread pool sorting, thread pool with
+       no sorting and a thread pool with idle thread timeouts.
+       
 2006-01-03  Matthias Clasen  <mclasen@redhat.com>
        
        * glib/gmain.h: Add new functions here, too.
index dbffc68..46aa8d7 100644 (file)
@@ -639,6 +639,8 @@ g_thread_pool_get_max_unused_threads
 g_thread_pool_get_num_unused_threads
 g_thread_pool_stop_unused_threads
 g_thread_pool_set_sort_function
+g_thread_pool_set_max_idle_time
+g_thread_pool_get_max_idle_time
 </SECTION>
 
 <SECTION>
index eaa1803..a778c67 100644 (file)
@@ -1087,12 +1087,14 @@ g_thread_foreach
 g_thread_pool_free
 g_thread_pool_get_max_threads
 g_thread_pool_get_max_unused_threads
+g_thread_pool_get_max_idle_time
 g_thread_pool_get_num_threads
 g_thread_pool_get_num_unused_threads
 g_thread_pool_new
 g_thread_pool_push
 g_thread_pool_set_max_threads
 g_thread_pool_set_max_unused_threads
+g_thread_pool_set_max_idle_time
 g_thread_pool_stop_unused_threads
 g_thread_pool_unprocessed
 g_thread_pool_set_sort_function
index 5aa05f7..9cf7408 100644 (file)
@@ -29,6 +29,7 @@
 #include "glib.h"
 #include "galias.h"
 
+#define debug(...) /* g_printerr (__VA_ARGS__) */
 
 typedef struct _GRealThreadPool GRealThreadPool;
 
@@ -54,7 +55,8 @@ static const gpointer stop_this_thread_marker = (gpointer) &g_thread_pool_new;
 static GAsyncQueue *unused_thread_queue;
 static gint unused_threads = 0;
 static gint max_unused_threads = 0;
-G_LOCK_DEFINE_STATIC (unused_threads);
+static guint max_idle_time = 0;
+G_LOCK_DEFINE_STATIC (settings);
 
 static GMutex *inform_mutex = NULL;
 static GCond *inform_cond = NULL;
@@ -92,13 +94,15 @@ g_thread_pool_thread_proxy (gpointer data)
   GRealThreadPool *pool = data;
   gboolean watcher = FALSE;
 
+  debug("pool:0x%.8x entering proxy ...\n", (guint)pool);
+
   g_async_queue_lock (pool->queue);
   while (TRUE)
     {
-      gpointer task; 
+      gpointer task = NULL
       gboolean goto_global_pool = !pool->pool.exclusive;
       gint len = g_async_queue_length_unlocked (pool->queue);
-      
+
       if (g_thread_should_run (pool, len))
        {
          if (watcher)
@@ -111,11 +115,38 @@ g_thread_pool_thread_proxy (gpointer data)
              GTimeVal end_time;
              g_get_current_time (&end_time);
              g_time_val_add (&end_time, G_USEC_PER_SEC / 2); /* 1/2 second */
+             debug("pool:0x%.8x waiting 1/2 second to pop next item "
+                   "in queue (%d running, %d unprocessed) ...\n", 
+                   (guint)pool, 
+                   pool->num_threads, 
+                   g_async_queue_length_unlocked (pool->queue));
              task = g_async_queue_timed_pop_unlocked (pool->queue, &end_time);
            }
+         else if (g_thread_pool_get_max_idle_time() > 0) 
+           {
+             /* We always give a maximum time to pop the next task so
+              * we know that when we evaluate task further down, that
+              * it has had the maximum time to get a new task and it
+              * can die */
+             GTimeVal end_time;
+             g_get_current_time (&end_time);
+             debug("pool:0x%.8x waiting %d ms max to pop next item in "
+                   "queue (%d running, %d unprocessed) or exiting ...\n", 
+                   (guint)pool, 
+                   g_thread_pool_get_max_idle_time (), 
+                   pool->num_threads, 
+                   g_async_queue_length_unlocked (pool->queue));
+             
+             g_time_val_add (&end_time, g_thread_pool_get_max_idle_time () * 1000);
+             task = g_async_queue_timed_pop_unlocked (pool->queue, &end_time);
+           }
          else
-           task = g_async_queue_pop_unlocked (pool->queue);
-         
+           {
+             task = g_async_queue_pop_unlocked (pool->queue);
+             debug("pool:0x%.8x new task:0x%.8x poped from pool queue ...\n", 
+                   (guint)pool, (guint)task);
+       }
+
          if (task)
            {
              watcher = FALSE;
@@ -125,19 +156,49 @@ g_thread_pool_thread_proxy (gpointer data)
                 * the global pool and just hand the data further to
                 * the next one waiting in the queue */
                {
+                 debug("pool:0x%.8x, task:0x%.8x we have too many threads "
+                       "and max is set, pushing task into queue ...\n", 
+                       (guint)pool, (guint)task);
                  g_thread_pool_queue_push_unlocked (pool, task);
                  goto_global_pool = TRUE;
                }
              else if (pool->running || !pool->immediate)
                {
                  g_async_queue_unlock (pool->queue);
+                 debug("pool:0x%.8x, task:0x%.8x calling func ...\n", 
+                       (guint)pool, (guint)task);
                  pool->pool.func (task, pool->pool.user_data);
                  g_async_queue_lock (pool->queue);
                }
+           } 
+         else if (g_thread_pool_get_max_idle_time() > 0)  
+           {
+             G_LOCK (settings);
+             if (pool->num_threads > max_unused_threads) {
+               G_UNLOCK (settings);
+               pool->num_threads--;      
+               
+               debug("pool:0x%.8x queue timed pop has no tasks waiting, "
+                     "so stopping thread (%d running, %d unprocessed) ...\n", 
+                     (guint)pool, 
+                     pool->num_threads, 
+                     g_async_queue_length_unlocked (pool->queue));
+               g_async_queue_unlock (pool->queue);
+         
+               return NULL; 
+             }
+             G_UNLOCK (settings);
            }
          len = g_async_queue_length_unlocked (pool->queue);
        }
 
+      debug("pool:0x%.8x, len:%d, watcher:%s, exclusive:%s, should run:%s\n",
+           (guint)pool, 
+           len, 
+           watcher ? "true" : "false", 
+           pool->pool.exclusive ? "true" : "false", 
+           g_thread_should_run (pool, len) ? "true" : "false");
+
       if (!g_thread_should_run (pool, len))
        {
          g_cond_broadcast (inform_cond);
@@ -148,6 +209,11 @@ g_thread_pool_thread_proxy (gpointer data)
          /* At this pool there are no threads waiting, but tasks are. */
          goto_global_pool = FALSE; 
        }
+      else if (len < 1 && g_thread_pool_get_max_idle_time () > 0) 
+       {
+         goto_global_pool = FALSE;
+         watcher = FALSE;
+       }
       else if (len == 0 && !watcher && !pool->pool.exclusive)
        {
          /* Here neither threads nor tasks are queued and we didn't
@@ -156,10 +222,11 @@ g_thread_pool_thread_proxy (gpointer data)
           * switches. */
          goto_global_pool = FALSE;
          watcher = TRUE;
-       }
+       } 
 
       if (goto_global_pool)
        {
+         debug("pool:0x%.8x, now in the global pool\n", (guint)pool);
          pool->num_threads--;
 
          if (!pool->running && !pool->waiting)
@@ -182,23 +249,27 @@ g_thread_pool_thread_proxy (gpointer data)
          
          g_async_queue_lock (unused_thread_queue);
 
-         G_LOCK (unused_threads);
-         if ((unused_threads >= max_unused_threads && 
+         G_LOCK (settings);
+         if ((unused_threads >= max_unused_threads &&
               max_unused_threads != -1))
            {
-             G_UNLOCK (unused_threads);
+             G_UNLOCK (settings);
              g_async_queue_unlock (unused_thread_queue);
+             debug("pool:0x%.8x stopping thread (%d running, %d unprocessed) ...\n", 
+                   (guint)pool, 
+                   pool->num_threads, 
+                   g_async_queue_length_unlocked (pool->queue));
              /* Stop this thread */
              return NULL;      
            }
          unused_threads++;
-         G_UNLOCK (unused_threads);
+         G_UNLOCK (settings);
 
          pool = g_async_queue_pop_unlocked (unused_thread_queue);
 
-         G_LOCK (unused_threads);
+         G_LOCK (settings);
          unused_threads--;
-         G_UNLOCK (unused_threads);
+         G_UNLOCK (settings);
 
          g_async_queue_unlock (unused_thread_queue);
          
@@ -252,6 +323,8 @@ g_thread_pool_start_thread (GRealThreadPool  *pool,
   /* See comment in g_thread_pool_thread_proxy as to why this is done
    * here and not there */
   pool->num_threads++;
+  debug("pool:0x%.8x thread created, (running:%d)\n", 
+       (guint)pool, pool->num_threads);
 }
 
 /**
@@ -637,7 +710,7 @@ g_thread_pool_set_max_unused_threads (gint max_threads)
 {
   g_return_if_fail (max_threads >= -1);  
 
-  G_LOCK (unused_threads);
+  G_LOCK (settings);
   
   max_unused_threads = max_threads;
 
@@ -652,7 +725,7 @@ g_thread_pool_set_max_unused_threads (gint max_threads)
       g_async_queue_unlock (unused_thread_queue);
     }
     
-  G_UNLOCK (unused_threads);
+  G_UNLOCK (settings);
 }
 
 /**
@@ -667,9 +740,9 @@ g_thread_pool_get_max_unused_threads (void)
 {
   gint retval;
   
-  G_LOCK (unused_threads);
+  G_LOCK (settings);
   retval = max_unused_threads;
-  G_UNLOCK (unused_threads);
+  G_UNLOCK (settings);
 
   return retval;
 }
@@ -681,13 +754,14 @@ g_thread_pool_get_max_unused_threads (void)
  *
  * Return value: the number of currently unused threads
  **/
-guint g_thread_pool_get_num_unused_threads (void)
+guint 
+g_thread_pool_get_num_unused_threads (void)
 {
   guint retval;
   
-  G_LOCK (unused_threads);
+  G_LOCK (settings);
   retval = unused_threads;
-  G_UNLOCK (unused_threads);
+  G_UNLOCK (settings);
 
   return retval;
 }
@@ -699,7 +773,8 @@ guint g_thread_pool_get_num_unused_threads (void)
  * maximal number of unused threads. This function can be used to
  * regularly stop all unused threads e.g. from g_timeout_add().
  **/
-void g_thread_pool_stop_unused_threads (void)
+void
+g_thread_pool_stop_unused_threads (void)
 { 
   guint oldval = g_thread_pool_get_max_unused_threads ();
   g_thread_pool_set_max_unused_threads (0);
@@ -723,9 +798,10 @@ void g_thread_pool_stop_unused_threads (void)
  *
  * Since: 2.10
  **/
-void g_thread_pool_set_sort_function (GThreadPool      *pool,
-                                     GCompareDataFunc  func,
-                                     gpointer          user_data)
+void 
+g_thread_pool_set_sort_function (GThreadPool      *pool,
+                                GCompareDataFunc  func,
+                                gpointer          user_data)
 { 
   GRealThreadPool *real = (GRealThreadPool*) pool;
 
@@ -745,5 +821,57 @@ void g_thread_pool_set_sort_function (GThreadPool      *pool,
   g_async_queue_unlock (real->queue);
 }
 
+/**
+ * g_thread_pool_set_max_idle_time:
+ * @interval: the maximum @interval (1/1000ths of a second) a thread
+ *     can be idle. 
+ *
+ * This function will set the maximum @interval that a thread waiting
+ * in the pool for new tasks can be idle for before being
+ * stopped. This function is similar to calling
+ * g_thread_pool_stop_unused_threads() on a regular timeout, except,
+ * this is done on a per thread basis.    
+ *
+ * By setting @interval to 0, idle threads will not be stopped.
+ *  
+ * This function makes use of g_async_queue_timed_pop () using
+ * @interval.
+ *
+ * Since: 2.10
+ **/
+void
+g_thread_pool_set_max_idle_time (guint interval)
+{ 
+  G_LOCK (settings);
+  max_idle_time = interval;
+  G_UNLOCK (settings);
+}
+
+/**
+ * g_thread_pool_get_max_idle_time:
+ * 
+ * This function will return the maximum @interval that a thread will
+ * wait in the thread pool for new tasks before being stopped.
+ *
+ * If this function returns 0, threads waiting in the thread pool for
+ * new work are not stopped.
+ *
+ * Return value: the maximum @interval to wait for new tasks in the
+ *     thread pool before stopping the thread (1/1000ths of a second).
+ *  
+ * Since: 2.10
+ **/
+guint
+g_thread_pool_get_max_idle_time (void)
+{ 
+  guint retval;
+
+  G_LOCK (settings);
+  retval = max_idle_time;
+  G_UNLOCK (settings);
+
+  return retval;
+}
+
 #define __G_THREADPOOL_C__
 #include "galiasdef.c"
index e0bb146..371b9ef 100644 (file)
@@ -101,6 +101,10 @@ void            g_thread_pool_set_sort_function      (GThreadPool      *pool,
                                                      GCompareDataFunc  func,
                                                      gpointer          user_data);
 
+/* Set maximum time a thread can be idle in the pool before it is stopped */
+void            g_thread_pool_set_max_idle_time      (guint             interval);
+guint           g_thread_pool_get_max_idle_time      (void);
+
 G_END_DECLS
 
 #endif /* __G_THREADPOOL_H__ */
index 0168561..d5749a8 100644 (file)
@@ -5,13 +5,17 @@
 
 #include <glib.h>
 
-#define d(x) x
+#define debug(...) g_printerr (__VA_ARGS__)
 
 #define RUNS 100
 
 #define WAIT                5    /* seconds */
 #define MAX_THREADS         10
-#define MAX_UNUSED_THREADS  2
+
+/* if > 0 the test will run continously (since the test ends when
+ * thread count is 0), if -1 it means no limit to unused threads  
+ * if 0 then no unused threads are possible */
+#define MAX_UNUSED_THREADS -1    
 
 G_LOCK_DEFINE_STATIC (thread_counter_pools);
 
@@ -27,6 +31,7 @@ G_LOCK_DEFINE_STATIC (thread_counter_sort);
 
 static gulong sort_thread_counter = 0;
 
+static GThreadPool *idle_pool = NULL;
 
 static GMainLoop *main_loop = NULL;
 
@@ -38,7 +43,7 @@ test_thread_pools_entry_func (gpointer data, gpointer user_data)
 
   id = GPOINTER_TO_UINT (data);
 
-  d(g_print ("[pool] ---> [%3.3d] entered thread\n", id));
+  debug("[pool] ---> [%3.3d] entered thread\n", id);
 
   G_LOCK (thread_counter_pools);
   abs_thread_counter++;
@@ -104,8 +109,8 @@ test_thread_sort_entry_func (gpointer data, gpointer user_data)
   thread_id = GPOINTER_TO_UINT (data);
   is_sorted = GPOINTER_TO_INT (user_data);
 
-  d(g_print ("%s ---> entered thread:%2.2d, last thread:%2.2d\n", 
-            is_sorted ? "[  sorted]" : "[unsorted]", thread_id, last_thread_id));
+  debug("%s ---> entered thread:%2.2d, last thread:%2.2d\n", 
+       is_sorted ? "[  sorted]" : "[unsorted]", thread_id, last_thread_id);
 
   if (is_sorted) {
     static gboolean last_failed = FALSE;
@@ -163,6 +168,74 @@ test_thread_sort (gboolean sort)
   g_assert (g_thread_pool_get_num_threads (pool) == g_thread_pool_get_max_threads (pool));
 }
 
+static void
+test_thread_idle_time_entry_func (gpointer data, gpointer user_data)
+{
+  guint thread_id;
+  
+  thread_id = GPOINTER_TO_UINT (data);
+
+  debug("[idle] ---> entered thread:%2.2d\n", 
+       thread_id);
+
+  g_usleep (WAIT * 1000);
+
+  debug("[idle] <--- exiting thread:%2.2d\n", 
+       thread_id);
+}
+
+static gboolean 
+test_thread_idle_timeout (gpointer data)
+{
+  guint interval;
+  gint i;
+
+  interval = GPOINTER_TO_UINT (data);
+  
+  for (i = 0; i < 2; i++) {
+    g_thread_pool_push (idle_pool, GUINT_TO_POINTER (100 + i), NULL); 
+    debug("[idle] ===> pushed new thread with id:%d, number of threads:%d, unprocessed:%d\n",
+         100 + i,
+         g_thread_pool_get_num_threads (idle_pool),
+         g_thread_pool_unprocessed (idle_pool));
+  }
+  
+
+  return FALSE;
+}
+
+static void
+test_thread_idle_time (guint idle_time)
+{
+  guint limit = 50;
+  guint interval = 10000;
+  gint i;
+
+  idle_pool = g_thread_pool_new (test_thread_idle_time_entry_func, 
+                                NULL, 
+                                MAX_THREADS,
+                                FALSE,
+                                NULL);
+
+  g_thread_pool_set_max_unused_threads (MAX_UNUSED_THREADS);  
+  g_thread_pool_set_max_idle_time (interval); 
+
+  g_assert (g_thread_pool_get_max_unused_threads () == MAX_UNUSED_THREADS);   
+  g_assert (g_thread_pool_get_max_idle_time () == interval);
+
+  for (i = 0; i < limit; i++) {
+    g_thread_pool_push (idle_pool, GUINT_TO_POINTER (i), NULL); 
+    debug("[idle] ===> pushed new thread with id:%d, number of threads:%d, unprocessed:%d\n",
+         i,
+         g_thread_pool_get_num_threads (idle_pool),
+         g_thread_pool_unprocessed (idle_pool));
+  }
+
+  g_timeout_add ((interval - 1000),
+                test_thread_idle_timeout, 
+                GUINT_TO_POINTER (interval));
+}
+
 static gboolean
 test_check_start_and_stop (gpointer user_data)
 {
@@ -173,7 +246,7 @@ test_check_start_and_stop (gpointer user_data)
 
   if (test_number == 0) {
     run_next = TRUE;
-    d(g_print ("***** RUNNING TEST %2.2d *****\n", test_number)); 
+    debug("***** RUNNING TEST %2.2d *****\n", test_number); 
   }
    
   if (run_next) {
@@ -189,8 +262,11 @@ test_check_start_and_stop (gpointer user_data)
     case 3:
       test_thread_sort (TRUE);  
       break;
+    case 4:
+      test_thread_idle_time (5);   
+      break;
     default:
-      d(g_print ("***** END OF TESTS *****\n")); 
+      debug("***** END OF TESTS *****\n"); 
       g_main_loop_quit (main_loop);
       continue_timeout = FALSE;
       break;
@@ -203,19 +279,28 @@ test_check_start_and_stop (gpointer user_data)
   if (test_number == 1) {
     G_LOCK (thread_counter_pools); 
     quit &= running_thread_counter <= 0;
-    d(g_print ("***** POOL RUNNING THREAD COUNT:%ld\n", 
-              running_thread_counter)); 
+    debug("***** POOL RUNNING THREAD COUNT:%ld\n", 
+         running_thread_counter); 
     G_UNLOCK (thread_counter_pools); 
   }
 
   if (test_number == 2 || test_number == 3) {
     G_LOCK (thread_counter_sort);
     quit &= sort_thread_counter <= 0;
-    d(g_print ("***** POOL SORT THREAD COUNT:%ld\n", 
-              sort_thread_counter)); 
+    debug("***** POOL SORT THREAD COUNT:%ld\n", 
+         sort_thread_counter); 
     G_UNLOCK (thread_counter_sort); 
   }
 
+  if (test_number == 4) {
+    guint idle;
+
+    idle = g_thread_pool_get_num_threads (idle_pool);
+    quit &= idle < 1;
+    debug("***** POOL IDLE THREAD COUNT:%d, UNPROCESSED JOBS:%d\n",
+         idle, g_thread_pool_unprocessed (idle_pool));
+  }    
+
   if (quit) {
     run_next = TRUE;
   }
@@ -232,7 +317,7 @@ main (int argc, char *argv[])
 #if defined(G_THREADS_ENABLED) && ! defined(G_THREADS_IMPL_NONE)
   g_thread_init (NULL);
 
-  d(g_print ("Starting... (in one second)\n"));
+  debug("Starting... (in one second)\n");
   g_timeout_add (1000, test_check_start_and_stop, NULL); 
   
   main_loop = g_main_loop_new (NULL, FALSE);