- gpointer task;
- gboolean goto_global_pool = !pool->pool.exclusive;
- gint len = g_async_queue_length_unlocked (pool->queue);
-
- if (g_thread_should_run (pool, len))
- {
- if (watcher)
- {
- /* This thread is actually not needed here, but it waits
- * for some time anyway. If during that time a new
- * request arrives, this saves process
- * swicthes. Otherwise the thread will go to the global
- * pool afterwards */
- GTimeVal end_time;
- g_get_current_time (&end_time);
- g_time_val_add (&end_time, G_USEC_PER_SEC / 2); /* 1/2 second */
- task = g_async_queue_timed_pop_unlocked (pool->queue, &end_time);
- }
- else
- task = g_async_queue_pop_unlocked (pool->queue);
-
- if (task)
- {
- watcher = FALSE;
- if (pool->num_threads > pool->max_threads &&
- pool->max_threads != -1)
- /* We are in fact a superfluous threads, so we go to
- * the global pool and just hand the data further to
- * the next one waiting in the queue */
- {
- g_thread_pool_queue_push_unlocked (pool, task);
- goto_global_pool = TRUE;
- }
- else if (pool->running || !pool->immediate)
- {
- g_async_queue_unlock (pool->queue);
- pool->pool.func (task, pool->pool.user_data);
- g_async_queue_lock (pool->queue);
- }
- }
- len = g_async_queue_length_unlocked (pool->queue);
- }
-
- if (!g_thread_should_run (pool, len))
- {
- g_cond_broadcast (inform_cond);
- goto_global_pool = TRUE;
- }
- else if (len > 0)
- {
- /* At this pool there are no threads waiting, but tasks are. */
- goto_global_pool = FALSE;
- }
- else if (len == 0 && !watcher && !pool->pool.exclusive)
- {
- /* Here neither threads nor tasks are queued and we didn't
- * just return from a timed wait. We now wait for a limited
- * time at this pool for new tasks to avoid costly context
- * switches. */
- goto_global_pool = FALSE;
- watcher = TRUE;
- }
-
- if (goto_global_pool)
- {
- pool->num_threads--;
-
- if (!pool->running && !pool->waiting)
- {
- if (pool->num_threads == 0)
- {
- g_async_queue_unlock (pool->queue);
- g_thread_pool_free_internal (pool);
- }
- else
- {
- if (len == - pool->num_threads)
- g_thread_pool_wakeup_and_stop_all (pool);
-
- g_async_queue_unlock (pool->queue);
- }
- }
- else
- g_async_queue_unlock (pool->queue);
-
- g_async_queue_lock (unused_thread_queue);
-
- G_LOCK (unused_threads);
- if ((unused_threads >= max_unused_threads &&
- max_unused_threads != -1))
- {
- G_UNLOCK (unused_threads);
- g_async_queue_unlock (unused_thread_queue);
- /* Stop this thread */
- return NULL;
- }
- unused_threads++;
- G_UNLOCK (unused_threads);
-
- pool = g_async_queue_pop_unlocked (unused_thread_queue);
-
- G_LOCK (unused_threads);
- unused_threads--;
- G_UNLOCK (unused_threads);
-
- g_async_queue_unlock (unused_thread_queue);
-
- if (pool == stop_this_thread_marker)
- /* Stop this thread */
- return NULL;
-
- g_async_queue_lock (pool->queue);
-
- /* pool->num_threads++ is not done here, but in
- * g_thread_pool_start_thread to make the new started thread
- * known to the pool, before itself can do it. */
- }
+ gpointer task;
+
+ task = g_thread_pool_wait_for_new_task (pool);
+ if (task)
+ {
+ if (pool->running || !pool->immediate)
+ {
+ /* A task was received and the thread pool is active,
+ * so execute the function.
+ */
+ g_async_queue_unlock (pool->queue);
+ DEBUG_MSG (("thread %p in pool %p calling func.",
+ g_thread_self (), pool));
+ pool->pool.func (task, pool->pool.user_data);
+ g_async_queue_lock (pool->queue);
+ }
+ }
+ else
+ {
+ /* No task was received, so this thread goes to the global pool. */
+ gboolean free_pool = FALSE;
+
+ DEBUG_MSG (("thread %p leaving pool %p for global pool.",
+ g_thread_self (), pool));
+ pool->num_threads--;
+
+ if (!pool->running)
+ {
+ if (!pool->waiting)
+ {
+ if (pool->num_threads == 0)
+ {
+ /* If the pool is not running and no other
+ * thread is waiting for this thread pool to
+ * finish and this is the last thread of this
+ * pool, free the pool.
+ */
+ free_pool = TRUE;
+ }
+ else
+ {
+ /* If the pool is not running and no other
+ * thread is waiting for this thread pool to
+ * finish and this is not the last thread of
+ * this pool and there are no tasks left in the
+ * queue, wakeup the remaining threads.
+ */
+ if (g_async_queue_length_unlocked (pool->queue) ==
+ - pool->num_threads)
+ g_thread_pool_wakeup_and_stop_all (pool);
+ }
+ }
+ else if (pool->immediate ||
+ g_async_queue_length_unlocked (pool->queue) <= 0)
+ {
+ /* If the pool is not running and another thread is
+ * waiting for this thread pool to finish and there
+ * are either no tasks left or the pool shall stop
+ * immediately, inform the waiting thread of a change
+ * of the thread pool state.
+ */
+ g_cond_broadcast (&pool->cond);
+ }
+ }
+
+ g_async_queue_unlock (pool->queue);
+
+ if (free_pool)
+ g_thread_pool_free_internal (pool);
+
+ if ((pool = g_thread_pool_wait_for_new_pool ()) == NULL)
+ break;
+
+ g_async_queue_lock (pool->queue);
+
+ DEBUG_MSG (("thread %p entering pool %p from global pool.",
+ g_thread_self (), pool));
+
+ /* pool->num_threads++ is not done here, but in
+ * g_thread_pool_start_thread to make the new started
+ * thread known to the pool before itself can do it.
+ */
+ }