#include "glib.h"
#include "galias.h"
-#define DEBUG_MSG(x) /* */
-/* #define DEBUG_MSG(args) g_message args ; */
-
+#define DEBUG_MSG(x)
+/* #define DEBUG_MSG(args) g_printerr args ; g_printerr ("\n"); */
typedef struct _GRealThreadPool GRealThreadPool;
gpointer sort_user_data;
};
-/* The following is just an address to mark the stop order for a
+/* The following is just an address to mark the wakeup order for a
* thread, it could be any address (as long, as it isn't a valid
* GThreadPool address) */
-static const gpointer stop_this_thread_marker = (gpointer) &g_thread_pool_new;
+static const gpointer wakeup_thread_marker = (gpointer) &g_thread_pool_new;
+static gint wakeup_thread_serial = 0;
/* Here all unused threads are waiting */
static GAsyncQueue *unused_thread_queue;
static gint unused_threads = 0;
static gint max_unused_threads = 0;
static guint max_idle_time = 0;
-G_LOCK_DEFINE_STATIC (settings);
static GMutex *inform_mutex = NULL;
static GCond *inform_cond = NULL;
static void g_thread_pool_wakeup_and_stop_all (GRealThreadPool *pool);
-#define g_thread_should_run(pool, len) \
- ((pool)->running || (!(pool)->immediate && (len) > 0))
-
-
static void
g_thread_pool_queue_push_unlocked (GRealThreadPool *pool,
gpointer data)
g_thread_pool_thread_proxy (gpointer data)
{
GRealThreadPool *pool = data;
- gboolean watcher = FALSE;
+ guint last_wakeup_thread_serial = 0;
- DEBUG_MSG(("pool:0x%.8x entering proxy ...\n", (guint)pool));
+ DEBUG_MSG (("thread %p started for pool %p.",
+ g_thread_self (), pool));
g_async_queue_lock (pool->queue);
while (TRUE)
{
gpointer task = NULL;
- gboolean goto_global_pool = !pool->pool.exclusive;
- gint len = g_async_queue_length_unlocked (pool->queue);
- if (g_thread_should_run (pool, len))
+ if (pool->running || (!pool->immediate &&
+ g_async_queue_length_unlocked (pool->queue) > 0))
{
- if (watcher)
+ /* This thread pool is still active. */
+ if (pool->num_threads > pool->max_threads &&
+ pool->max_threads != -1)
{
- /* This thread is actually not needed here, but it waits
- * for some time anyway. If during that time a new
- * request arrives, this saves process
- * swicthes. Otherwise the thread will go to the global
- * pool afterwards */
- GTimeVal end_time;
- g_get_current_time (&end_time);
- g_time_val_add (&end_time, G_USEC_PER_SEC / 2); /* 1/2 second */
- DEBUG_MSG(("pool:0x%.8x waiting 1/2 second to pop next item "
- "in queue (%d running, %d unprocessed) ...\n",
- (guint)pool,
- pool->num_threads,
- g_async_queue_length_unlocked (pool->queue)));
- task = g_async_queue_timed_pop_unlocked (pool->queue, &end_time);
+ /* This is a superfluous thread, so it goes to the
+ * global pool. */
+ DEBUG_MSG (("superfluous thread %p in pool %p.",
+ g_thread_self (), pool));
}
- else if (g_thread_pool_get_max_idle_time() > 0)
+ else if (pool->pool.exclusive)
{
- /* We always give a maximum time to pop the next task so
- * we know that when we evaluate task further down, that
- * it has had the maximum time to get a new task and it
- * can die */
- GTimeVal end_time;
- g_get_current_time (&end_time);
- DEBUG_MSG(("pool:0x%.8x waiting %d ms max to pop next item in "
- "queue (%d running, %d unprocessed) or exiting ...\n",
- (guint)pool,
- g_thread_pool_get_max_idle_time (),
- pool->num_threads,
- g_async_queue_length_unlocked (pool->queue)));
-
- g_time_val_add (&end_time, g_thread_pool_get_max_idle_time () * 1000);
- task = g_async_queue_timed_pop_unlocked (pool->queue, &end_time);
+ /* Exclusive threads stay attached to the pool. */
+ task = g_async_queue_pop_unlocked (pool->queue);
+ DEBUG_MSG (("thread %p in exclusive pool %p waits for task "
+ "(%d running, %d unprocessed).",
+ g_thread_self (), pool, pool->num_threads,
+ g_async_queue_length_unlocked (pool->queue)));
}
else
{
- task = g_async_queue_pop_unlocked (pool->queue);
- DEBUG_MSG(("pool:0x%.8x new task:0x%.8x poped from pool queue ...\n",
- (guint)pool, (guint)task));
- }
-
- if (task)
- {
- watcher = FALSE;
- if (pool->num_threads > pool->max_threads &&
- pool->max_threads != -1)
- /* We are in fact a superfluous threads, so we go to
- * the global pool and just hand the data further to
- * the next one waiting in the queue */
- {
- DEBUG_MSG(("pool:0x%.8x, task:0x%.8x we have too many threads "
- "and max is set, pushing task into queue ...\n",
- (guint)pool, (guint)task));
- g_thread_pool_queue_push_unlocked (pool, task);
- goto_global_pool = TRUE;
- }
- else if (pool->running || !pool->immediate)
- {
- g_async_queue_unlock (pool->queue);
- DEBUG_MSG(("pool:0x%.8x, task:0x%.8x calling func ...\n",
- (guint)pool, (guint)task));
- pool->pool.func (task, pool->pool.user_data);
- g_async_queue_lock (pool->queue);
- }
- }
- else if (g_thread_pool_get_max_idle_time() > 0)
- {
- G_LOCK (settings);
- if (pool->num_threads > max_unused_threads) {
- G_UNLOCK (settings);
- pool->num_threads--;
-
- DEBUG_MSG(("pool:0x%.8x queue timed pop has no tasks waiting, "
- "so stopping thread (%d running, %d unprocessed) ...\n",
- (guint)pool,
- pool->num_threads,
- g_async_queue_length_unlocked (pool->queue)));
- g_async_queue_unlock (pool->queue);
-
- return NULL;
- }
- G_UNLOCK (settings);
+ /* A thread will wait for new tasks for at most 1/2
+ * second before going to the global pool. */
+ GTimeVal end_time;
+ g_get_current_time (&end_time);
+ g_time_val_add (&end_time, G_USEC_PER_SEC / 2); /* 1/2 second */
+ DEBUG_MSG (("thread %p in pool %p waits 1/2 second for task "
+ "(%d running, %d unprocessed).",
+ g_thread_self (), pool, pool->num_threads,
+ g_async_queue_length_unlocked (pool->queue)));
+ task = g_async_queue_timed_pop_unlocked (pool->queue, &end_time);
}
- len = g_async_queue_length_unlocked (pool->queue);
}
-
- DEBUG_MSG(("pool:0x%.8x, len:%d, watcher:%s, exclusive:%s, should run:%s\n",
- (guint)pool,
- len,
- watcher ? "true" : "false",
- pool->pool.exclusive ? "true" : "false",
- g_thread_should_run (pool, len) ? "true" : "false"));
-
- if (!g_thread_should_run (pool, len))
+ else
{
- g_cond_broadcast (inform_cond);
- goto_global_pool = TRUE;
+ /* This thread pool should is inactive, it will no longer
+ * process tasks. */
+ DEBUG_MSG (("pool %p not active, thread %p will go to global pool "
+ "(running: %s, immediate: %s, len: %d).",
+ pool, g_thread_self (),
+ pool->running ? "true" : "false",
+ pool->immediate ? "true" : "false",
+ g_async_queue_length_unlocked (pool->queue)));
}
- else if (len > 0)
- {
- /* At this pool there are no threads waiting, but tasks are. */
- goto_global_pool = FALSE;
- }
- else if (len < 1 && g_thread_pool_get_max_idle_time () > 0)
+
+ if (task)
{
- goto_global_pool = FALSE;
- watcher = FALSE;
+ if (pool->running || !pool->immediate)
+ {
+ /* A task was received and the thread pool is active, so
+ * execute the function. */
+ g_async_queue_unlock (pool->queue);
+ DEBUG_MSG (("thread %p in pool %p calling func.",
+ g_thread_self (), pool));
+ pool->pool.func (task, pool->pool.user_data);
+ g_async_queue_lock (pool->queue);
+ }
}
- else if (len == 0 && !watcher && !pool->pool.exclusive)
+ else
{
- /* Here neither threads nor tasks are queued and we didn't
- * just return from a timed wait. We now wait for a limited
- * time at this pool for new tasks to avoid costly context
- * switches. */
- goto_global_pool = FALSE;
- watcher = TRUE;
- }
-
- if (goto_global_pool)
- {
- DEBUG_MSG(("pool:0x%.8x, now in the global pool\n", (guint)pool));
+ /* No task was received, so this thread goes to the global
+ * pool. */
+ gboolean free_pool = FALSE;
+
+ DEBUG_MSG (("thread %p leaving pool %p for global pool.",
+ g_thread_self (), pool));
pool->num_threads--;
- if (!pool->running && !pool->waiting)
+ if (!pool->running)
{
- if (pool->num_threads == 0)
+ if (!pool->waiting)
{
- g_async_queue_unlock (pool->queue);
- g_thread_pool_free_internal (pool);
- }
- else
+ if (pool->num_threads == 0)
+ {
+ /* If the pool is not running and no other
+ * thread is waiting for this thread pool to
+ * finish and this is the last thread of this
+ * pool, free the pool. */
+ free_pool = TRUE;
+ }
+ else
+ {
+ /* If the pool is not running and no other
+ * thread is waiting for this thread pool to
+ * finish and this is not the last thread of
+ * this pool and there are no tasks left in the
+ * queue, wakeup the remaining threads. */
+ if (g_async_queue_length_unlocked (pool->queue) ==
+ - pool->num_threads)
+ g_thread_pool_wakeup_and_stop_all (pool);
+ }
+ }
+ else if (pool->immediate ||
+ g_async_queue_length_unlocked (pool->queue) <= 0)
{
- if (len == - pool->num_threads)
- g_thread_pool_wakeup_and_stop_all (pool);
-
- g_async_queue_unlock (pool->queue);
+ /* If the pool is not running and another thread is
+ * waiting for this thread pool to finish and there
+ * are either no tasks left or the pool shall stop
+ * immediatly, inform the waiting thread of a change
+ * of the thread pool state. */
+ g_cond_broadcast (inform_cond);
}
}
- else
- g_async_queue_unlock (pool->queue);
+ g_async_queue_unlock (pool->queue);
+
+ if (free_pool)
+ g_thread_pool_free_internal (pool);
g_async_queue_lock (unused_thread_queue);
-
- G_LOCK (settings);
- if ((unused_threads >= max_unused_threads &&
- max_unused_threads != -1))
- {
- G_UNLOCK (settings);
- g_async_queue_unlock (unused_thread_queue);
- DEBUG_MSG(("pool:0x%.8x stopping thread (%d running, %d unprocessed) ...\n",
- (guint)pool,
- pool->num_threads,
- g_async_queue_length_unlocked (pool->queue)));
- /* Stop this thread */
- return NULL;
- }
unused_threads++;
- G_UNLOCK (settings);
- pool = g_async_queue_pop_unlocked (unused_thread_queue);
+ do
+ {
+
+ if ((unused_threads >= max_unused_threads &&
+ max_unused_threads != -1))
+ {
+ /* If this is a superflous thread, stop it. */
+ unused_threads--;
+ g_async_queue_unlock (unused_thread_queue);
+ DEBUG_MSG (("stopping thread %p.", g_thread_self ()));
+ return NULL;
+ }
+
+ if (max_idle_time > 0)
+ {
+ /* If a maximal idle time is given, wait for the
+ * given time. */
+ GTimeVal end_time;
+ g_get_current_time (&end_time);
+ g_time_val_add (&end_time, max_idle_time * 1000);
+
+ DEBUG_MSG (("thread %p waiting in global pool for "
+ "%f seconds.",
+ g_thread_self (), max_idle_time / 1000.0));
+
+ pool = g_async_queue_timed_pop_unlocked (unused_thread_queue,
+ &end_time);
+ if (!pool)
+ {
+ /* If no new task was received in the given
+ * time, stop this thread. */
+ unused_threads--;
+ g_async_queue_unlock (unused_thread_queue);
+ DEBUG_MSG (("stopping thread %p after max-idle-time.",
+ g_thread_self ()));
+ /* Stop this thread */
+ return NULL;
+ }
+ }
+ else
+ {
+ /* If no maximal idle time is given, wait
+ * indefinitly. */
+ DEBUG_MSG (("thread %p waiting in global pool.",
+ g_thread_self ()));
+ pool = g_async_queue_pop_unlocked (unused_thread_queue);
+ }
+
+ if (pool == wakeup_thread_marker)
+ {
+ if (last_wakeup_thread_serial == wakeup_thread_serial)
+ {
+ /* If this wakeup marker has been received for
+ * the second time, relay it. */
+ DEBUG_MSG (("thread %p relaying wakeup message to "
+ "waiting thread with lower serial.",
+ g_thread_self ()));
+ g_async_queue_push_unlocked (unused_thread_queue,
+ wakeup_thread_marker);
+ }
+ else
+ {
+ last_wakeup_thread_serial = wakeup_thread_serial;
+ }
+
+ /* If a wakeup marker has been received, this thread
+ * will get out of the way for 100 microseconds to
+ * avoid receiving this marker again. */
+ g_async_queue_unlock (unused_thread_queue);
+ g_usleep (100);
+ g_async_queue_lock (unused_thread_queue);
+ }
- G_LOCK (settings);
+ } while (pool == wakeup_thread_marker);
+
unused_threads--;
- G_UNLOCK (settings);
-
g_async_queue_unlock (unused_thread_queue);
-
- if (pool == stop_this_thread_marker)
- /* Stop this thread */
- return NULL;
-
+
g_async_queue_lock (pool->queue);
+
+ DEBUG_MSG (("thread %p entering pool %p from global pool.",
+ g_thread_self (), pool));
/* pool->num_threads++ is not done here, but in
* g_thread_pool_start_thread to make the new started thread
/* See comment in g_thread_pool_thread_proxy as to why this is done
* here and not there */
pool->num_threads++;
- DEBUG_MSG(("pool:0x%.8x thread created, (running:%d)\n",
- (guint)pool, pool->num_threads));
}
/**
{
g_return_if_fail (max_threads >= -1);
- G_LOCK (settings);
+ g_async_queue_lock (unused_thread_queue);
max_unused_threads = max_threads;
if (max_unused_threads < unused_threads && max_unused_threads != -1)
{
guint i;
+ wakeup_thread_serial++;
- g_async_queue_lock (unused_thread_queue);
for (i = unused_threads - max_unused_threads; i > 0; i--)
g_async_queue_push_unlocked (unused_thread_queue,
- stop_this_thread_marker);
- g_async_queue_unlock (unused_thread_queue);
+ wakeup_thread_marker);
}
- G_UNLOCK (settings);
+ g_async_queue_unlock (unused_thread_queue);
}
/**
{
gint retval;
- G_LOCK (settings);
+ g_async_queue_lock (unused_thread_queue);
retval = max_unused_threads;
- G_UNLOCK (settings);
+ g_async_queue_unlock (unused_thread_queue);
return retval;
}
{
guint retval;
- G_LOCK (settings);
+ g_async_queue_lock (unused_thread_queue);
retval = unused_threads;
- G_UNLOCK (settings);
+ g_async_queue_unlock (unused_thread_queue);
return retval;
}
void
g_thread_pool_set_max_idle_time (guint interval)
{
- G_LOCK (settings);
- max_idle_time = interval;
- G_UNLOCK (settings);
+ guint i;
+
+ g_async_queue_lock (unused_thread_queue);
+ max_idle_time = interval;
+ wakeup_thread_serial++;
+
+ for (i = 0; i < unused_threads; i++)
+ g_async_queue_push_unlocked (unused_thread_queue, wakeup_thread_marker);
+
+ g_async_queue_unlock (unused_thread_queue);
}
/**
{
guint retval;
- G_LOCK (settings);
+ g_async_queue_lock (unused_thread_queue);
retval = max_idle_time;
- G_UNLOCK (settings);
+ g_async_queue_unlock (unused_thread_queue);
return retval;
}
#include <glib.h>
-#define debug(...) g_printerr (__VA_ARGS__)
+/* #define DEBUG_MSG(x) */
+#define DEBUG_MSG(args) g_printerr args ; g_printerr ("\n");
#define RUNS 100
id = GPOINTER_TO_UINT (data);
- debug("[pool] ---> [%3.3d] entered thread\n", id);
+ DEBUG_MSG (("[pool] ---> [%3.3d] entered thread.", id));
G_LOCK (thread_counter_pools);
abs_thread_counter++;
running_thread_counter--;
leftover_task_counter--;
- g_print ("[pool] ---> [%3.3d] exiting thread (abs count:%ld, running count:%ld, left over:%ld)\n",
- id, abs_thread_counter, running_thread_counter, leftover_task_counter);
+ DEBUG_MSG (("[pool] ---> [%3.3d] exiting thread (abs count:%ld, "
+ "running count:%ld, left over:%ld)",
+ id, abs_thread_counter,
+ running_thread_counter, leftover_task_counter));
G_UNLOCK (thread_counter_pools);
}
for (i = 0; i < RUNS; i++)
{
- g_thread_pool_push (pool1, GUINT_TO_POINTER (i), NULL);
- g_thread_pool_push (pool2, GUINT_TO_POINTER (i), NULL);
- g_thread_pool_push (pool3, GUINT_TO_POINTER (i), NULL);
+ g_thread_pool_push (pool1, GUINT_TO_POINTER (i + 1), NULL);
+ g_thread_pool_push (pool2, GUINT_TO_POINTER (i + 1), NULL);
+ g_thread_pool_push (pool3, GUINT_TO_POINTER (i + 1), NULL);
leftover_task_counter += 3;
}
thread_id = GPOINTER_TO_UINT (data);
is_sorted = GPOINTER_TO_INT (user_data);
- debug("%s ---> entered thread:%2.2d, last thread:%2.2d\n",
- is_sorted ? "[ sorted]" : "[unsorted]", thread_id, last_thread_id);
+ DEBUG_MSG (("%s ---> entered thread:%2.2d, last thread:%2.2d",
+ is_sorted ? "[ sorted]" : "[unsorted]",
+ thread_id, last_thread_id));
if (is_sorted) {
static gboolean last_failed = FALSE;
guint id;
id = g_random_int_range (1, limit*2);
- g_thread_pool_push (pool, GUINT_TO_POINTER (id), NULL);
+ g_thread_pool_push (pool, GUINT_TO_POINTER (id + 1), NULL);
}
g_assert (g_thread_pool_get_num_threads (pool) == g_thread_pool_get_max_threads (pool));
thread_id = GPOINTER_TO_UINT (data);
- debug("[idle] ---> entered thread:%2.2d\n",
- thread_id);
+ DEBUG_MSG (("[idle] ---> entered thread:%2.2d", thread_id));
g_usleep (WAIT * 1000);
- debug("[idle] <--- exiting thread:%2.2d\n",
- thread_id);
+ DEBUG_MSG (("[idle] <--- exiting thread:%2.2d", thread_id));
}
static gboolean
for (i = 0; i < 2; i++) {
g_thread_pool_push (idle_pool, GUINT_TO_POINTER (100 + i), NULL);
- debug("[idle] ===> pushed new thread with id:%d, number of threads:%d, unprocessed:%d\n",
- 100 + i,
- g_thread_pool_get_num_threads (idle_pool),
- g_thread_pool_unprocessed (idle_pool));
+ DEBUG_MSG (("[idle] ===> pushed new thread with id:%d, number "
+ "of threads:%d, unprocessed:%d",
+ 100 + i,
+ g_thread_pool_get_num_threads (idle_pool),
+ g_thread_pool_unprocessed (idle_pool)));
}
}
static void
-test_thread_idle_time (guint idle_time)
+test_thread_idle_time ()
{
guint limit = 50;
guint interval = 10000;
g_assert (g_thread_pool_get_max_idle_time () == interval);
for (i = 0; i < limit; i++) {
- g_thread_pool_push (idle_pool, GUINT_TO_POINTER (i), NULL);
- debug("[idle] ===> pushed new thread with id:%d, number of threads:%d, unprocessed:%d\n",
- i,
- g_thread_pool_get_num_threads (idle_pool),
- g_thread_pool_unprocessed (idle_pool));
+ g_thread_pool_push (idle_pool, GUINT_TO_POINTER (i + 1), NULL);
+ DEBUG_MSG (("[idle] ===> pushed new thread with id:%d, "
+ "number of threads:%d, unprocessed:%d",
+ i,
+ g_thread_pool_get_num_threads (idle_pool),
+ g_thread_pool_unprocessed (idle_pool)));
}
g_timeout_add ((interval - 1000),
if (test_number == 0) {
run_next = TRUE;
- debug("***** RUNNING TEST %2.2d *****\n", test_number);
+ DEBUG_MSG (("***** RUNNING TEST %2.2d *****", test_number));
}
if (run_next) {
test_thread_sort (TRUE);
break;
case 4:
- test_thread_idle_time (5);
+ test_thread_idle_time ();
break;
default:
- debug("***** END OF TESTS *****\n");
+ DEBUG_MSG (("***** END OF TESTS *****"));
g_main_loop_quit (main_loop);
continue_timeout = FALSE;
break;
if (test_number == 1) {
G_LOCK (thread_counter_pools);
quit &= running_thread_counter <= 0;
- debug("***** POOL RUNNING THREAD COUNT:%ld\n",
- running_thread_counter);
+ DEBUG_MSG (("***** POOL RUNNING THREAD COUNT:%ld",
+ running_thread_counter));
G_UNLOCK (thread_counter_pools);
}
if (test_number == 2 || test_number == 3) {
G_LOCK (thread_counter_sort);
quit &= sort_thread_counter <= 0;
- debug("***** POOL SORT THREAD COUNT:%ld\n",
- sort_thread_counter);
+ DEBUG_MSG (("***** POOL SORT THREAD COUNT:%ld",
+ sort_thread_counter));
G_UNLOCK (thread_counter_sort);
}
if (test_number == 4) {
guint idle;
- idle = g_thread_pool_get_num_threads (idle_pool);
+ idle = g_thread_pool_get_num_unused_threads ();
quit &= idle < 1;
- debug("***** POOL IDLE THREAD COUNT:%d, UNPROCESSED JOBS:%d\n",
- idle, g_thread_pool_unprocessed (idle_pool));
+ DEBUG_MSG (("***** POOL IDLE THREAD COUNT:%d, UNPROCESSED JOBS:%d",
+ idle, g_thread_pool_unprocessed (idle_pool)));
}
if (quit) {
#if defined(G_THREADS_ENABLED) && ! defined(G_THREADS_IMPL_NONE)
g_thread_init (NULL);
- debug("Starting... (in one second)\n");
+ DEBUG_MSG (("Starting... (in one second)"));
g_timeout_add (1000, test_check_start_and_stop, NULL);
main_loop = g_main_loop_new (NULL, FALSE);