#include "glib.h"
#include "galias.h"
+#define debug(...) /* g_printerr (__VA_ARGS__) */
typedef struct _GRealThreadPool GRealThreadPool;
static GAsyncQueue *unused_thread_queue;
static gint unused_threads = 0;
static gint max_unused_threads = 0;
-G_LOCK_DEFINE_STATIC (unused_threads);
+static guint max_idle_time = 0;
+G_LOCK_DEFINE_STATIC (settings);
static GMutex *inform_mutex = NULL;
static GCond *inform_cond = NULL;
GRealThreadPool *pool = data;
gboolean watcher = FALSE;
+ debug("pool:0x%.8x entering proxy ...\n", (guint)pool);
+
g_async_queue_lock (pool->queue);
while (TRUE)
{
- gpointer task;
+ gpointer task = NULL;
gboolean goto_global_pool = !pool->pool.exclusive;
gint len = g_async_queue_length_unlocked (pool->queue);
-
+
if (g_thread_should_run (pool, len))
{
if (watcher)
GTimeVal end_time;
g_get_current_time (&end_time);
g_time_val_add (&end_time, G_USEC_PER_SEC / 2); /* 1/2 second */
+ debug("pool:0x%.8x waiting 1/2 second to pop next item "
+ "in queue (%d running, %d unprocessed) ...\n",
+ (guint)pool,
+ pool->num_threads,
+ g_async_queue_length_unlocked (pool->queue));
task = g_async_queue_timed_pop_unlocked (pool->queue, &end_time);
}
+ else if (g_thread_pool_get_max_idle_time() > 0)
+ {
+ /* We always give a maximum time to pop the next task so
+ * we know that when we evaluate task further down, that
+ * it has had the maximum time to get a new task and it
+ * can die */
+ GTimeVal end_time;
+ g_get_current_time (&end_time);
+ debug("pool:0x%.8x waiting %d ms max to pop next item in "
+ "queue (%d running, %d unprocessed) or exiting ...\n",
+ (guint)pool,
+ g_thread_pool_get_max_idle_time (),
+ pool->num_threads,
+ g_async_queue_length_unlocked (pool->queue));
+
+ g_time_val_add (&end_time, g_thread_pool_get_max_idle_time () * 1000);
+ task = g_async_queue_timed_pop_unlocked (pool->queue, &end_time);
+ }
else
- task = g_async_queue_pop_unlocked (pool->queue);
-
+ {
+ task = g_async_queue_pop_unlocked (pool->queue);
+ debug("pool:0x%.8x new task:0x%.8x poped from pool queue ...\n",
+ (guint)pool, (guint)task);
+ }
+
if (task)
{
watcher = FALSE;
* the global pool and just hand the data further to
* the next one waiting in the queue */
{
+ debug("pool:0x%.8x, task:0x%.8x we have too many threads "
+ "and max is set, pushing task into queue ...\n",
+ (guint)pool, (guint)task);
g_thread_pool_queue_push_unlocked (pool, task);
goto_global_pool = TRUE;
}
else if (pool->running || !pool->immediate)
{
g_async_queue_unlock (pool->queue);
+ debug("pool:0x%.8x, task:0x%.8x calling func ...\n",
+ (guint)pool, (guint)task);
pool->pool.func (task, pool->pool.user_data);
g_async_queue_lock (pool->queue);
}
+ }
+ else if (g_thread_pool_get_max_idle_time() > 0)
+ {
+ G_LOCK (settings);
+ if (pool->num_threads > max_unused_threads) {
+ G_UNLOCK (settings);
+ pool->num_threads--;
+
+ debug("pool:0x%.8x queue timed pop has no tasks waiting, "
+ "so stopping thread (%d running, %d unprocessed) ...\n",
+ (guint)pool,
+ pool->num_threads,
+ g_async_queue_length_unlocked (pool->queue));
+ g_async_queue_unlock (pool->queue);
+
+ return NULL;
+ }
+ G_UNLOCK (settings);
}
len = g_async_queue_length_unlocked (pool->queue);
}
+ debug("pool:0x%.8x, len:%d, watcher:%s, exclusive:%s, should run:%s\n",
+ (guint)pool,
+ len,
+ watcher ? "true" : "false",
+ pool->pool.exclusive ? "true" : "false",
+ g_thread_should_run (pool, len) ? "true" : "false");
+
if (!g_thread_should_run (pool, len))
{
g_cond_broadcast (inform_cond);
/* At this pool there are no threads waiting, but tasks are. */
goto_global_pool = FALSE;
}
+ else if (len < 1 && g_thread_pool_get_max_idle_time () > 0)
+ {
+ goto_global_pool = FALSE;
+ watcher = FALSE;
+ }
else if (len == 0 && !watcher && !pool->pool.exclusive)
{
/* Here neither threads nor tasks are queued and we didn't
* switches. */
goto_global_pool = FALSE;
watcher = TRUE;
- }
+ }
if (goto_global_pool)
{
+ debug("pool:0x%.8x, now in the global pool\n", (guint)pool);
pool->num_threads--;
if (!pool->running && !pool->waiting)
g_async_queue_lock (unused_thread_queue);
- G_LOCK (unused_threads);
- if ((unused_threads >= max_unused_threads &&
+ G_LOCK (settings);
+ if ((unused_threads >= max_unused_threads &&
max_unused_threads != -1))
{
- G_UNLOCK (unused_threads);
+ G_UNLOCK (settings);
g_async_queue_unlock (unused_thread_queue);
+ debug("pool:0x%.8x stopping thread (%d running, %d unprocessed) ...\n",
+ (guint)pool,
+ pool->num_threads,
+ g_async_queue_length_unlocked (pool->queue));
/* Stop this thread */
return NULL;
}
unused_threads++;
- G_UNLOCK (unused_threads);
+ G_UNLOCK (settings);
pool = g_async_queue_pop_unlocked (unused_thread_queue);
- G_LOCK (unused_threads);
+ G_LOCK (settings);
unused_threads--;
- G_UNLOCK (unused_threads);
+ G_UNLOCK (settings);
g_async_queue_unlock (unused_thread_queue);
/* See comment in g_thread_pool_thread_proxy as to why this is done
* here and not there */
pool->num_threads++;
+ debug("pool:0x%.8x thread created, (running:%d)\n",
+ (guint)pool, pool->num_threads);
}
/**
{
g_return_if_fail (max_threads >= -1);
- G_LOCK (unused_threads);
+ G_LOCK (settings);
max_unused_threads = max_threads;
g_async_queue_unlock (unused_thread_queue);
}
- G_UNLOCK (unused_threads);
+ G_UNLOCK (settings);
}
/**
{
gint retval;
- G_LOCK (unused_threads);
+ G_LOCK (settings);
retval = max_unused_threads;
- G_UNLOCK (unused_threads);
+ G_UNLOCK (settings);
return retval;
}
*
* Return value: the number of currently unused threads
**/
-guint g_thread_pool_get_num_unused_threads (void)
+guint
+g_thread_pool_get_num_unused_threads (void)
{
guint retval;
- G_LOCK (unused_threads);
+ G_LOCK (settings);
retval = unused_threads;
- G_UNLOCK (unused_threads);
+ G_UNLOCK (settings);
return retval;
}
* maximal number of unused threads. This function can be used to
* regularly stop all unused threads e.g. from g_timeout_add().
**/
-void g_thread_pool_stop_unused_threads (void)
+void
+g_thread_pool_stop_unused_threads (void)
{
guint oldval = g_thread_pool_get_max_unused_threads ();
g_thread_pool_set_max_unused_threads (0);
*
* Since: 2.10
**/
-void g_thread_pool_set_sort_function (GThreadPool *pool,
- GCompareDataFunc func,
- gpointer user_data)
+void
+g_thread_pool_set_sort_function (GThreadPool *pool,
+ GCompareDataFunc func,
+ gpointer user_data)
{
GRealThreadPool *real = (GRealThreadPool*) pool;
g_async_queue_unlock (real->queue);
}
+/**
+ * g_thread_pool_set_max_idle_time:
+ * @interval: the maximum @interval (1/1000ths of a second) a thread
+ * can be idle.
+ *
+ * This function will set the maximum @interval that a thread waiting
+ * in the pool for new tasks can be idle for before being
+ * stopped. This function is similar to calling
+ * g_thread_pool_stop_unused_threads() on a regular timeout, except,
+ * this is done on a per thread basis.
+ *
+ * By setting @interval to 0, idle threads will not be stopped.
+ *
+ * This function makes use of g_async_queue_timed_pop () using
+ * @interval.
+ *
+ * Since: 2.10
+ **/
+void
+g_thread_pool_set_max_idle_time (guint interval)
+{
+ G_LOCK (settings);
+ max_idle_time = interval;
+ G_UNLOCK (settings);
+}
+
+/**
+ * g_thread_pool_get_max_idle_time:
+ *
+ * This function will return the maximum @interval that a thread will
+ * wait in the thread pool for new tasks before being stopped.
+ *
+ * If this function returns 0, threads waiting in the thread pool for
+ * new work are not stopped.
+ *
+ * Return value: the maximum @interval to wait for new tasks in the
+ * thread pool before stopping the thread (1/1000ths of a second).
+ *
+ * Since: 2.10
+ **/
+guint
+g_thread_pool_get_max_idle_time (void)
+{
+ guint retval;
+
+ G_LOCK (settings);
+ retval = max_idle_time;
+ G_UNLOCK (settings);
+
+ return retval;
+}
+
#define __G_THREADPOOL_C__
#include "galiasdef.c"
#include <glib.h>
-#define d(x) x
+#define debug(...) g_printerr (__VA_ARGS__)
#define RUNS 100
#define WAIT 5 /* seconds */
#define MAX_THREADS 10
-#define MAX_UNUSED_THREADS 2
+
+/* if > 0 the test will run continously (since the test ends when
+ * thread count is 0), if -1 it means no limit to unused threads
+ * if 0 then no unused threads are possible */
+#define MAX_UNUSED_THREADS -1
G_LOCK_DEFINE_STATIC (thread_counter_pools);
static gulong sort_thread_counter = 0;
+static GThreadPool *idle_pool = NULL;
static GMainLoop *main_loop = NULL;
id = GPOINTER_TO_UINT (data);
- d(g_print ("[pool] ---> [%3.3d] entered thread\n", id));
+ debug("[pool] ---> [%3.3d] entered thread\n", id);
G_LOCK (thread_counter_pools);
abs_thread_counter++;
thread_id = GPOINTER_TO_UINT (data);
is_sorted = GPOINTER_TO_INT (user_data);
- d(g_print ("%s ---> entered thread:%2.2d, last thread:%2.2d\n",
- is_sorted ? "[ sorted]" : "[unsorted]", thread_id, last_thread_id));
+ debug("%s ---> entered thread:%2.2d, last thread:%2.2d\n",
+ is_sorted ? "[ sorted]" : "[unsorted]", thread_id, last_thread_id);
if (is_sorted) {
static gboolean last_failed = FALSE;
g_assert (g_thread_pool_get_num_threads (pool) == g_thread_pool_get_max_threads (pool));
}
+static void
+test_thread_idle_time_entry_func (gpointer data, gpointer user_data)
+{
+ guint thread_id;
+
+ thread_id = GPOINTER_TO_UINT (data);
+
+ debug("[idle] ---> entered thread:%2.2d\n",
+ thread_id);
+
+ g_usleep (WAIT * 1000);
+
+ debug("[idle] <--- exiting thread:%2.2d\n",
+ thread_id);
+}
+
+static gboolean
+test_thread_idle_timeout (gpointer data)
+{
+ guint interval;
+ gint i;
+
+ interval = GPOINTER_TO_UINT (data);
+
+ for (i = 0; i < 2; i++) {
+ g_thread_pool_push (idle_pool, GUINT_TO_POINTER (100 + i), NULL);
+ debug("[idle] ===> pushed new thread with id:%d, number of threads:%d, unprocessed:%d\n",
+ 100 + i,
+ g_thread_pool_get_num_threads (idle_pool),
+ g_thread_pool_unprocessed (idle_pool));
+ }
+
+
+ return FALSE;
+}
+
+static void
+test_thread_idle_time (guint idle_time)
+{
+ guint limit = 50;
+ guint interval = 10000;
+ gint i;
+
+ idle_pool = g_thread_pool_new (test_thread_idle_time_entry_func,
+ NULL,
+ MAX_THREADS,
+ FALSE,
+ NULL);
+
+ g_thread_pool_set_max_unused_threads (MAX_UNUSED_THREADS);
+ g_thread_pool_set_max_idle_time (interval);
+
+ g_assert (g_thread_pool_get_max_unused_threads () == MAX_UNUSED_THREADS);
+ g_assert (g_thread_pool_get_max_idle_time () == interval);
+
+ for (i = 0; i < limit; i++) {
+ g_thread_pool_push (idle_pool, GUINT_TO_POINTER (i), NULL);
+ debug("[idle] ===> pushed new thread with id:%d, number of threads:%d, unprocessed:%d\n",
+ i,
+ g_thread_pool_get_num_threads (idle_pool),
+ g_thread_pool_unprocessed (idle_pool));
+ }
+
+ g_timeout_add ((interval - 1000),
+ test_thread_idle_timeout,
+ GUINT_TO_POINTER (interval));
+}
+
static gboolean
test_check_start_and_stop (gpointer user_data)
{
if (test_number == 0) {
run_next = TRUE;
- d(g_print ("***** RUNNING TEST %2.2d *****\n", test_number));
+ debug("***** RUNNING TEST %2.2d *****\n", test_number);
}
if (run_next) {
case 3:
test_thread_sort (TRUE);
break;
+ case 4:
+ test_thread_idle_time (5);
+ break;
default:
- d(g_print ("***** END OF TESTS *****\n"));
+ debug("***** END OF TESTS *****\n");
g_main_loop_quit (main_loop);
continue_timeout = FALSE;
break;
if (test_number == 1) {
G_LOCK (thread_counter_pools);
quit &= running_thread_counter <= 0;
- d(g_print ("***** POOL RUNNING THREAD COUNT:%ld\n",
- running_thread_counter));
+ debug("***** POOL RUNNING THREAD COUNT:%ld\n",
+ running_thread_counter);
G_UNLOCK (thread_counter_pools);
}
if (test_number == 2 || test_number == 3) {
G_LOCK (thread_counter_sort);
quit &= sort_thread_counter <= 0;
- d(g_print ("***** POOL SORT THREAD COUNT:%ld\n",
- sort_thread_counter));
+ debug("***** POOL SORT THREAD COUNT:%ld\n",
+ sort_thread_counter);
G_UNLOCK (thread_counter_sort);
}
+ if (test_number == 4) {
+ guint idle;
+
+ idle = g_thread_pool_get_num_threads (idle_pool);
+ quit &= idle < 1;
+ debug("***** POOL IDLE THREAD COUNT:%d, UNPROCESSED JOBS:%d\n",
+ idle, g_thread_pool_unprocessed (idle_pool));
+ }
+
if (quit) {
run_next = TRUE;
}
#if defined(G_THREADS_ENABLED) && ! defined(G_THREADS_IMPL_NONE)
g_thread_init (NULL);
- d(g_print ("Starting... (in one second)\n"));
+ debug("Starting... (in one second)\n");
g_timeout_add (1000, test_check_start_and_stop, NULL);
main_loop = g_main_loop_new (NULL, FALSE);