gtask: don't deadlock when tasks block on other tasks
authorDan Winship <danw@gnome.org>
Sat, 15 Dec 2012 16:44:59 +0000 (11:44 -0500)
committerDan Winship <danw@gnome.org>
Tue, 18 Dec 2012 18:19:08 +0000 (13:19 -0500)
If tasks block waiting for other tasks to complete then the system can
end up starved for threads. Avoid this by bumping up max-threads in
that case.

This also reverts 7b1f8c58 and reverts max-threads for GTask's
GThreadPool back to 10.

https://bugzilla.gnome.org/show_bug.cgi?id=687223

gio/gtask.c
gio/tests/task.c

index 0a80c24..bdef1f4 100644 (file)
@@ -584,6 +584,7 @@ struct _GTask {
   gboolean thread_cancelled;
   gboolean synchronous;
   gboolean thread_complete;
+  gboolean blocking_other_task;
 
   GError *error;
   union {
@@ -613,6 +614,8 @@ G_DEFINE_TYPE_WITH_CODE (GTask, g_task, G_TYPE_OBJECT,
                          g_task_thread_pool_init ();)
 
 static GThreadPool *task_pool;
+static GMutex task_pool_mutex;
+static GPrivate task_private = G_PRIVATE_INIT (NULL);
 
 static void
 g_task_init (GTask *task)
@@ -1208,6 +1211,15 @@ g_task_thread_complete (GTask *task)
     }
 
   task->thread_complete = TRUE;
+
+  if (task->blocking_other_task)
+    {
+      g_mutex_lock (&task_pool_mutex);
+      g_thread_pool_set_max_threads (task_pool,
+                                     g_thread_pool_get_max_threads (task_pool) - 1,
+                                     NULL);
+      g_mutex_unlock (&task_pool_mutex);
+    }
   g_mutex_unlock (&task->lock);
 
   if (task->cancellable)
@@ -1225,9 +1237,13 @@ g_task_thread_pool_thread (gpointer thread_data,
 {
   GTask *task = thread_data;
 
+  g_private_set (&task_private, task);
+
   task->task_func (task, task->source_object, task->task_data,
                    task->cancellable);
   g_task_thread_complete (task);
+
+  g_private_set (&task_private, NULL);
   g_object_unref (task);
 }
 
@@ -1294,6 +1310,18 @@ g_task_start_task_thread (GTask           *task,
   g_thread_pool_push (task_pool, g_object_ref (task), &task->error);
   if (task->error)
     task->thread_complete = TRUE;
+  else if (g_private_get (&task_private))
+    {
+      /* This thread is being spawned from another GTask thread, so
+       * bump up max-threads so we don't starve.
+       */
+      g_mutex_lock (&task_pool_mutex);
+      if (g_thread_pool_set_max_threads (task_pool,
+                                         g_thread_pool_get_max_threads (task_pool) + 1,
+                                         NULL))
+        task->blocking_other_task = TRUE;
+      g_mutex_unlock (&task_pool_mutex);
+    }
 }
 
 /**
@@ -1747,12 +1775,19 @@ g_task_compare_priority (gconstpointer a,
   const GTask *tb = b;
   gboolean a_cancelled, b_cancelled;
 
+  /* Tasks that are causing other tasks to block have higher
+   * priority.
+   */
+  if (ta->blocking_other_task && !tb->blocking_other_task)
+    return -1;
+  else if (tb->blocking_other_task && !ta->blocking_other_task)
+    return 1;
+
+  /* Let already-cancelled tasks finish right away */
   a_cancelled = (ta->check_cancellable &&
                  g_cancellable_is_cancelled (ta->cancellable));
   b_cancelled = (tb->check_cancellable &&
                  g_cancellable_is_cancelled (tb->cancellable));
-
-  /* Let already-cancelled tasks finish right away */
   if (a_cancelled && !b_cancelled)
     return -1;
   else if (b_cancelled && !a_cancelled)
@@ -1766,7 +1801,7 @@ static void
 g_task_thread_pool_init (void)
 {
   task_pool = g_thread_pool_new (g_task_thread_pool_thread, NULL,
-                                 100, FALSE, NULL);
+                                 10, FALSE, NULL);
   g_assert (task_pool != NULL);
 
   g_thread_pool_set_sort_function (task_pool, g_task_compare_priority, NULL);
index 8065807..6dbff5c 100644 (file)
@@ -824,29 +824,37 @@ fake_task_thread (GTask        *task,
   g_task_return_boolean (task, TRUE);
 }
 
-#define G_TASK_THREAD_POOL_SIZE 100
+#define G_TASK_THREAD_POOL_SIZE 10
+static int fake_tasks_running;
 
 static void
-test_run_in_thread_priority (void)
+fake_task_callback (GObject      *source,
+                    GAsyncResult *result,
+                    gpointer      user_data)
+{
+  if (--fake_tasks_running == 0)
+    g_main_loop_quit (loop);
+}
+
+static void
+clog_up_thread_pool (void)
 {
   GTask *task;
-  GCancellable *cancellable;
-  int seq_a, seq_b, seq_c, seq_d;
   int i;
 
-  /* Flush the thread pool, then clog it up with junk tasks */
   g_thread_pool_stop_unused_threads ();
 
   g_mutex_lock (&fake_task_mutex);
   for (i = 0; i < G_TASK_THREAD_POOL_SIZE - 1; i++)
     {
-      task = g_task_new (NULL, NULL, NULL, NULL);
+      task = g_task_new (NULL, NULL, fake_task_callback, NULL);
       g_task_set_task_data (task, &fake_task_mutex, NULL);
       g_assert_cmpint (g_task_get_priority (task), ==, G_PRIORITY_DEFAULT);
       g_task_set_priority (task, G_PRIORITY_HIGH * 2);
       g_assert_cmpint (g_task_get_priority (task), ==, G_PRIORITY_HIGH * 2);
       g_task_run_in_thread (task, fake_task_thread);
       g_object_unref (task);
+      fake_tasks_running++;
     }
 
   g_mutex_lock (&last_fake_task_mutex);
@@ -855,6 +863,23 @@ test_run_in_thread_priority (void)
   g_task_set_priority (task, G_PRIORITY_HIGH * 2);
   g_task_run_in_thread (task, fake_task_thread);
   g_object_unref (task);
+}
+
+static void
+unclog_thread_pool (void)
+{
+  g_mutex_unlock (&fake_task_mutex);
+  g_main_loop_run (loop);
+}
+
+static void
+test_run_in_thread_priority (void)
+{
+  GTask *task;
+  GCancellable *cancellable;
+  int seq_a, seq_b, seq_c, seq_d;
+
+  clog_up_thread_pool ();
 
   /* Queue three more tasks that we'll arrange to have run serially */
   task = g_task_new (NULL, NULL, NULL, NULL);
@@ -894,7 +919,50 @@ test_run_in_thread_priority (void)
   g_assert_cmpint (seq_a, ==, 3);
   g_assert_cmpint (seq_b, ==, 4);
 
-  g_mutex_unlock (&fake_task_mutex);
+  unclog_thread_pool ();
+}
+
+/* test_run_in_thread_nested: task threads that block waiting on
+ * other task threads will not cause the thread pool to starve.
+ */
+
+static void
+run_nested_task_thread (GTask        *task,
+                        gpointer      source_object,
+                        gpointer      task_data,
+                        GCancellable *cancellable)
+{
+  GTask *nested;
+  int *nested_tasks_left = task_data;
+
+  if ((*nested_tasks_left)--)
+    {
+      nested = g_task_new (NULL, NULL, NULL, NULL);
+      g_task_set_task_data (nested, nested_tasks_left, NULL);
+      g_task_run_in_thread_sync (nested, run_nested_task_thread);
+      g_object_unref (nested);
+    }
+
+  g_task_return_boolean (task, TRUE);
+}
+
+static void
+test_run_in_thread_nested (void)
+{
+  GTask *task;
+  int nested_tasks_left = 2;
+
+  clog_up_thread_pool ();
+
+  task = g_task_new (NULL, NULL, quit_main_loop_callback, NULL);
+  g_task_set_task_data (task, &nested_tasks_left, NULL);
+  g_task_run_in_thread (task, run_nested_task_thread);
+  g_object_unref (task);
+
+  g_mutex_unlock (&last_fake_task_mutex);
+  g_main_loop_run (loop);
+
+  unclog_thread_pool ();
 }
 
 /* test_return_on_cancel */
@@ -1652,6 +1720,7 @@ main (int argc, char **argv)
   g_test_add_func ("/gtask/run-in-thread", test_run_in_thread);
   g_test_add_func ("/gtask/run-in-thread-sync", test_run_in_thread_sync);
   g_test_add_func ("/gtask/run-in-thread-priority", test_run_in_thread_priority);
+  g_test_add_func ("/gtask/run-in-thread-nested", test_run_in_thread_nested);
   g_test_add_func ("/gtask/return-on-cancel", test_return_on_cancel);
   g_test_add_func ("/gtask/return-on-cancel-sync", test_return_on_cancel_sync);
   g_test_add_func ("/gtask/return-on-cancel-atomic", test_return_on_cancel_atomic);