From 941faa1ca951e2e6ef39dddc9ef3867bf773625f Mon Sep 17 00:00:00 2001 From: Martyn James Russell Date: Tue, 3 Jan 2006 15:09:52 +0000 Subject: [PATCH] - Added new API g_thread_pool_get_idle_time() and * docs/reference/glib/glib-sections.txt: * glib/glib.symbols: * glib/gthreadpool.[ch]: - Added new API g_thread_pool_get_idle_time() and g_thread_pool_set_idle_time(). (#324228). * tests/threadpool-test.c: - Updated test case to do thread pool sorting, thread pool with no sorting and a thread pool with idle thread timeouts. --- ChangeLog | 12 +++ ChangeLog.pre-2-10 | 12 +++ ChangeLog.pre-2-12 | 12 +++ docs/reference/glib/glib-sections.txt | 2 + glib/glib.symbols | 2 + glib/gthreadpool.c | 174 +++++++++++++++++++++++++++++----- glib/gthreadpool.h | 4 + tests/threadpool-test.c | 109 ++++++++++++++++++--- 8 files changed, 292 insertions(+), 35 deletions(-) diff --git a/ChangeLog b/ChangeLog index 5bbda20..dfa0a53 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,15 @@ +2006-01-03 Martyn Russell + + * docs/reference/glib/glib-sections.txt: + * glib/glib.symbols: + * glib/gthreadpool.[ch]: + - Added new API g_thread_pool_get_idle_time() and + g_thread_pool_set_idle_time(). (#324228). + + * tests/threadpool-test.c: + - Updated test case to do thread pool sorting, thread pool with + no sorting and a thread pool with idle thread timeouts. + 2006-01-03 Matthias Clasen * glib/gmain.h: Add new functions here, too. diff --git a/ChangeLog.pre-2-10 b/ChangeLog.pre-2-10 index 5bbda20..dfa0a53 100644 --- a/ChangeLog.pre-2-10 +++ b/ChangeLog.pre-2-10 @@ -1,3 +1,15 @@ +2006-01-03 Martyn Russell + + * docs/reference/glib/glib-sections.txt: + * glib/glib.symbols: + * glib/gthreadpool.[ch]: + - Added new API g_thread_pool_get_idle_time() and + g_thread_pool_set_idle_time(). (#324228). + + * tests/threadpool-test.c: + - Updated test case to do thread pool sorting, thread pool with + no sorting and a thread pool with idle thread timeouts. + 2006-01-03 Matthias Clasen * glib/gmain.h: Add new functions here, too. diff --git a/ChangeLog.pre-2-12 b/ChangeLog.pre-2-12 index 5bbda20..dfa0a53 100644 --- a/ChangeLog.pre-2-12 +++ b/ChangeLog.pre-2-12 @@ -1,3 +1,15 @@ +2006-01-03 Martyn Russell + + * docs/reference/glib/glib-sections.txt: + * glib/glib.symbols: + * glib/gthreadpool.[ch]: + - Added new API g_thread_pool_get_idle_time() and + g_thread_pool_set_idle_time(). (#324228). + + * tests/threadpool-test.c: + - Updated test case to do thread pool sorting, thread pool with + no sorting and a thread pool with idle thread timeouts. + 2006-01-03 Matthias Clasen * glib/gmain.h: Add new functions here, too. diff --git a/docs/reference/glib/glib-sections.txt b/docs/reference/glib/glib-sections.txt index dbffc68..46aa8d7 100644 --- a/docs/reference/glib/glib-sections.txt +++ b/docs/reference/glib/glib-sections.txt @@ -639,6 +639,8 @@ g_thread_pool_get_max_unused_threads g_thread_pool_get_num_unused_threads g_thread_pool_stop_unused_threads g_thread_pool_set_sort_function +g_thread_pool_set_max_idle_time +g_thread_pool_get_max_idle_time
diff --git a/glib/glib.symbols b/glib/glib.symbols index eaa1803..a778c67 100644 --- a/glib/glib.symbols +++ b/glib/glib.symbols @@ -1087,12 +1087,14 @@ g_thread_foreach g_thread_pool_free g_thread_pool_get_max_threads g_thread_pool_get_max_unused_threads +g_thread_pool_get_max_idle_time g_thread_pool_get_num_threads g_thread_pool_get_num_unused_threads g_thread_pool_new g_thread_pool_push g_thread_pool_set_max_threads g_thread_pool_set_max_unused_threads +g_thread_pool_set_max_idle_time g_thread_pool_stop_unused_threads g_thread_pool_unprocessed g_thread_pool_set_sort_function diff --git a/glib/gthreadpool.c b/glib/gthreadpool.c index 5aa05f7..9cf7408 100644 --- a/glib/gthreadpool.c +++ b/glib/gthreadpool.c @@ -29,6 +29,7 @@ #include "glib.h" #include "galias.h" +#define debug(...) /* g_printerr (__VA_ARGS__) */ typedef struct _GRealThreadPool GRealThreadPool; @@ -54,7 +55,8 @@ static const gpointer stop_this_thread_marker = (gpointer) &g_thread_pool_new; static GAsyncQueue *unused_thread_queue; static gint unused_threads = 0; static gint max_unused_threads = 0; -G_LOCK_DEFINE_STATIC (unused_threads); +static guint max_idle_time = 0; +G_LOCK_DEFINE_STATIC (settings); static GMutex *inform_mutex = NULL; static GCond *inform_cond = NULL; @@ -92,13 +94,15 @@ g_thread_pool_thread_proxy (gpointer data) GRealThreadPool *pool = data; gboolean watcher = FALSE; + debug("pool:0x%.8x entering proxy ...\n", (guint)pool); + g_async_queue_lock (pool->queue); while (TRUE) { - gpointer task; + gpointer task = NULL; gboolean goto_global_pool = !pool->pool.exclusive; gint len = g_async_queue_length_unlocked (pool->queue); - + if (g_thread_should_run (pool, len)) { if (watcher) @@ -111,11 +115,38 @@ g_thread_pool_thread_proxy (gpointer data) GTimeVal end_time; g_get_current_time (&end_time); g_time_val_add (&end_time, G_USEC_PER_SEC / 2); /* 1/2 second */ + debug("pool:0x%.8x waiting 1/2 second to pop next item " + "in queue (%d running, %d unprocessed) ...\n", + (guint)pool, + pool->num_threads, + g_async_queue_length_unlocked (pool->queue)); task = g_async_queue_timed_pop_unlocked (pool->queue, &end_time); } + else if (g_thread_pool_get_max_idle_time() > 0) + { + /* We always give a maximum time to pop the next task so + * we know that when we evaluate task further down, that + * it has had the maximum time to get a new task and it + * can die */ + GTimeVal end_time; + g_get_current_time (&end_time); + debug("pool:0x%.8x waiting %d ms max to pop next item in " + "queue (%d running, %d unprocessed) or exiting ...\n", + (guint)pool, + g_thread_pool_get_max_idle_time (), + pool->num_threads, + g_async_queue_length_unlocked (pool->queue)); + + g_time_val_add (&end_time, g_thread_pool_get_max_idle_time () * 1000); + task = g_async_queue_timed_pop_unlocked (pool->queue, &end_time); + } else - task = g_async_queue_pop_unlocked (pool->queue); - + { + task = g_async_queue_pop_unlocked (pool->queue); + debug("pool:0x%.8x new task:0x%.8x poped from pool queue ...\n", + (guint)pool, (guint)task); + } + if (task) { watcher = FALSE; @@ -125,19 +156,49 @@ g_thread_pool_thread_proxy (gpointer data) * the global pool and just hand the data further to * the next one waiting in the queue */ { + debug("pool:0x%.8x, task:0x%.8x we have too many threads " + "and max is set, pushing task into queue ...\n", + (guint)pool, (guint)task); g_thread_pool_queue_push_unlocked (pool, task); goto_global_pool = TRUE; } else if (pool->running || !pool->immediate) { g_async_queue_unlock (pool->queue); + debug("pool:0x%.8x, task:0x%.8x calling func ...\n", + (guint)pool, (guint)task); pool->pool.func (task, pool->pool.user_data); g_async_queue_lock (pool->queue); } + } + else if (g_thread_pool_get_max_idle_time() > 0) + { + G_LOCK (settings); + if (pool->num_threads > max_unused_threads) { + G_UNLOCK (settings); + pool->num_threads--; + + debug("pool:0x%.8x queue timed pop has no tasks waiting, " + "so stopping thread (%d running, %d unprocessed) ...\n", + (guint)pool, + pool->num_threads, + g_async_queue_length_unlocked (pool->queue)); + g_async_queue_unlock (pool->queue); + + return NULL; + } + G_UNLOCK (settings); } len = g_async_queue_length_unlocked (pool->queue); } + debug("pool:0x%.8x, len:%d, watcher:%s, exclusive:%s, should run:%s\n", + (guint)pool, + len, + watcher ? "true" : "false", + pool->pool.exclusive ? "true" : "false", + g_thread_should_run (pool, len) ? "true" : "false"); + if (!g_thread_should_run (pool, len)) { g_cond_broadcast (inform_cond); @@ -148,6 +209,11 @@ g_thread_pool_thread_proxy (gpointer data) /* At this pool there are no threads waiting, but tasks are. */ goto_global_pool = FALSE; } + else if (len < 1 && g_thread_pool_get_max_idle_time () > 0) + { + goto_global_pool = FALSE; + watcher = FALSE; + } else if (len == 0 && !watcher && !pool->pool.exclusive) { /* Here neither threads nor tasks are queued and we didn't @@ -156,10 +222,11 @@ g_thread_pool_thread_proxy (gpointer data) * switches. */ goto_global_pool = FALSE; watcher = TRUE; - } + } if (goto_global_pool) { + debug("pool:0x%.8x, now in the global pool\n", (guint)pool); pool->num_threads--; if (!pool->running && !pool->waiting) @@ -182,23 +249,27 @@ g_thread_pool_thread_proxy (gpointer data) g_async_queue_lock (unused_thread_queue); - G_LOCK (unused_threads); - if ((unused_threads >= max_unused_threads && + G_LOCK (settings); + if ((unused_threads >= max_unused_threads && max_unused_threads != -1)) { - G_UNLOCK (unused_threads); + G_UNLOCK (settings); g_async_queue_unlock (unused_thread_queue); + debug("pool:0x%.8x stopping thread (%d running, %d unprocessed) ...\n", + (guint)pool, + pool->num_threads, + g_async_queue_length_unlocked (pool->queue)); /* Stop this thread */ return NULL; } unused_threads++; - G_UNLOCK (unused_threads); + G_UNLOCK (settings); pool = g_async_queue_pop_unlocked (unused_thread_queue); - G_LOCK (unused_threads); + G_LOCK (settings); unused_threads--; - G_UNLOCK (unused_threads); + G_UNLOCK (settings); g_async_queue_unlock (unused_thread_queue); @@ -252,6 +323,8 @@ g_thread_pool_start_thread (GRealThreadPool *pool, /* See comment in g_thread_pool_thread_proxy as to why this is done * here and not there */ pool->num_threads++; + debug("pool:0x%.8x thread created, (running:%d)\n", + (guint)pool, pool->num_threads); } /** @@ -637,7 +710,7 @@ g_thread_pool_set_max_unused_threads (gint max_threads) { g_return_if_fail (max_threads >= -1); - G_LOCK (unused_threads); + G_LOCK (settings); max_unused_threads = max_threads; @@ -652,7 +725,7 @@ g_thread_pool_set_max_unused_threads (gint max_threads) g_async_queue_unlock (unused_thread_queue); } - G_UNLOCK (unused_threads); + G_UNLOCK (settings); } /** @@ -667,9 +740,9 @@ g_thread_pool_get_max_unused_threads (void) { gint retval; - G_LOCK (unused_threads); + G_LOCK (settings); retval = max_unused_threads; - G_UNLOCK (unused_threads); + G_UNLOCK (settings); return retval; } @@ -681,13 +754,14 @@ g_thread_pool_get_max_unused_threads (void) * * Return value: the number of currently unused threads **/ -guint g_thread_pool_get_num_unused_threads (void) +guint +g_thread_pool_get_num_unused_threads (void) { guint retval; - G_LOCK (unused_threads); + G_LOCK (settings); retval = unused_threads; - G_UNLOCK (unused_threads); + G_UNLOCK (settings); return retval; } @@ -699,7 +773,8 @@ guint g_thread_pool_get_num_unused_threads (void) * maximal number of unused threads. This function can be used to * regularly stop all unused threads e.g. from g_timeout_add(). **/ -void g_thread_pool_stop_unused_threads (void) +void +g_thread_pool_stop_unused_threads (void) { guint oldval = g_thread_pool_get_max_unused_threads (); g_thread_pool_set_max_unused_threads (0); @@ -723,9 +798,10 @@ void g_thread_pool_stop_unused_threads (void) * * Since: 2.10 **/ -void g_thread_pool_set_sort_function (GThreadPool *pool, - GCompareDataFunc func, - gpointer user_data) +void +g_thread_pool_set_sort_function (GThreadPool *pool, + GCompareDataFunc func, + gpointer user_data) { GRealThreadPool *real = (GRealThreadPool*) pool; @@ -745,5 +821,57 @@ void g_thread_pool_set_sort_function (GThreadPool *pool, g_async_queue_unlock (real->queue); } +/** + * g_thread_pool_set_max_idle_time: + * @interval: the maximum @interval (1/1000ths of a second) a thread + * can be idle. + * + * This function will set the maximum @interval that a thread waiting + * in the pool for new tasks can be idle for before being + * stopped. This function is similar to calling + * g_thread_pool_stop_unused_threads() on a regular timeout, except, + * this is done on a per thread basis. + * + * By setting @interval to 0, idle threads will not be stopped. + * + * This function makes use of g_async_queue_timed_pop () using + * @interval. + * + * Since: 2.10 + **/ +void +g_thread_pool_set_max_idle_time (guint interval) +{ + G_LOCK (settings); + max_idle_time = interval; + G_UNLOCK (settings); +} + +/** + * g_thread_pool_get_max_idle_time: + * + * This function will return the maximum @interval that a thread will + * wait in the thread pool for new tasks before being stopped. + * + * If this function returns 0, threads waiting in the thread pool for + * new work are not stopped. + * + * Return value: the maximum @interval to wait for new tasks in the + * thread pool before stopping the thread (1/1000ths of a second). + * + * Since: 2.10 + **/ +guint +g_thread_pool_get_max_idle_time (void) +{ + guint retval; + + G_LOCK (settings); + retval = max_idle_time; + G_UNLOCK (settings); + + return retval; +} + #define __G_THREADPOOL_C__ #include "galiasdef.c" diff --git a/glib/gthreadpool.h b/glib/gthreadpool.h index e0bb146..371b9ef 100644 --- a/glib/gthreadpool.h +++ b/glib/gthreadpool.h @@ -101,6 +101,10 @@ void g_thread_pool_set_sort_function (GThreadPool *pool, GCompareDataFunc func, gpointer user_data); +/* Set maximum time a thread can be idle in the pool before it is stopped */ +void g_thread_pool_set_max_idle_time (guint interval); +guint g_thread_pool_get_max_idle_time (void); + G_END_DECLS #endif /* __G_THREADPOOL_H__ */ diff --git a/tests/threadpool-test.c b/tests/threadpool-test.c index 0168561..d5749a8 100644 --- a/tests/threadpool-test.c +++ b/tests/threadpool-test.c @@ -5,13 +5,17 @@ #include -#define d(x) x +#define debug(...) g_printerr (__VA_ARGS__) #define RUNS 100 #define WAIT 5 /* seconds */ #define MAX_THREADS 10 -#define MAX_UNUSED_THREADS 2 + +/* if > 0 the test will run continously (since the test ends when + * thread count is 0), if -1 it means no limit to unused threads + * if 0 then no unused threads are possible */ +#define MAX_UNUSED_THREADS -1 G_LOCK_DEFINE_STATIC (thread_counter_pools); @@ -27,6 +31,7 @@ G_LOCK_DEFINE_STATIC (thread_counter_sort); static gulong sort_thread_counter = 0; +static GThreadPool *idle_pool = NULL; static GMainLoop *main_loop = NULL; @@ -38,7 +43,7 @@ test_thread_pools_entry_func (gpointer data, gpointer user_data) id = GPOINTER_TO_UINT (data); - d(g_print ("[pool] ---> [%3.3d] entered thread\n", id)); + debug("[pool] ---> [%3.3d] entered thread\n", id); G_LOCK (thread_counter_pools); abs_thread_counter++; @@ -104,8 +109,8 @@ test_thread_sort_entry_func (gpointer data, gpointer user_data) thread_id = GPOINTER_TO_UINT (data); is_sorted = GPOINTER_TO_INT (user_data); - d(g_print ("%s ---> entered thread:%2.2d, last thread:%2.2d\n", - is_sorted ? "[ sorted]" : "[unsorted]", thread_id, last_thread_id)); + debug("%s ---> entered thread:%2.2d, last thread:%2.2d\n", + is_sorted ? "[ sorted]" : "[unsorted]", thread_id, last_thread_id); if (is_sorted) { static gboolean last_failed = FALSE; @@ -163,6 +168,74 @@ test_thread_sort (gboolean sort) g_assert (g_thread_pool_get_num_threads (pool) == g_thread_pool_get_max_threads (pool)); } +static void +test_thread_idle_time_entry_func (gpointer data, gpointer user_data) +{ + guint thread_id; + + thread_id = GPOINTER_TO_UINT (data); + + debug("[idle] ---> entered thread:%2.2d\n", + thread_id); + + g_usleep (WAIT * 1000); + + debug("[idle] <--- exiting thread:%2.2d\n", + thread_id); +} + +static gboolean +test_thread_idle_timeout (gpointer data) +{ + guint interval; + gint i; + + interval = GPOINTER_TO_UINT (data); + + for (i = 0; i < 2; i++) { + g_thread_pool_push (idle_pool, GUINT_TO_POINTER (100 + i), NULL); + debug("[idle] ===> pushed new thread with id:%d, number of threads:%d, unprocessed:%d\n", + 100 + i, + g_thread_pool_get_num_threads (idle_pool), + g_thread_pool_unprocessed (idle_pool)); + } + + + return FALSE; +} + +static void +test_thread_idle_time (guint idle_time) +{ + guint limit = 50; + guint interval = 10000; + gint i; + + idle_pool = g_thread_pool_new (test_thread_idle_time_entry_func, + NULL, + MAX_THREADS, + FALSE, + NULL); + + g_thread_pool_set_max_unused_threads (MAX_UNUSED_THREADS); + g_thread_pool_set_max_idle_time (interval); + + g_assert (g_thread_pool_get_max_unused_threads () == MAX_UNUSED_THREADS); + g_assert (g_thread_pool_get_max_idle_time () == interval); + + for (i = 0; i < limit; i++) { + g_thread_pool_push (idle_pool, GUINT_TO_POINTER (i), NULL); + debug("[idle] ===> pushed new thread with id:%d, number of threads:%d, unprocessed:%d\n", + i, + g_thread_pool_get_num_threads (idle_pool), + g_thread_pool_unprocessed (idle_pool)); + } + + g_timeout_add ((interval - 1000), + test_thread_idle_timeout, + GUINT_TO_POINTER (interval)); +} + static gboolean test_check_start_and_stop (gpointer user_data) { @@ -173,7 +246,7 @@ test_check_start_and_stop (gpointer user_data) if (test_number == 0) { run_next = TRUE; - d(g_print ("***** RUNNING TEST %2.2d *****\n", test_number)); + debug("***** RUNNING TEST %2.2d *****\n", test_number); } if (run_next) { @@ -189,8 +262,11 @@ test_check_start_and_stop (gpointer user_data) case 3: test_thread_sort (TRUE); break; + case 4: + test_thread_idle_time (5); + break; default: - d(g_print ("***** END OF TESTS *****\n")); + debug("***** END OF TESTS *****\n"); g_main_loop_quit (main_loop); continue_timeout = FALSE; break; @@ -203,19 +279,28 @@ test_check_start_and_stop (gpointer user_data) if (test_number == 1) { G_LOCK (thread_counter_pools); quit &= running_thread_counter <= 0; - d(g_print ("***** POOL RUNNING THREAD COUNT:%ld\n", - running_thread_counter)); + debug("***** POOL RUNNING THREAD COUNT:%ld\n", + running_thread_counter); G_UNLOCK (thread_counter_pools); } if (test_number == 2 || test_number == 3) { G_LOCK (thread_counter_sort); quit &= sort_thread_counter <= 0; - d(g_print ("***** POOL SORT THREAD COUNT:%ld\n", - sort_thread_counter)); + debug("***** POOL SORT THREAD COUNT:%ld\n", + sort_thread_counter); G_UNLOCK (thread_counter_sort); } + if (test_number == 4) { + guint idle; + + idle = g_thread_pool_get_num_threads (idle_pool); + quit &= idle < 1; + debug("***** POOL IDLE THREAD COUNT:%d, UNPROCESSED JOBS:%d\n", + idle, g_thread_pool_unprocessed (idle_pool)); + } + if (quit) { run_next = TRUE; } @@ -232,7 +317,7 @@ main (int argc, char *argv[]) #if defined(G_THREADS_ENABLED) && ! defined(G_THREADS_IMPL_NONE) g_thread_init (NULL); - d(g_print ("Starting... (in one second)\n")); + debug("Starting... (in one second)\n"); g_timeout_add (1000, test_check_start_and_stop, NULL); main_loop = g_main_loop_new (NULL, FALSE); -- 2.7.4