gboolean thread_cancelled;
gboolean synchronous;
gboolean thread_complete;
+ gboolean blocking_other_task;
GError *error;
union {
g_task_thread_pool_init ();)
static GThreadPool *task_pool;
+static GMutex task_pool_mutex;
+static GPrivate task_private = G_PRIVATE_INIT (NULL);
static void
g_task_init (GTask *task)
}
task->thread_complete = TRUE;
+
+ if (task->blocking_other_task)
+ {
+ g_mutex_lock (&task_pool_mutex);
+ g_thread_pool_set_max_threads (task_pool,
+ g_thread_pool_get_max_threads (task_pool) - 1,
+ NULL);
+ g_mutex_unlock (&task_pool_mutex);
+ }
g_mutex_unlock (&task->lock);
if (task->cancellable)
{
GTask *task = thread_data;
+ g_private_set (&task_private, task);
+
task->task_func (task, task->source_object, task->task_data,
task->cancellable);
g_task_thread_complete (task);
+
+ g_private_set (&task_private, NULL);
g_object_unref (task);
}
g_thread_pool_push (task_pool, g_object_ref (task), &task->error);
if (task->error)
task->thread_complete = TRUE;
+ else if (g_private_get (&task_private))
+ {
+ /* This thread is being spawned from another GTask thread, so
+ * bump up max-threads so we don't starve.
+ */
+ g_mutex_lock (&task_pool_mutex);
+ if (g_thread_pool_set_max_threads (task_pool,
+ g_thread_pool_get_max_threads (task_pool) + 1,
+ NULL))
+ task->blocking_other_task = TRUE;
+ g_mutex_unlock (&task_pool_mutex);
+ }
}
/**
const GTask *tb = b;
gboolean a_cancelled, b_cancelled;
+ /* Tasks that are causing other tasks to block have higher
+ * priority.
+ */
+ if (ta->blocking_other_task && !tb->blocking_other_task)
+ return -1;
+ else if (tb->blocking_other_task && !ta->blocking_other_task)
+ return 1;
+
+ /* Let already-cancelled tasks finish right away */
a_cancelled = (ta->check_cancellable &&
g_cancellable_is_cancelled (ta->cancellable));
b_cancelled = (tb->check_cancellable &&
g_cancellable_is_cancelled (tb->cancellable));
-
- /* Let already-cancelled tasks finish right away */
if (a_cancelled && !b_cancelled)
return -1;
else if (b_cancelled && !a_cancelled)
g_task_thread_pool_init (void)
{
task_pool = g_thread_pool_new (g_task_thread_pool_thread, NULL,
- 100, FALSE, NULL);
+ 10, FALSE, NULL);
g_assert (task_pool != NULL);
g_thread_pool_set_sort_function (task_pool, g_task_compare_priority, NULL);
g_task_return_boolean (task, TRUE);
}
-#define G_TASK_THREAD_POOL_SIZE 100
+#define G_TASK_THREAD_POOL_SIZE 10
+static int fake_tasks_running;
static void
-test_run_in_thread_priority (void)
+fake_task_callback (GObject *source,
+ GAsyncResult *result,
+ gpointer user_data)
+{
+ if (--fake_tasks_running == 0)
+ g_main_loop_quit (loop);
+}
+
+static void
+clog_up_thread_pool (void)
{
GTask *task;
- GCancellable *cancellable;
- int seq_a, seq_b, seq_c, seq_d;
int i;
- /* Flush the thread pool, then clog it up with junk tasks */
g_thread_pool_stop_unused_threads ();
g_mutex_lock (&fake_task_mutex);
for (i = 0; i < G_TASK_THREAD_POOL_SIZE - 1; i++)
{
- task = g_task_new (NULL, NULL, NULL, NULL);
+ task = g_task_new (NULL, NULL, fake_task_callback, NULL);
g_task_set_task_data (task, &fake_task_mutex, NULL);
g_assert_cmpint (g_task_get_priority (task), ==, G_PRIORITY_DEFAULT);
g_task_set_priority (task, G_PRIORITY_HIGH * 2);
g_assert_cmpint (g_task_get_priority (task), ==, G_PRIORITY_HIGH * 2);
g_task_run_in_thread (task, fake_task_thread);
g_object_unref (task);
+ fake_tasks_running++;
}
g_mutex_lock (&last_fake_task_mutex);
g_task_set_priority (task, G_PRIORITY_HIGH * 2);
g_task_run_in_thread (task, fake_task_thread);
g_object_unref (task);
+}
+
+static void
+unclog_thread_pool (void)
+{
+ g_mutex_unlock (&fake_task_mutex);
+ g_main_loop_run (loop);
+}
+
+static void
+test_run_in_thread_priority (void)
+{
+ GTask *task;
+ GCancellable *cancellable;
+ int seq_a, seq_b, seq_c, seq_d;
+
+ clog_up_thread_pool ();
/* Queue three more tasks that we'll arrange to have run serially */
task = g_task_new (NULL, NULL, NULL, NULL);
g_assert_cmpint (seq_a, ==, 3);
g_assert_cmpint (seq_b, ==, 4);
- g_mutex_unlock (&fake_task_mutex);
+ unclog_thread_pool ();
+}
+
+/* test_run_in_thread_nested: task threads that block waiting on
+ * other task threads will not cause the thread pool to starve.
+ */
+
+static void
+run_nested_task_thread (GTask *task,
+ gpointer source_object,
+ gpointer task_data,
+ GCancellable *cancellable)
+{
+ GTask *nested;
+ int *nested_tasks_left = task_data;
+
+ if ((*nested_tasks_left)--)
+ {
+ nested = g_task_new (NULL, NULL, NULL, NULL);
+ g_task_set_task_data (nested, nested_tasks_left, NULL);
+ g_task_run_in_thread_sync (nested, run_nested_task_thread);
+ g_object_unref (nested);
+ }
+
+ g_task_return_boolean (task, TRUE);
+}
+
+static void
+test_run_in_thread_nested (void)
+{
+ GTask *task;
+ int nested_tasks_left = 2;
+
+ clog_up_thread_pool ();
+
+ task = g_task_new (NULL, NULL, quit_main_loop_callback, NULL);
+ g_task_set_task_data (task, &nested_tasks_left, NULL);
+ g_task_run_in_thread (task, run_nested_task_thread);
+ g_object_unref (task);
+
+ g_mutex_unlock (&last_fake_task_mutex);
+ g_main_loop_run (loop);
+
+ unclog_thread_pool ();
}
/* test_return_on_cancel */
g_test_add_func ("/gtask/run-in-thread", test_run_in_thread);
g_test_add_func ("/gtask/run-in-thread-sync", test_run_in_thread_sync);
g_test_add_func ("/gtask/run-in-thread-priority", test_run_in_thread_priority);
+ g_test_add_func ("/gtask/run-in-thread-nested", test_run_in_thread_nested);
g_test_add_func ("/gtask/return-on-cancel", test_return_on_cancel);
g_test_add_func ("/gtask/return-on-cancel-sync", test_return_on_cancel_sync);
g_test_add_func ("/gtask/return-on-cancel-atomic", test_return_on_cancel_atomic);