Change LGPL-2.1+ to LGPL-2.1-or-later
[platform/upstream/glib.git] / glib / gthreadpool.c
1 /* GLIB - Library of useful routines for C programming
2  * Copyright (C) 1995-1997  Peter Mattis, Spencer Kimball and Josh MacDonald
3  *
4  * GThreadPool: thread pool implementation.
5  * Copyright (C) 2000 Sebastian Wilhelmi; University of Karlsruhe
6  *
7  * SPDX-License-Identifier: LGPL-2.1-or-later
8  *
9  * This library is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Lesser General Public
11  * License as published by the Free Software Foundation; either
12  * version 2.1 of the License, or (at your option) any later version.
13  *
14  * This library is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17  * Lesser General Public License for more details.
18  *
19  * You should have received a copy of the GNU Lesser General Public
20  * License along with this library; if not, see <http://www.gnu.org/licenses/>.
21  */
22
23 /*
24  * MT safe
25  */
26
27 #include "config.h"
28
29 #include "gthreadpool.h"
30
31 #include "gasyncqueue.h"
32 #include "gasyncqueueprivate.h"
33 #include "glib-private.h"
34 #include "gmain.h"
35 #include "gtestutils.h"
36 #include "gthreadprivate.h"
37 #include "gtimer.h"
38 #include "gutils.h"
39
40 /**
41  * SECTION:thread_pools
42  * @title: Thread Pools
43  * @short_description: pools of threads to execute work concurrently
44  * @see_also: #GThread
45  *
46  * Sometimes you wish to asynchronously fork out the execution of work
47  * and continue working in your own thread. If that will happen often,
48  * the overhead of starting and destroying a thread each time might be
49  * too high. In such cases reusing already started threads seems like a
50  * good idea. And it indeed is, but implementing this can be tedious
51  * and error-prone.
52  *
53  * Therefore GLib provides thread pools for your convenience. An added
54  * advantage is, that the threads can be shared between the different
55  * subsystems of your program, when they are using GLib.
56  *
57  * To create a new thread pool, you use g_thread_pool_new().
58  * It is destroyed by g_thread_pool_free().
59  *
60  * If you want to execute a certain task within a thread pool,
61  * you call g_thread_pool_push().
62  *
63  * To get the current number of running threads you call
64  * g_thread_pool_get_num_threads(). To get the number of still
65  * unprocessed tasks you call g_thread_pool_unprocessed(). To control
66  * the maximal number of threads for a thread pool, you use
67  * g_thread_pool_get_max_threads() and g_thread_pool_set_max_threads().
68  *
69  * Finally you can control the number of unused threads, that are kept
70  * alive by GLib for future use. The current number can be fetched with
71  * g_thread_pool_get_num_unused_threads(). The maximal number can be
72  * controlled by g_thread_pool_get_max_unused_threads() and
73  * g_thread_pool_set_max_unused_threads(). All currently unused threads
74  * can be stopped by calling g_thread_pool_stop_unused_threads().
75  */
76
77 #define DEBUG_MSG(x)
78 /* #define DEBUG_MSG(args) g_printerr args ; g_printerr ("\n");    */
79
80 typedef struct _GRealThreadPool GRealThreadPool;
81
82 /**
83  * GThreadPool:
84  * @func: the function to execute in the threads of this pool
85  * @user_data: the user data for the threads of this pool
86  * @exclusive: are all threads exclusive to this pool
87  *
88  * The #GThreadPool struct represents a thread pool. It has three
89  * public read-only members, but the underlying struct is bigger,
90  * so you must not copy this struct.
91  */
92 struct _GRealThreadPool
93 {
94   GThreadPool pool;
95   GAsyncQueue *queue;
96   GCond cond;
97   gint max_threads;
98   guint num_threads;
99   gboolean running;
100   gboolean immediate;
101   gboolean waiting;
102   GCompareDataFunc sort_func;
103   gpointer sort_user_data;
104 };
105
106 /* The following is just an address to mark the wakeup order for a
107  * thread, it could be any address (as long, as it isn't a valid
108  * GThreadPool address)
109  */
110 static const gpointer wakeup_thread_marker = (gpointer) &g_thread_pool_new;
111 static gint wakeup_thread_serial = 0;
112
113 /* Here all unused threads are waiting  */
114 static GAsyncQueue *unused_thread_queue = NULL;
115 static gint unused_threads = 0;
116 static gint max_unused_threads = 2;
117 static gint kill_unused_threads = 0;
118 static guint max_idle_time = 15 * 1000;
119
120 typedef struct
121 {
122   /* Either thread or error are set in the end. Both transfer-full. */
123   GThreadPool *pool;
124   GThread *thread;
125   GError *error;
126 } SpawnThreadData;
127
128 static GCond spawn_thread_cond;
129 static GAsyncQueue *spawn_thread_queue;
130
131 static void             g_thread_pool_queue_push_unlocked (GRealThreadPool  *pool,
132                                                            gpointer          data);
133 static void             g_thread_pool_free_internal       (GRealThreadPool  *pool);
134 static gpointer         g_thread_pool_thread_proxy        (gpointer          data);
135 static gboolean         g_thread_pool_start_thread        (GRealThreadPool  *pool,
136                                                            GError          **error);
137 static void             g_thread_pool_wakeup_and_stop_all (GRealThreadPool  *pool);
138 static GRealThreadPool* g_thread_pool_wait_for_new_pool   (void);
139 static gpointer         g_thread_pool_wait_for_new_task   (GRealThreadPool  *pool);
140
141 static void
142 g_thread_pool_queue_push_unlocked (GRealThreadPool *pool,
143                                    gpointer         data)
144 {
145   if (pool->sort_func)
146     g_async_queue_push_sorted_unlocked (pool->queue,
147                                         data,
148                                         pool->sort_func,
149                                         pool->sort_user_data);
150   else
151     g_async_queue_push_unlocked (pool->queue, data);
152 }
153
154 static GRealThreadPool*
155 g_thread_pool_wait_for_new_pool (void)
156 {
157   GRealThreadPool *pool;
158   gint local_wakeup_thread_serial;
159   guint local_max_unused_threads;
160   gint local_max_idle_time;
161   gint last_wakeup_thread_serial;
162   gboolean have_relayed_thread_marker = FALSE;
163
164   local_max_unused_threads = (guint) g_atomic_int_get (&max_unused_threads);
165   local_max_idle_time = g_atomic_int_get (&max_idle_time);
166   last_wakeup_thread_serial = g_atomic_int_get (&wakeup_thread_serial);
167
168   do
169     {
170       if ((guint) g_atomic_int_get (&unused_threads) >= local_max_unused_threads)
171         {
172           /* If this is a superfluous thread, stop it. */
173           pool = NULL;
174         }
175       else if (local_max_idle_time > 0)
176         {
177           /* If a maximal idle time is given, wait for the given time. */
178           DEBUG_MSG (("thread %p waiting in global pool for %f seconds.",
179                       g_thread_self (), local_max_idle_time / 1000.0));
180
181           pool = g_async_queue_timeout_pop (unused_thread_queue,
182                                             local_max_idle_time * 1000);
183         }
184       else
185         {
186           /* If no maximal idle time is given, wait indefinitely. */
187           DEBUG_MSG (("thread %p waiting in global pool.", g_thread_self ()));
188           pool = g_async_queue_pop (unused_thread_queue);
189         }
190
191       if (pool == wakeup_thread_marker)
192         {
193           local_wakeup_thread_serial = g_atomic_int_get (&wakeup_thread_serial);
194           if (last_wakeup_thread_serial == local_wakeup_thread_serial)
195             {
196               if (!have_relayed_thread_marker)
197               {
198                 /* If this wakeup marker has been received for
199                  * the second time, relay it.
200                  */
201                 DEBUG_MSG (("thread %p relaying wakeup message to "
202                             "waiting thread with lower serial.",
203                             g_thread_self ()));
204
205                 g_async_queue_push (unused_thread_queue, wakeup_thread_marker);
206                 have_relayed_thread_marker = TRUE;
207
208                 /* If a wakeup marker has been relayed, this thread
209                  * will get out of the way for 100 microseconds to
210                  * avoid receiving this marker again.
211                  */
212                 g_usleep (100);
213               }
214             }
215           else
216             {
217               if (g_atomic_int_add (&kill_unused_threads, -1) > 0)
218                 {
219                   pool = NULL;
220                   break;
221                 }
222
223               DEBUG_MSG (("thread %p updating to new limits.",
224                           g_thread_self ()));
225
226               local_max_unused_threads = (guint) g_atomic_int_get (&max_unused_threads);
227               local_max_idle_time = g_atomic_int_get (&max_idle_time);
228               last_wakeup_thread_serial = local_wakeup_thread_serial;
229
230               have_relayed_thread_marker = FALSE;
231             }
232         }
233     }
234   while (pool == wakeup_thread_marker);
235
236   return pool;
237 }
238
239 static gpointer
240 g_thread_pool_wait_for_new_task (GRealThreadPool *pool)
241 {
242   gpointer task = NULL;
243
244   if (pool->running || (!pool->immediate &&
245                         g_async_queue_length_unlocked (pool->queue) > 0))
246     {
247       /* This thread pool is still active. */
248       if (pool->max_threads != -1 && pool->num_threads > (guint) pool->max_threads)
249         {
250           /* This is a superfluous thread, so it goes to the global pool. */
251           DEBUG_MSG (("superfluous thread %p in pool %p.",
252                       g_thread_self (), pool));
253         }
254       else if (pool->pool.exclusive)
255         {
256           /* Exclusive threads stay attached to the pool. */
257           task = g_async_queue_pop_unlocked (pool->queue);
258
259           DEBUG_MSG (("thread %p in exclusive pool %p waits for task "
260                       "(%d running, %d unprocessed).",
261                       g_thread_self (), pool, pool->num_threads,
262                       g_async_queue_length_unlocked (pool->queue)));
263         }
264       else
265         {
266           /* A thread will wait for new tasks for at most 1/2
267            * second before going to the global pool.
268            */
269           DEBUG_MSG (("thread %p in pool %p waits for up to a 1/2 second for task "
270                       "(%d running, %d unprocessed).",
271                       g_thread_self (), pool, pool->num_threads,
272                       g_async_queue_length_unlocked (pool->queue)));
273
274           task = g_async_queue_timeout_pop_unlocked (pool->queue,
275                                                      G_USEC_PER_SEC / 2);
276         }
277     }
278   else
279     {
280       /* This thread pool is inactive, it will no longer process tasks. */
281       DEBUG_MSG (("pool %p not active, thread %p will go to global pool "
282                   "(running: %s, immediate: %s, len: %d).",
283                   pool, g_thread_self (),
284                   pool->running ? "true" : "false",
285                   pool->immediate ? "true" : "false",
286                   g_async_queue_length_unlocked (pool->queue)));
287     }
288
289   return task;
290 }
291
292 static gpointer
293 g_thread_pool_spawn_thread (gpointer data)
294 {
295   while (TRUE)
296     {
297       SpawnThreadData *spawn_thread_data;
298       GThread *thread = NULL;
299       GError *error = NULL;
300       const gchar *prgname = g_get_prgname ();
301       gchar name[16] = "pool";
302
303       if (prgname)
304         g_snprintf (name, sizeof (name), "pool-%s", prgname);
305
306       g_async_queue_lock (spawn_thread_queue);
307       /* Spawn a new thread for the given pool and wake the requesting thread
308        * up again with the result. This new thread will have the scheduler
309        * settings inherited from this thread and in extension of the thread
310        * that created the first non-exclusive thread-pool. */
311       spawn_thread_data = g_async_queue_pop_unlocked (spawn_thread_queue);
312       thread = g_thread_try_new (name, g_thread_pool_thread_proxy, spawn_thread_data->pool, &error);
313
314       spawn_thread_data->thread = g_steal_pointer (&thread);
315       spawn_thread_data->error = g_steal_pointer (&error);
316
317       g_cond_broadcast (&spawn_thread_cond);
318       g_async_queue_unlock (spawn_thread_queue);
319     }
320
321   return NULL;
322 }
323
324 static gpointer
325 g_thread_pool_thread_proxy (gpointer data)
326 {
327   GRealThreadPool *pool;
328
329   pool = data;
330
331   DEBUG_MSG (("thread %p started for pool %p.", g_thread_self (), pool));
332
333   g_async_queue_lock (pool->queue);
334
335   while (TRUE)
336     {
337       gpointer task;
338
339       task = g_thread_pool_wait_for_new_task (pool);
340       if (task)
341         {
342           if (pool->running || !pool->immediate)
343             {
344               /* A task was received and the thread pool is active,
345                * so execute the function.
346                */
347               g_async_queue_unlock (pool->queue);
348               DEBUG_MSG (("thread %p in pool %p calling func.",
349                           g_thread_self (), pool));
350               pool->pool.func (task, pool->pool.user_data);
351               g_async_queue_lock (pool->queue);
352             }
353         }
354       else
355         {
356           /* No task was received, so this thread goes to the global pool. */
357           gboolean free_pool = FALSE;
358
359           DEBUG_MSG (("thread %p leaving pool %p for global pool.",
360                       g_thread_self (), pool));
361           pool->num_threads--;
362
363           if (!pool->running)
364             {
365               if (!pool->waiting)
366                 {
367                   if (pool->num_threads == 0)
368                     {
369                       /* If the pool is not running and no other
370                        * thread is waiting for this thread pool to
371                        * finish and this is the last thread of this
372                        * pool, free the pool.
373                        */
374                       free_pool = TRUE;
375                     }
376                   else
377                     {
378                       /* If the pool is not running and no other
379                        * thread is waiting for this thread pool to
380                        * finish and this is not the last thread of
381                        * this pool and there are no tasks left in the
382                        * queue, wakeup the remaining threads.
383                        */
384                       if (g_async_queue_length_unlocked (pool->queue) ==
385                           (gint) -pool->num_threads)
386                         g_thread_pool_wakeup_and_stop_all (pool);
387                     }
388                 }
389               else if (pool->immediate ||
390                        g_async_queue_length_unlocked (pool->queue) <= 0)
391                 {
392                   /* If the pool is not running and another thread is
393                    * waiting for this thread pool to finish and there
394                    * are either no tasks left or the pool shall stop
395                    * immediately, inform the waiting thread of a change
396                    * of the thread pool state.
397                    */
398                   g_cond_broadcast (&pool->cond);
399                 }
400             }
401
402           g_atomic_int_inc (&unused_threads);
403           g_async_queue_unlock (pool->queue);
404
405           if (free_pool)
406             g_thread_pool_free_internal (pool);
407
408           pool = g_thread_pool_wait_for_new_pool ();
409           g_atomic_int_add (&unused_threads, -1);
410
411           if (pool == NULL)
412             break;
413
414           g_async_queue_lock (pool->queue);
415
416           DEBUG_MSG (("thread %p entering pool %p from global pool.",
417                       g_thread_self (), pool));
418
419           /* pool->num_threads++ is not done here, but in
420            * g_thread_pool_start_thread to make the new started
421            * thread known to the pool before itself can do it.
422            */
423         }
424     }
425
426   return NULL;
427 }
428
429 static gboolean
430 g_thread_pool_start_thread (GRealThreadPool  *pool,
431                             GError          **error)
432 {
433   gboolean success = FALSE;
434
435   if (pool->max_threads != -1 && pool->num_threads >= (guint) pool->max_threads)
436     /* Enough threads are already running */
437     return TRUE;
438
439   g_async_queue_lock (unused_thread_queue);
440
441   if (g_async_queue_length_unlocked (unused_thread_queue) < 0)
442     {
443       g_async_queue_push_unlocked (unused_thread_queue, pool);
444       success = TRUE;
445     }
446
447   g_async_queue_unlock (unused_thread_queue);
448
449   if (!success)
450     {
451       const gchar *prgname = g_get_prgname ();
452       gchar name[16] = "pool";
453       GThread *thread;
454
455       if (prgname)
456         g_snprintf (name, sizeof (name), "pool-%s", prgname);
457
458       /* No thread was found, we have to start a new one */
459       if (pool->pool.exclusive)
460         {
461           /* For exclusive thread-pools this is directly called from new() and
462            * we simply start new threads that inherit the scheduler settings
463            * from the current thread.
464            */
465           thread = g_thread_try_new (name, g_thread_pool_thread_proxy, pool, error);
466         }
467       else
468         {
469           /* For non-exclusive thread-pools this can be called at any time
470            * when a new thread is needed. We make sure to create a new thread
471            * here with the correct scheduler settings by going via our helper
472            * thread.
473            */
474           SpawnThreadData spawn_thread_data = { (GThreadPool *) pool, NULL, NULL };
475
476           g_async_queue_lock (spawn_thread_queue);
477
478           g_async_queue_push_unlocked (spawn_thread_queue, &spawn_thread_data);
479
480           while (!spawn_thread_data.thread && !spawn_thread_data.error)
481             g_cond_wait (&spawn_thread_cond, _g_async_queue_get_mutex (spawn_thread_queue));
482
483           thread = spawn_thread_data.thread;
484           if (!thread)
485             g_propagate_error (error, g_steal_pointer (&spawn_thread_data.error));
486           g_async_queue_unlock (spawn_thread_queue);
487         }
488
489       if (thread == NULL)
490         return FALSE;
491
492       g_thread_unref (thread);
493     }
494
495   /* See comment in g_thread_pool_thread_proxy as to why this is done
496    * here and not there
497    */
498   pool->num_threads++;
499
500   return TRUE;
501 }
502
503 /**
504  * g_thread_pool_new:
505  * @func: a function to execute in the threads of the new thread pool
506  * @user_data: user data that is handed over to @func every time it
507  *     is called
508  * @max_threads: the maximal number of threads to execute concurrently
509  *     in  the new thread pool, -1 means no limit
510  * @exclusive: should this thread pool be exclusive?
511  * @error: return location for error, or %NULL
512  *
513  * This function creates a new thread pool.
514  *
515  * Whenever you call g_thread_pool_push(), either a new thread is
516  * created or an unused one is reused. At most @max_threads threads
517  * are running concurrently for this thread pool. @max_threads = -1
518  * allows unlimited threads to be created for this thread pool. The
519  * newly created or reused thread now executes the function @func
520  * with the two arguments. The first one is the parameter to
521  * g_thread_pool_push() and the second one is @user_data.
522  *
523  * Pass g_get_num_processors() to @max_threads to create as many threads as
524  * there are logical processors on the system. This will not pin each thread to
525  * a specific processor.
526  *
527  * The parameter @exclusive determines whether the thread pool owns
528  * all threads exclusive or shares them with other thread pools.
529  * If @exclusive is %TRUE, @max_threads threads are started
530  * immediately and they will run exclusively for this thread pool
531  * until it is destroyed by g_thread_pool_free(). If @exclusive is
532  * %FALSE, threads are created when needed and shared between all
533  * non-exclusive thread pools. This implies that @max_threads may
534  * not be -1 for exclusive thread pools. Besides, exclusive thread
535  * pools are not affected by g_thread_pool_set_max_idle_time()
536  * since their threads are never considered idle and returned to the
537  * global pool.
538  *
539  * Note that the threads used by exclusive thread pools will all inherit the
540  * scheduler settings of the current thread while the threads used by
541  * non-exclusive thread pools will inherit the scheduler settings from the
542  * first thread that created such a thread pool.
543  *
544  * At least one thread will be spawned when this function is called, either to
545  * create the @max_threads exclusive threads, or to preserve the scheduler
546  * settings of the current thread for future spawns.
547  *
548  * @error can be %NULL to ignore errors, or non-%NULL to report
549  * errors. An error can only occur when @exclusive is set to %TRUE
550  * and not all @max_threads threads could be created.
551  * See #GThreadError for possible errors that may occur.
552  * Note, even in case of error a valid #GThreadPool is returned.
553  *
554  * Returns: the new #GThreadPool
555  */
556 GThreadPool *
557 g_thread_pool_new (GFunc      func,
558                    gpointer   user_data,
559                    gint       max_threads,
560                    gboolean   exclusive,
561                    GError   **error)
562 {
563   return g_thread_pool_new_full (func, user_data, NULL, max_threads, exclusive, error);
564 }
565
566 /**
567  * g_thread_pool_new_full:
568  * @func: a function to execute in the threads of the new thread pool
569  * @user_data: user data that is handed over to @func every time it
570  *     is called
571  * @item_free_func: (nullable): used to pass as a free function to
572  *     g_async_queue_new_full()
573  * @max_threads: the maximal number of threads to execute concurrently
574  *     in the new thread pool, `-1` means no limit
575  * @exclusive: should this thread pool be exclusive?
576  * @error: return location for error, or %NULL
577  *
578  * This function creates a new thread pool similar to g_thread_pool_new()
579  * but allowing @item_free_func to be specified to free the data passed
580  * to g_thread_pool_push() in the case that the #GThreadPool is stopped
581  * and freed before all tasks have been executed.
582  *
583  * @item_free_func will *not* be called on items successfully passed to @func.
584  * @func is responsible for freeing the items passed to it.
585  *
586  * Returns: (transfer full): the new #GThreadPool
587  *
588  * Since: 2.70
589  */
590 GThreadPool *
591 g_thread_pool_new_full (GFunc           func,
592                         gpointer        user_data,
593                         GDestroyNotify  item_free_func,
594                         gint            max_threads,
595                         gboolean        exclusive,
596                         GError        **error)
597 {
598   GRealThreadPool *retval;
599   G_LOCK_DEFINE_STATIC (init);
600
601   g_return_val_if_fail (func, NULL);
602   g_return_val_if_fail (!exclusive || max_threads != -1, NULL);
603   g_return_val_if_fail (max_threads >= -1, NULL);
604
605   retval = g_new (GRealThreadPool, 1);
606
607   retval->pool.func = func;
608   retval->pool.user_data = user_data;
609   retval->pool.exclusive = exclusive;
610   retval->queue = g_async_queue_new_full (item_free_func);
611   g_cond_init (&retval->cond);
612   retval->max_threads = max_threads;
613   retval->num_threads = 0;
614   retval->running = TRUE;
615   retval->immediate = FALSE;
616   retval->waiting = FALSE;
617   retval->sort_func = NULL;
618   retval->sort_user_data = NULL;
619
620   G_LOCK (init);
621   if (!unused_thread_queue)
622       unused_thread_queue = g_async_queue_new ();
623
624   /*
625    * Spawn a helper thread that is only responsible for spawning new threads
626    * with the scheduler settings of the current thread.
627    *
628    * This is then used for making sure that all threads created on the
629    * non-exclusive thread-pool have the same scheduler settings, and more
630    * importantly don't just inherit them from the thread that just happened to
631    * push a new task and caused a new thread to be created.
632    *
633    * Not doing so could cause real-time priority threads or otherwise
634    * threads with problematic scheduler settings to be part of the
635    * non-exclusive thread-pools.
636    *
637    * For exclusive thread-pools this is not required as all threads are
638    * created immediately below and are running forever, so they will
639    * automatically inherit the scheduler settings from this very thread.
640    */
641   if (!exclusive && !spawn_thread_queue)
642     {
643       GThread *pool_spawner = NULL;
644
645       spawn_thread_queue = g_async_queue_new ();
646       g_cond_init (&spawn_thread_cond);
647       pool_spawner = g_thread_new ("pool-spawner", g_thread_pool_spawn_thread, NULL);
648       g_ignore_leak (pool_spawner);
649     }
650   G_UNLOCK (init);
651
652   if (retval->pool.exclusive)
653     {
654       g_async_queue_lock (retval->queue);
655
656       while (retval->num_threads < (guint) retval->max_threads)
657         {
658           GError *local_error = NULL;
659
660           if (!g_thread_pool_start_thread (retval, &local_error))
661             {
662               g_propagate_error (error, local_error);
663               break;
664             }
665         }
666
667       g_async_queue_unlock (retval->queue);
668     }
669
670   return (GThreadPool*) retval;
671 }
672
673 /**
674  * g_thread_pool_push:
675  * @pool: a #GThreadPool
676  * @data: a new task for @pool
677  * @error: return location for error, or %NULL
678  *
679  * Inserts @data into the list of tasks to be executed by @pool.
680  *
681  * When the number of currently running threads is lower than the
682  * maximal allowed number of threads, a new thread is started (or
683  * reused) with the properties given to g_thread_pool_new().
684  * Otherwise, @data stays in the queue until a thread in this pool
685  * finishes its previous task and processes @data.
686  *
687  * @error can be %NULL to ignore errors, or non-%NULL to report
688  * errors. An error can only occur when a new thread couldn't be
689  * created. In that case @data is simply appended to the queue of
690  * work to do.
691  *
692  * Before version 2.32, this function did not return a success status.
693  *
694  * Returns: %TRUE on success, %FALSE if an error occurred
695  */
696 gboolean
697 g_thread_pool_push (GThreadPool  *pool,
698                     gpointer      data,
699                     GError      **error)
700 {
701   GRealThreadPool *real;
702   gboolean result;
703
704   real = (GRealThreadPool*) pool;
705
706   g_return_val_if_fail (real, FALSE);
707   g_return_val_if_fail (real->running, FALSE);
708
709   result = TRUE;
710
711   g_async_queue_lock (real->queue);
712
713   if (g_async_queue_length_unlocked (real->queue) >= 0)
714     {
715       /* No thread is waiting in the queue */
716       GError *local_error = NULL;
717
718       if (!g_thread_pool_start_thread (real, &local_error))
719         {
720           g_propagate_error (error, local_error);
721           result = FALSE;
722         }
723     }
724
725   g_thread_pool_queue_push_unlocked (real, data);
726   g_async_queue_unlock (real->queue);
727
728   return result;
729 }
730
731 /**
732  * g_thread_pool_set_max_threads:
733  * @pool: a #GThreadPool
734  * @max_threads: a new maximal number of threads for @pool,
735  *     or -1 for unlimited
736  * @error: return location for error, or %NULL
737  *
738  * Sets the maximal allowed number of threads for @pool.
739  * A value of -1 means that the maximal number of threads
740  * is unlimited. If @pool is an exclusive thread pool, setting
741  * the maximal number of threads to -1 is not allowed.
742  *
743  * Setting @max_threads to 0 means stopping all work for @pool.
744  * It is effectively frozen until @max_threads is set to a non-zero
745  * value again.
746  *
747  * A thread is never terminated while calling @func, as supplied by
748  * g_thread_pool_new(). Instead the maximal number of threads only
749  * has effect for the allocation of new threads in g_thread_pool_push().
750  * A new thread is allocated, whenever the number of currently
751  * running threads in @pool is smaller than the maximal number.
752  *
753  * @error can be %NULL to ignore errors, or non-%NULL to report
754  * errors. An error can only occur when a new thread couldn't be
755  * created.
756  *
757  * Before version 2.32, this function did not return a success status.
758  *
759  * Returns: %TRUE on success, %FALSE if an error occurred
760  */
761 gboolean
762 g_thread_pool_set_max_threads (GThreadPool  *pool,
763                                gint          max_threads,
764                                GError      **error)
765 {
766   GRealThreadPool *real;
767   gint to_start;
768   gboolean result;
769
770   real = (GRealThreadPool*) pool;
771
772   g_return_val_if_fail (real, FALSE);
773   g_return_val_if_fail (real->running, FALSE);
774   g_return_val_if_fail (!real->pool.exclusive || max_threads != -1, FALSE);
775   g_return_val_if_fail (max_threads >= -1, FALSE);
776
777   result = TRUE;
778
779   g_async_queue_lock (real->queue);
780
781   real->max_threads = max_threads;
782
783   if (pool->exclusive)
784     to_start = real->max_threads - real->num_threads;
785   else
786     to_start = g_async_queue_length_unlocked (real->queue);
787
788   for ( ; to_start > 0; to_start--)
789     {
790       GError *local_error = NULL;
791
792       if (!g_thread_pool_start_thread (real, &local_error))
793         {
794           g_propagate_error (error, local_error);
795           result = FALSE;
796           break;
797         }
798     }
799
800   g_async_queue_unlock (real->queue);
801
802   return result;
803 }
804
805 /**
806  * g_thread_pool_get_max_threads:
807  * @pool: a #GThreadPool
808  *
809  * Returns the maximal number of threads for @pool.
810  *
811  * Returns: the maximal number of threads
812  */
813 gint
814 g_thread_pool_get_max_threads (GThreadPool *pool)
815 {
816   GRealThreadPool *real;
817   gint retval;
818
819   real = (GRealThreadPool*) pool;
820
821   g_return_val_if_fail (real, 0);
822   g_return_val_if_fail (real->running, 0);
823
824   g_async_queue_lock (real->queue);
825   retval = real->max_threads;
826   g_async_queue_unlock (real->queue);
827
828   return retval;
829 }
830
831 /**
832  * g_thread_pool_get_num_threads:
833  * @pool: a #GThreadPool
834  *
835  * Returns the number of threads currently running in @pool.
836  *
837  * Returns: the number of threads currently running
838  */
839 guint
840 g_thread_pool_get_num_threads (GThreadPool *pool)
841 {
842   GRealThreadPool *real;
843   guint retval;
844
845   real = (GRealThreadPool*) pool;
846
847   g_return_val_if_fail (real, 0);
848   g_return_val_if_fail (real->running, 0);
849
850   g_async_queue_lock (real->queue);
851   retval = real->num_threads;
852   g_async_queue_unlock (real->queue);
853
854   return retval;
855 }
856
857 /**
858  * g_thread_pool_unprocessed:
859  * @pool: a #GThreadPool
860  *
861  * Returns the number of tasks still unprocessed in @pool.
862  *
863  * Returns: the number of unprocessed tasks
864  */
865 guint
866 g_thread_pool_unprocessed (GThreadPool *pool)
867 {
868   GRealThreadPool *real;
869   gint unprocessed;
870
871   real = (GRealThreadPool*) pool;
872
873   g_return_val_if_fail (real, 0);
874   g_return_val_if_fail (real->running, 0);
875
876   unprocessed = g_async_queue_length (real->queue);
877
878   return MAX (unprocessed, 0);
879 }
880
881 /**
882  * g_thread_pool_free:
883  * @pool: a #GThreadPool
884  * @immediate: should @pool shut down immediately?
885  * @wait_: should the function wait for all tasks to be finished?
886  *
887  * Frees all resources allocated for @pool.
888  *
889  * If @immediate is %TRUE, no new task is processed for @pool.
890  * Otherwise @pool is not freed before the last task is processed.
891  * Note however, that no thread of this pool is interrupted while
892  * processing a task. Instead at least all still running threads
893  * can finish their tasks before the @pool is freed.
894  *
895  * If @wait_ is %TRUE, this function does not return before all
896  * tasks to be processed (dependent on @immediate, whether all
897  * or only the currently running) are ready.
898  * Otherwise this function returns immediately.
899  *
900  * After calling this function @pool must not be used anymore.
901  */
902 void
903 g_thread_pool_free (GThreadPool *pool,
904                     gboolean     immediate,
905                     gboolean     wait_)
906 {
907   GRealThreadPool *real;
908
909   real = (GRealThreadPool*) pool;
910
911   g_return_if_fail (real);
912   g_return_if_fail (real->running);
913
914   /* If there's no thread allowed here, there is not much sense in
915    * not stopping this pool immediately, when it's not empty
916    */
917   g_return_if_fail (immediate ||
918                     real->max_threads != 0 ||
919                     g_async_queue_length (real->queue) == 0);
920
921   g_async_queue_lock (real->queue);
922
923   real->running = FALSE;
924   real->immediate = immediate;
925   real->waiting = wait_;
926
927   if (wait_)
928     {
929       while (g_async_queue_length_unlocked (real->queue) != (gint) -real->num_threads &&
930              !(immediate && real->num_threads == 0))
931         g_cond_wait (&real->cond, _g_async_queue_get_mutex (real->queue));
932     }
933
934   if (immediate || g_async_queue_length_unlocked (real->queue) == (gint) -real->num_threads)
935     {
936       /* No thread is currently doing something (and nothing is left
937        * to process in the queue)
938        */
939       if (real->num_threads == 0)
940         {
941           /* No threads left, we clean up */
942           g_async_queue_unlock (real->queue);
943           g_thread_pool_free_internal (real);
944           return;
945         }
946
947       g_thread_pool_wakeup_and_stop_all (real);
948     }
949
950   /* The last thread should cleanup the pool */
951   real->waiting = FALSE;
952   g_async_queue_unlock (real->queue);
953 }
954
955 static void
956 g_thread_pool_free_internal (GRealThreadPool* pool)
957 {
958   g_return_if_fail (pool);
959   g_return_if_fail (pool->running == FALSE);
960   g_return_if_fail (pool->num_threads == 0);
961
962   /* Ensure the dummy item pushed on by g_thread_pool_wakeup_and_stop_all() is
963    * removed, before it’s potentially passed to the user-provided
964    * @item_free_func. */
965   g_async_queue_remove (pool->queue, GUINT_TO_POINTER (1));
966
967   g_async_queue_unref (pool->queue);
968   g_cond_clear (&pool->cond);
969
970   g_free (pool);
971 }
972
973 static void
974 g_thread_pool_wakeup_and_stop_all (GRealThreadPool *pool)
975 {
976   guint i;
977
978   g_return_if_fail (pool);
979   g_return_if_fail (pool->running == FALSE);
980   g_return_if_fail (pool->num_threads != 0);
981
982   pool->immediate = TRUE;
983
984   /*
985    * So here we're sending bogus data to the pool threads, which
986    * should cause them each to wake up, and check the above
987    * pool->immediate condition. However we don't want that
988    * data to be sorted (since it'll crash the sorter).
989    */
990   for (i = 0; i < pool->num_threads; i++)
991     g_async_queue_push_unlocked (pool->queue, GUINT_TO_POINTER (1));
992 }
993
994 /**
995  * g_thread_pool_set_max_unused_threads:
996  * @max_threads: maximal number of unused threads
997  *
998  * Sets the maximal number of unused threads to @max_threads.
999  * If @max_threads is -1, no limit is imposed on the number
1000  * of unused threads.
1001  *
1002  * The default value is 2.
1003  */
1004 void
1005 g_thread_pool_set_max_unused_threads (gint max_threads)
1006 {
1007   g_return_if_fail (max_threads >= -1);
1008
1009   g_atomic_int_set (&max_unused_threads, max_threads);
1010
1011   if (max_threads != -1)
1012     {
1013       max_threads -= g_atomic_int_get (&unused_threads);
1014       if (max_threads < 0)
1015         {
1016           g_atomic_int_set (&kill_unused_threads, -max_threads);
1017           g_atomic_int_inc (&wakeup_thread_serial);
1018
1019           g_async_queue_lock (unused_thread_queue);
1020
1021           do
1022             {
1023               g_async_queue_push_unlocked (unused_thread_queue,
1024                                            wakeup_thread_marker);
1025             }
1026           while (++max_threads);
1027
1028           g_async_queue_unlock (unused_thread_queue);
1029         }
1030     }
1031 }
1032
1033 /**
1034  * g_thread_pool_get_max_unused_threads:
1035  *
1036  * Returns the maximal allowed number of unused threads.
1037  *
1038  * Returns: the maximal number of unused threads
1039  */
1040 gint
1041 g_thread_pool_get_max_unused_threads (void)
1042 {
1043   return g_atomic_int_get (&max_unused_threads);
1044 }
1045
1046 /**
1047  * g_thread_pool_get_num_unused_threads:
1048  *
1049  * Returns the number of currently unused threads.
1050  *
1051  * Returns: the number of currently unused threads
1052  */
1053 guint
1054 g_thread_pool_get_num_unused_threads (void)
1055 {
1056   return (guint) g_atomic_int_get (&unused_threads);
1057 }
1058
1059 /**
1060  * g_thread_pool_stop_unused_threads:
1061  *
1062  * Stops all currently unused threads. This does not change the
1063  * maximal number of unused threads. This function can be used to
1064  * regularly stop all unused threads e.g. from g_timeout_add().
1065  */
1066 void
1067 g_thread_pool_stop_unused_threads (void)
1068 {
1069   guint oldval;
1070
1071   oldval = g_thread_pool_get_max_unused_threads ();
1072
1073   g_thread_pool_set_max_unused_threads (0);
1074   g_thread_pool_set_max_unused_threads (oldval);
1075 }
1076
1077 /**
1078  * g_thread_pool_set_sort_function:
1079  * @pool: a #GThreadPool
1080  * @func: the #GCompareDataFunc used to sort the list of tasks.
1081  *     This function is passed two tasks. It should return
1082  *     0 if the order in which they are handled does not matter,
1083  *     a negative value if the first task should be processed before
1084  *     the second or a positive value if the second task should be
1085  *     processed first.
1086  * @user_data: user data passed to @func
1087  *
1088  * Sets the function used to sort the list of tasks. This allows the
1089  * tasks to be processed by a priority determined by @func, and not
1090  * just in the order in which they were added to the pool.
1091  *
1092  * Note, if the maximum number of threads is more than 1, the order
1093  * that threads are executed cannot be guaranteed 100%. Threads are
1094  * scheduled by the operating system and are executed at random. It
1095  * cannot be assumed that threads are executed in the order they are
1096  * created.
1097  *
1098  * Since: 2.10
1099  */
1100 void
1101 g_thread_pool_set_sort_function (GThreadPool      *pool,
1102                                  GCompareDataFunc  func,
1103                                  gpointer          user_data)
1104 {
1105   GRealThreadPool *real;
1106
1107   real = (GRealThreadPool*) pool;
1108
1109   g_return_if_fail (real);
1110   g_return_if_fail (real->running);
1111
1112   g_async_queue_lock (real->queue);
1113
1114   real->sort_func = func;
1115   real->sort_user_data = user_data;
1116
1117   if (func)
1118     g_async_queue_sort_unlocked (real->queue,
1119                                  real->sort_func,
1120                                  real->sort_user_data);
1121
1122   g_async_queue_unlock (real->queue);
1123 }
1124
1125 /**
1126  * g_thread_pool_move_to_front:
1127  * @pool: a #GThreadPool
1128  * @data: an unprocessed item in the pool
1129  *
1130  * Moves the item to the front of the queue of unprocessed
1131  * items, so that it will be processed next.
1132  *
1133  * Returns: %TRUE if the item was found and moved
1134  *
1135  * Since: 2.46
1136  */
1137 gboolean
1138 g_thread_pool_move_to_front (GThreadPool *pool,
1139                              gpointer     data)
1140 {
1141   GRealThreadPool *real = (GRealThreadPool*) pool;
1142   gboolean found;
1143
1144   g_async_queue_lock (real->queue);
1145
1146   found = g_async_queue_remove_unlocked (real->queue, data);
1147   if (found)
1148     g_async_queue_push_front_unlocked (real->queue, data);
1149
1150   g_async_queue_unlock (real->queue);
1151
1152   return found;
1153 }
1154
1155 /**
1156  * g_thread_pool_set_max_idle_time:
1157  * @interval: the maximum @interval (in milliseconds)
1158  *     a thread can be idle
1159  *
1160  * This function will set the maximum @interval that a thread
1161  * waiting in the pool for new tasks can be idle for before
1162  * being stopped. This function is similar to calling
1163  * g_thread_pool_stop_unused_threads() on a regular timeout,
1164  * except this is done on a per thread basis.
1165  *
1166  * By setting @interval to 0, idle threads will not be stopped.
1167  *
1168  * The default value is 15000 (15 seconds).
1169  *
1170  * Since: 2.10
1171  */
1172 void
1173 g_thread_pool_set_max_idle_time (guint interval)
1174 {
1175   guint i;
1176
1177   g_atomic_int_set (&max_idle_time, interval);
1178
1179   i = (guint) g_atomic_int_get (&unused_threads);
1180   if (i > 0)
1181     {
1182       g_atomic_int_inc (&wakeup_thread_serial);
1183       g_async_queue_lock (unused_thread_queue);
1184
1185       do
1186         {
1187           g_async_queue_push_unlocked (unused_thread_queue,
1188                                        wakeup_thread_marker);
1189         }
1190       while (--i);
1191
1192       g_async_queue_unlock (unused_thread_queue);
1193     }
1194 }
1195
1196 /**
1197  * g_thread_pool_get_max_idle_time:
1198  *
1199  * This function will return the maximum @interval that a
1200  * thread will wait in the thread pool for new tasks before
1201  * being stopped.
1202  *
1203  * If this function returns 0, threads waiting in the thread
1204  * pool for new work are not stopped.
1205  *
1206  * Returns: the maximum @interval (milliseconds) to wait
1207  *     for new tasks in the thread pool before stopping the
1208  *     thread
1209  *
1210  * Since: 2.10
1211  */
1212 guint
1213 g_thread_pool_get_max_idle_time (void)
1214 {
1215   return (guint) g_atomic_int_get (&max_idle_time);
1216 }