- gpointer task = NULL;
- gboolean goto_global_pool = !pool->pool.exclusive;
- gint len = g_async_queue_length_unlocked (pool->queue);
-
- if (g_thread_should_run (pool, len))
- {
- if (watcher)
- {
- /* This thread is actually not needed here, but it waits
- * for some time anyway. If during that time a new
- * request arrives, this saves process
- * swicthes. Otherwise the thread will go to the global
- * pool afterwards */
- GTimeVal end_time;
- g_get_current_time (&end_time);
- g_time_val_add (&end_time, G_USEC_PER_SEC / 2); /* 1/2 second */
- DEBUG_MSG(("pool:0x%.8x waiting 1/2 second to pop next item "
- "in queue (%d running, %d unprocessed) ...\n",
- (guint)pool,
- pool->num_threads,
- g_async_queue_length_unlocked (pool->queue)));
- task = g_async_queue_timed_pop_unlocked (pool->queue, &end_time);
- }
- else if (g_thread_pool_get_max_idle_time() > 0)
- {
- /* We always give a maximum time to pop the next task so
- * we know that when we evaluate task further down, that
- * it has had the maximum time to get a new task and it
- * can die */
- GTimeVal end_time;
- g_get_current_time (&end_time);
- DEBUG_MSG(("pool:0x%.8x waiting %d ms max to pop next item in "
- "queue (%d running, %d unprocessed) or exiting ...\n",
- (guint)pool,
- g_thread_pool_get_max_idle_time (),
- pool->num_threads,
- g_async_queue_length_unlocked (pool->queue)));
-
- g_time_val_add (&end_time, g_thread_pool_get_max_idle_time () * 1000);
- task = g_async_queue_timed_pop_unlocked (pool->queue, &end_time);
- }
- else
- {
- task = g_async_queue_pop_unlocked (pool->queue);
- DEBUG_MSG(("pool:0x%.8x new task:0x%.8x poped from pool queue ...\n",
- (guint)pool, (guint)task));
- }
-
- if (task)
- {
- watcher = FALSE;
- if (pool->num_threads > pool->max_threads &&
- pool->max_threads != -1)
- /* We are in fact a superfluous threads, so we go to
- * the global pool and just hand the data further to
- * the next one waiting in the queue */
- {
- DEBUG_MSG(("pool:0x%.8x, task:0x%.8x we have too many threads "
- "and max is set, pushing task into queue ...\n",
- (guint)pool, (guint)task));
- g_thread_pool_queue_push_unlocked (pool, task);
- goto_global_pool = TRUE;
- }
- else if (pool->running || !pool->immediate)
- {
- g_async_queue_unlock (pool->queue);
- DEBUG_MSG(("pool:0x%.8x, task:0x%.8x calling func ...\n",
- (guint)pool, (guint)task));
- pool->pool.func (task, pool->pool.user_data);
- g_async_queue_lock (pool->queue);
- }
- }
- else if (g_thread_pool_get_max_idle_time() > 0)
- {
- G_LOCK (settings);
- if (pool->num_threads > max_unused_threads) {
- G_UNLOCK (settings);
- pool->num_threads--;
-
- DEBUG_MSG(("pool:0x%.8x queue timed pop has no tasks waiting, "
- "so stopping thread (%d running, %d unprocessed) ...\n",
- (guint)pool,
- pool->num_threads,
- g_async_queue_length_unlocked (pool->queue)));
- g_async_queue_unlock (pool->queue);
-
- return NULL;
- }
- G_UNLOCK (settings);
- }
- len = g_async_queue_length_unlocked (pool->queue);
- }
-
- DEBUG_MSG(("pool:0x%.8x, len:%d, watcher:%s, exclusive:%s, should run:%s\n",
- (guint)pool,
- len,
- watcher ? "true" : "false",
- pool->pool.exclusive ? "true" : "false",
- g_thread_should_run (pool, len) ? "true" : "false"));
-
- if (!g_thread_should_run (pool, len))
- {
- g_cond_broadcast (inform_cond);
- goto_global_pool = TRUE;
- }
- else if (len > 0)
- {
- /* At this pool there are no threads waiting, but tasks are. */
- goto_global_pool = FALSE;
- }
- else if (len < 1 && g_thread_pool_get_max_idle_time () > 0)
- {
- goto_global_pool = FALSE;
- watcher = FALSE;
- }
- else if (len == 0 && !watcher && !pool->pool.exclusive)
- {
- /* Here neither threads nor tasks are queued and we didn't
- * just return from a timed wait. We now wait for a limited
- * time at this pool for new tasks to avoid costly context
- * switches. */
- goto_global_pool = FALSE;
- watcher = TRUE;
- }
-
- if (goto_global_pool)
- {
- DEBUG_MSG(("pool:0x%.8x, now in the global pool\n", (guint)pool));
- pool->num_threads--;
-
- if (!pool->running && !pool->waiting)
- {
- if (pool->num_threads == 0)
- {
- g_async_queue_unlock (pool->queue);
- g_thread_pool_free_internal (pool);
- }
- else
- {
- if (len == - pool->num_threads)
- g_thread_pool_wakeup_and_stop_all (pool);
-
- g_async_queue_unlock (pool->queue);
- }
- }
- else
- g_async_queue_unlock (pool->queue);
-
- g_async_queue_lock (unused_thread_queue);
-
- G_LOCK (settings);
- if ((unused_threads >= max_unused_threads &&
- max_unused_threads != -1))
- {
- G_UNLOCK (settings);
- g_async_queue_unlock (unused_thread_queue);
- DEBUG_MSG(("pool:0x%.8x stopping thread (%d running, %d unprocessed) ...\n",
- (guint)pool,
- pool->num_threads,
- g_async_queue_length_unlocked (pool->queue)));
- /* Stop this thread */
- return NULL;
- }
- unused_threads++;
- G_UNLOCK (settings);
-
- pool = g_async_queue_pop_unlocked (unused_thread_queue);
-
- G_LOCK (settings);
- unused_threads--;
- G_UNLOCK (settings);
-
- g_async_queue_unlock (unused_thread_queue);
-
- if (pool == stop_this_thread_marker)
- /* Stop this thread */
- return NULL;
-
- g_async_queue_lock (pool->queue);
-
- /* pool->num_threads++ is not done here, but in
- * g_thread_pool_start_thread to make the new started thread
- * known to the pool, before itself can do it. */
- }
+ gpointer task;
+
+ task = g_thread_pool_wait_for_new_task (pool);
+ if (task)
+ {
+ if (pool->running || !pool->immediate)
+ {
+ /* A task was received and the thread pool is active,
+ * so execute the function.
+ */
+ g_async_queue_unlock (pool->queue);
+ DEBUG_MSG (("thread %p in pool %p calling func.",
+ g_thread_self (), pool));
+ pool->pool.func (task, pool->pool.user_data);
+ g_async_queue_lock (pool->queue);
+ }
+ }
+ else
+ {
+ /* No task was received, so this thread goes to the global pool. */
+ gboolean free_pool = FALSE;
+
+ DEBUG_MSG (("thread %p leaving pool %p for global pool.",
+ g_thread_self (), pool));
+ pool->num_threads--;
+
+ if (!pool->running)
+ {
+ if (!pool->waiting)
+ {
+ if (pool->num_threads == 0)
+ {
+ /* If the pool is not running and no other
+ * thread is waiting for this thread pool to
+ * finish and this is the last thread of this
+ * pool, free the pool.
+ */
+ free_pool = TRUE;
+ }
+ else
+ {
+ /* If the pool is not running and no other
+ * thread is waiting for this thread pool to
+ * finish and this is not the last thread of
+ * this pool and there are no tasks left in the
+ * queue, wakeup the remaining threads.
+ */
+ if (g_async_queue_length_unlocked (pool->queue) ==
+ - pool->num_threads)
+ g_thread_pool_wakeup_and_stop_all (pool);
+ }
+ }
+ else if (pool->immediate ||
+ g_async_queue_length_unlocked (pool->queue) <= 0)
+ {
+ /* If the pool is not running and another thread is
+ * waiting for this thread pool to finish and there
+ * are either no tasks left or the pool shall stop
+ * immediately, inform the waiting thread of a change
+ * of the thread pool state.
+ */
+ g_cond_broadcast (&pool->cond);
+ }
+ }
+
+ g_async_queue_unlock (pool->queue);
+
+ if (free_pool)
+ g_thread_pool_free_internal (pool);
+
+ if ((pool = g_thread_pool_wait_for_new_pool ()) == NULL)
+ break;
+
+ g_async_queue_lock (pool->queue);
+
+ DEBUG_MSG (("thread %p entering pool %p from global pool.",
+ g_thread_self (), pool));
+
+ /* pool->num_threads++ is not done here, but in
+ * g_thread_pool_start_thread to make the new started
+ * thread known to the pool before itself can do it.
+ */
+ }