#include "gasyncresult.h"
#include "gcancellable.h"
+#include "glib-private.h"
#include "glibintl.h"
gboolean thread_cancelled;
gboolean synchronous;
gboolean thread_complete;
- gboolean blocking_other_task;
GError *error;
union {
static GThreadPool *task_pool;
static GMutex task_pool_mutex;
-static GPrivate task_private = G_PRIVATE_INIT (NULL);
+static GSource *task_pool_manager;
+static guint64 task_wait_time;
+static gint tasks_running;
+
+/* When the task pool fills up and blocks, and the program keeps
+ * queueing more tasks, we will slowly add more threads to the pool
+ * (in case the existing tasks are trying to queue subtasks of their
+ * own) until tasks start completing again. These "overflow" threads
+ * will only run one task apiece, and then exit, so the pool will
+ * eventually get back down to its base size.
+ *
+ * The base and multiplier below gives us 10 extra threads after about
+ * a second of blocking, 30 after 5 seconds, 100 after a minute, and
+ * 200 after 20 minutes.
+ */
+#define G_TASK_POOL_SIZE 10
+#define G_TASK_WAIT_TIME_BASE 100000
+#define G_TASK_WAIT_TIME_MULTIPLIER 1.03
+#define G_TASK_WAIT_TIME_MAX (30 * 60 * 1000000)
static void
g_task_init (GTask *task)
}
task->thread_complete = TRUE;
-
- if (task->blocking_other_task)
- {
- g_mutex_lock (&task_pool_mutex);
- g_thread_pool_set_max_threads (task_pool,
- g_thread_pool_get_max_threads (task_pool) - 1,
- NULL);
- g_mutex_unlock (&task_pool_mutex);
- }
g_mutex_unlock (&task->lock);
if (task->cancellable)
g_task_return (task, G_TASK_RETURN_FROM_THREAD);
}
+static gboolean
+task_pool_manager_timeout (gpointer user_data)
+{
+ g_mutex_lock (&task_pool_mutex);
+ g_thread_pool_set_max_threads (task_pool, tasks_running + 1, NULL);
+ g_source_set_ready_time (task_pool_manager, -1);
+ g_mutex_unlock (&task_pool_mutex);
+
+ return TRUE;
+}
+
+static void
+g_task_thread_setup (void)
+{
+ g_mutex_lock (&task_pool_mutex);
+ tasks_running++;
+
+ if (tasks_running == G_TASK_POOL_SIZE)
+ task_wait_time = G_TASK_WAIT_TIME_BASE;
+ else if (tasks_running > G_TASK_POOL_SIZE && task_wait_time < G_TASK_WAIT_TIME_MAX)
+ task_wait_time *= G_TASK_WAIT_TIME_MULTIPLIER;
+
+ if (tasks_running >= G_TASK_POOL_SIZE)
+ g_source_set_ready_time (task_pool_manager, g_get_monotonic_time () + task_wait_time);
+
+ g_mutex_unlock (&task_pool_mutex);
+}
+
+static void
+g_task_thread_cleanup (void)
+{
+ gint tasks_pending;
+
+ g_mutex_lock (&task_pool_mutex);
+ tasks_pending = g_thread_pool_unprocessed (task_pool);
+
+ if (tasks_running > G_TASK_POOL_SIZE)
+ g_thread_pool_set_max_threads (task_pool, tasks_running - 1, NULL);
+ else if (tasks_running + tasks_pending < G_TASK_POOL_SIZE)
+ g_source_set_ready_time (task_pool_manager, -1);
+
+ tasks_running--;
+ g_mutex_unlock (&task_pool_mutex);
+}
+
static void
g_task_thread_pool_thread (gpointer thread_data,
gpointer pool_data)
{
GTask *task = thread_data;
- g_private_set (&task_private, task);
+ g_task_thread_setup ();
task->task_func (task, task->source_object, task->task_data,
task->cancellable);
g_task_thread_complete (task);
-
- g_private_set (&task_private, NULL);
g_object_unref (task);
+
+ g_task_thread_cleanup ();
}
static void
}
g_thread_pool_push (task_pool, g_object_ref (task), NULL);
- if (g_private_get (&task_private))
- {
- /* This thread is being spawned from another GTask thread, so
- * bump up max-threads so we don't starve.
- */
- g_mutex_lock (&task_pool_mutex);
- if (g_thread_pool_set_max_threads (task_pool,
- g_thread_pool_get_max_threads (task_pool) + 1,
- NULL))
- task->blocking_other_task = TRUE;
- g_mutex_unlock (&task_pool_mutex);
- }
}
/**
*
* See #GTaskThreadFunc for more details about how @task_func is handled.
*
+ * Although GLib currently rate-limits the tasks queued via
+ * g_task_run_in_thread(), you should not assume that it will always
+ * do this. If you have a very large number of tasks to run, but don't
+ * want them to all run at once, you should only queue a limited
+ * number of them at a time.
+ *
* Since: 2.36
*/
void
* have a callback, it will not be invoked when @task_func returns.
* #GTask:completed will be set to %TRUE just before this function returns.
*
+ * Although GLib currently rate-limits the tasks queued via
+ * g_task_run_in_thread_sync(), you should not assume that it will
+ * always do this. If you have a very large number of tasks to run,
+ * but don't want them to all run at once, you should only queue a
+ * limited number of them at a time.
+ *
* Since: 2.36
*/
void
const GTask *tb = b;
gboolean a_cancelled, b_cancelled;
- /* Tasks that are causing other tasks to block have higher
- * priority.
- */
- if (ta->blocking_other_task && !tb->blocking_other_task)
- return -1;
- else if (tb->blocking_other_task && !ta->blocking_other_task)
- return 1;
-
/* Let already-cancelled tasks finish right away */
a_cancelled = (ta->check_cancellable &&
g_cancellable_is_cancelled (ta->cancellable));
return ta->priority - tb->priority;
}
+static gboolean
+trivial_source_dispatch (GSource *source,
+ GSourceFunc callback,
+ gpointer user_data)
+{
+ return callback (user_data);
+}
+
+GSourceFuncs trivial_source_funcs = {
+ NULL, /* prepare */
+ NULL, /* check */
+ trivial_source_dispatch,
+ NULL
+};
+
static void
g_task_thread_pool_init (void)
{
task_pool = g_thread_pool_new (g_task_thread_pool_thread, NULL,
- 10, FALSE, NULL);
+ G_TASK_POOL_SIZE, FALSE, NULL);
g_assert (task_pool != NULL);
g_thread_pool_set_sort_function (task_pool, g_task_compare_priority, NULL);
+
+ task_pool_manager = g_source_new (&trivial_source_funcs, sizeof (GSource));
+ g_source_set_callback (task_pool_manager, task_pool_manager_timeout, NULL, NULL);
+ g_source_set_ready_time (task_pool_manager, -1);
+ g_source_attach (task_pool_manager,
+ GLIB_PRIVATE_CALL (g_get_worker_context ()));
+ g_source_unref (task_pool_manager);
}
static void
*/
#include <gio/gio.h>
+#include <string.h>
static GMainLoop *loop;
static GThread *main_thread;
unclog_thread_pool ();
}
+/* test_run_in_thread_overflow: if you queue lots and lots and lots of
+ * tasks, they won't all run at once.
+ */
+static GMutex overflow_mutex;
+
+static void
+run_overflow_task_thread (GTask *task,
+ gpointer source_object,
+ gpointer task_data,
+ GCancellable *cancellable)
+{
+ gchar *result = task_data;
+
+ if (g_task_return_error_if_cancelled (task))
+ {
+ *result = 'X';
+ return;
+ }
+
+ /* Block until the main thread is ready. */
+ g_mutex_lock (&overflow_mutex);
+ g_mutex_unlock (&overflow_mutex);
+
+ *result = '.';
+
+ g_task_return_boolean (task, TRUE);
+}
+
+#define NUM_OVERFLOW_TASKS 1024
+
+static void
+test_run_in_thread_overflow (void)
+{
+ GCancellable *cancellable;
+ GTask *task;
+ gchar buf[NUM_OVERFLOW_TASKS + 1];
+ gint i;
+
+ /* Queue way too many tasks and then sleep for a bit. The first 10
+ * tasks will be dispatched to threads and will then block on
+ * overflow_mutex, so more threads will be created while this thread
+ * is sleeping. Then we cancel the cancellable, unlock the mutex,
+ * wait for all of the tasks to complete, and make sure that we got
+ * the behavior we expected.
+ */
+
+ memset (buf, 0, sizeof (buf));
+ cancellable = g_cancellable_new ();
+
+ g_mutex_lock (&overflow_mutex);
+
+ for (i = 0; i < NUM_OVERFLOW_TASKS; i++)
+ {
+ task = g_task_new (NULL, cancellable, NULL, NULL);
+ g_task_set_task_data (task, buf + i, NULL);
+ g_task_run_in_thread (task, run_overflow_task_thread);
+ g_object_unref (task);
+ }
+
+ if (g_test_slow ())
+ g_usleep (5000000); /* 5 s */
+ else
+ g_usleep (500000); /* 0.5 s */
+ g_cancellable_cancel (cancellable);
+ g_object_unref (cancellable);
+
+ g_mutex_unlock (&overflow_mutex);
+
+ /* Wait for all tasks to complete. */
+ while (!buf[NUM_OVERFLOW_TASKS - 1])
+ g_usleep (1000);
+
+ i = strspn (buf, ".");
+ /* Given the sleep times above, i should be 14 for normal, 40 for
+ * slow. But if the machine is too slow/busy then the scheduling
+ * might get messed up and we'll get more or fewer threads than
+ * expected. But there are limits to how messed up it could
+ * plausibly get (and we hope that if gtask is actually broken then
+ * it will exceed those limits).
+ */
+ g_assert_cmpint (i, >=, 10);
+ if (g_test_slow ())
+ g_assert_cmpint (i, <, 50);
+ else
+ g_assert_cmpint (i, <, 20);
+
+ g_assert_cmpint (i + strspn (buf + i, "X"), ==, NUM_OVERFLOW_TASKS);
+}
+
/* test_return_on_cancel */
GMutex roc_init_mutex, roc_finish_mutex;
g_test_add_func ("/gtask/run-in-thread-sync", test_run_in_thread_sync);
g_test_add_func ("/gtask/run-in-thread-priority", test_run_in_thread_priority);
g_test_add_func ("/gtask/run-in-thread-nested", test_run_in_thread_nested);
+ g_test_add_func ("/gtask/run-in-thread-overflow", test_run_in_thread_overflow);
g_test_add_func ("/gtask/return-on-cancel", test_return_on_cancel);
g_test_add_func ("/gtask/return-on-cancel-sync", test_return_on_cancel_sync);
g_test_add_func ("/gtask/return-on-cancel-atomic", test_return_on_cancel_atomic);