56356d2eb4af5e935a3e3b62994f15746b8fa5c8
[platform/upstream/glib.git] / glib / gthreadpool.c
1 /* GLIB - Library of useful routines for C programming
2  * Copyright (C) 1995-1997  Peter Mattis, Spencer Kimball and Josh MacDonald
3  *
4  * GAsyncQueue: thread pool implementation.
5  * Copyright (C) 2000 Sebastian Wilhelmi; University of Karlsruhe
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Lesser General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20  * Boston, MA 02111-1307, USA.
21  */
22
23 /*
24  * MT safe
25  */
26
27 #include "config.h"
28
29 #include "gthreadpool.h"
30
31 #include "gasyncqueue.h"
32 #include "gasyncqueueprivate.h"
33 #include "gmain.h"
34 #include "gtestutils.h"
35 #include "gtimer.h"
36
37 /**
38  * SECTION:thread_pools
39  * @title: Thread Pools
40  * @short_description: pools of threads to execute work concurrently
41  * @see_also: #GThread
42  *
43  * Sometimes you wish to asynchronously fork out the execution of work
44  * and continue working in your own thread. If that will happen often,
45  * the overhead of starting and destroying a thread each time might be
46  * too high. In such cases reusing already started threads seems like a
47  * good idea. And it indeed is, but implementing this can be tedious
48  * and error-prone.
49  *
50  * Therefore GLib provides thread pools for your convenience. An added
51  * advantage is, that the threads can be shared between the different
52  * subsystems of your program, when they are using GLib.
53  *
54  * To create a new thread pool, you use g_thread_pool_new().
55  * It is destroyed by g_thread_pool_free().
56  *
57  * If you want to execute a certain task within a thread pool,
58  * you call g_thread_pool_push().
59  *
60  * To get the current number of running threads you call
61  * g_thread_pool_get_num_threads(). To get the number of still
62  * unprocessed tasks you call g_thread_pool_unprocessed(). To control
63  * the maximal number of threads for a thread pool, you use
64  * g_thread_pool_get_max_threads() and g_thread_pool_set_max_threads().
65  *
66  * Finally you can control the number of unused threads, that are kept
67  * alive by GLib for future use. The current number can be fetched with
68  * g_thread_pool_get_num_unused_threads(). The maximal number can be
69  * controlled by g_thread_pool_get_max_unused_threads() and
70  * g_thread_pool_set_max_unused_threads(). All currently unused threads
71  * can be stopped by calling g_thread_pool_stop_unused_threads().
72  */
73
74 #define DEBUG_MSG(x)
75 /* #define DEBUG_MSG(args) g_printerr args ; g_printerr ("\n");    */
76
77 typedef struct _GRealThreadPool GRealThreadPool;
78
79 /**
80  * GThreadPool:
81  * @func: the function to execute in the threads of this pool
82  * @user_data: the user data for the threads of this pool
83  * @exclusive: are all threads exclusive to this pool
84  *
85  * The #GThreadPool struct represents a thread pool. It has three
86  * public read-only members, but the underlying struct is bigger,
87  * so you must not copy this struct.
88  */
89 struct _GRealThreadPool
90 {
91   GThreadPool pool;
92   GAsyncQueue *queue;
93   GCond *cond;
94   gint max_threads;
95   gint num_threads;
96   gboolean running;
97   gboolean immediate;
98   gboolean waiting;
99   GCompareDataFunc sort_func;
100   gpointer sort_user_data;
101 };
102
103 /* The following is just an address to mark the wakeup order for a
104  * thread, it could be any address (as long, as it isn't a valid
105  * GThreadPool address)
106  */
107 static const gpointer wakeup_thread_marker = (gpointer) &g_thread_pool_new;
108 static gint wakeup_thread_serial = 0;
109
110 /* Here all unused threads are waiting  */
111 static GAsyncQueue *unused_thread_queue = NULL;
112 static gint unused_threads = 0;
113 static gint max_unused_threads = 0;
114 static gint kill_unused_threads = 0;
115 static guint max_idle_time = 0;
116
117 static void             g_thread_pool_queue_push_unlocked (GRealThreadPool  *pool,
118                                                            gpointer          data);
119 static void             g_thread_pool_free_internal       (GRealThreadPool  *pool);
120 static gpointer         g_thread_pool_thread_proxy        (gpointer          data);
121 static gboolean         g_thread_pool_start_thread        (GRealThreadPool  *pool,
122                                                            GError          **error);
123 static void             g_thread_pool_wakeup_and_stop_all (GRealThreadPool  *pool);
124 static GRealThreadPool* g_thread_pool_wait_for_new_pool   (void);
125 static gpointer         g_thread_pool_wait_for_new_task   (GRealThreadPool  *pool);
126
127 static void
128 g_thread_pool_queue_push_unlocked (GRealThreadPool *pool,
129                                    gpointer         data)
130 {
131   if (pool->sort_func)
132     g_async_queue_push_sorted_unlocked (pool->queue,
133                                         data,
134                                         pool->sort_func,
135                                         pool->sort_user_data);
136   else
137     g_async_queue_push_unlocked (pool->queue, data);
138 }
139
140 static GRealThreadPool*
141 g_thread_pool_wait_for_new_pool (void)
142 {
143   GRealThreadPool *pool;
144   gint local_wakeup_thread_serial;
145   guint local_max_unused_threads;
146   gint local_max_idle_time;
147   gint last_wakeup_thread_serial;
148   gboolean have_relayed_thread_marker = FALSE;
149
150   local_max_unused_threads = g_atomic_int_get (&max_unused_threads);
151   local_max_idle_time = g_atomic_int_get (&max_idle_time);
152   last_wakeup_thread_serial = g_atomic_int_get (&wakeup_thread_serial);
153
154   g_atomic_int_inc (&unused_threads);
155
156   do
157     {
158       if (g_atomic_int_get (&unused_threads) >= local_max_unused_threads)
159         {
160           /* If this is a superfluous thread, stop it. */
161           pool = NULL;
162         }
163       else if (local_max_idle_time > 0)
164         {
165           /* If a maximal idle time is given, wait for the given time. */
166           GTimeVal end_time;
167
168           g_get_current_time (&end_time);
169           g_time_val_add (&end_time, local_max_idle_time * 1000);
170
171           DEBUG_MSG (("thread %p waiting in global pool for %f seconds.",
172                       g_thread_self (), local_max_idle_time / 1000.0));
173
174           pool = g_async_queue_timed_pop (unused_thread_queue, &end_time);
175         }
176       else
177         {
178           /* If no maximal idle time is given, wait indefinitely. */
179           DEBUG_MSG (("thread %p waiting in global pool.", g_thread_self ()));
180           pool = g_async_queue_pop (unused_thread_queue);
181         }
182
183       if (pool == wakeup_thread_marker)
184         {
185           local_wakeup_thread_serial = g_atomic_int_get (&wakeup_thread_serial);
186           if (last_wakeup_thread_serial == local_wakeup_thread_serial)
187             {
188               if (!have_relayed_thread_marker)
189               {
190                 /* If this wakeup marker has been received for
191                  * the second time, relay it.
192                  */
193                 DEBUG_MSG (("thread %p relaying wakeup message to "
194                             "waiting thread with lower serial.",
195                             g_thread_self ()));
196
197                 g_async_queue_push (unused_thread_queue, wakeup_thread_marker);
198                 have_relayed_thread_marker = TRUE;
199
200                 /* If a wakeup marker has been relayed, this thread
201                  * will get out of the way for 100 microseconds to
202                  * avoid receiving this marker again.
203                  */
204                 g_usleep (100);
205               }
206             }
207           else
208             {
209               if (g_atomic_int_add (&kill_unused_threads, -1) > 0)
210                 {
211                   pool = NULL;
212                   break;
213                 }
214
215               DEBUG_MSG (("thread %p updating to new limits.",
216                           g_thread_self ()));
217
218               local_max_unused_threads = g_atomic_int_get (&max_unused_threads);
219               local_max_idle_time = g_atomic_int_get (&max_idle_time);
220               last_wakeup_thread_serial = local_wakeup_thread_serial;
221
222               have_relayed_thread_marker = FALSE;
223             }
224         }
225     }
226   while (pool == wakeup_thread_marker);
227
228   g_atomic_int_add (&unused_threads, -1);
229
230   return pool;
231 }
232
233 static gpointer
234 g_thread_pool_wait_for_new_task (GRealThreadPool *pool)
235 {
236   gpointer task = NULL;
237
238   if (pool->running || (!pool->immediate &&
239                         g_async_queue_length_unlocked (pool->queue) > 0))
240     {
241       /* This thread pool is still active. */
242       if (pool->num_threads > pool->max_threads && pool->max_threads != -1)
243         {
244           /* This is a superfluous thread, so it goes to the global pool. */
245           DEBUG_MSG (("superfluous thread %p in pool %p.",
246                       g_thread_self (), pool));
247         }
248       else if (pool->pool.exclusive)
249         {
250           /* Exclusive threads stay attached to the pool. */
251           task = g_async_queue_pop_unlocked (pool->queue);
252
253           DEBUG_MSG (("thread %p in exclusive pool %p waits for task "
254                       "(%d running, %d unprocessed).",
255                       g_thread_self (), pool, pool->num_threads,
256                       g_async_queue_length_unlocked (pool->queue)));
257         }
258       else
259         {
260           /* A thread will wait for new tasks for at most 1/2
261            * second before going to the global pool.
262            */
263           GTimeVal end_time;
264
265           g_get_current_time (&end_time);
266           g_time_val_add (&end_time, G_USEC_PER_SEC / 2);    /* 1/2 second */
267
268           DEBUG_MSG (("thread %p in pool %p waits for up to a 1/2 second for task "
269                       "(%d running, %d unprocessed).",
270                       g_thread_self (), pool, pool->num_threads,
271                       g_async_queue_length_unlocked (pool->queue)));
272
273           task = g_async_queue_timed_pop_unlocked (pool->queue, &end_time);
274         }
275     }
276   else
277     {
278       /* This thread pool is inactive, it will no longer process tasks. */
279       DEBUG_MSG (("pool %p not active, thread %p will go to global pool "
280                   "(running: %s, immediate: %s, len: %d).",
281                   pool, g_thread_self (),
282                   pool->running ? "true" : "false",
283                   pool->immediate ? "true" : "false",
284                   g_async_queue_length_unlocked (pool->queue)));
285     }
286
287   return task;
288 }
289
290
291 static gpointer
292 g_thread_pool_thread_proxy (gpointer data)
293 {
294   GRealThreadPool *pool;
295
296   pool = data;
297
298   DEBUG_MSG (("thread %p started for pool %p.", g_thread_self (), pool));
299
300   g_async_queue_lock (pool->queue);
301
302   while (TRUE)
303     {
304       gpointer task;
305
306       task = g_thread_pool_wait_for_new_task (pool);
307       if (task)
308         {
309           if (pool->running || !pool->immediate)
310             {
311               /* A task was received and the thread pool is active,
312                * so execute the function.
313                */
314               g_async_queue_unlock (pool->queue);
315               DEBUG_MSG (("thread %p in pool %p calling func.",
316                           g_thread_self (), pool));
317               pool->pool.func (task, pool->pool.user_data);
318               g_async_queue_lock (pool->queue);
319             }
320         }
321       else
322         {
323           /* No task was received, so this thread goes to the global pool. */
324           gboolean free_pool = FALSE;
325
326           DEBUG_MSG (("thread %p leaving pool %p for global pool.",
327                       g_thread_self (), pool));
328           pool->num_threads--;
329
330           if (!pool->running)
331             {
332               if (!pool->waiting)
333                 {
334                   if (pool->num_threads == 0)
335                     {
336                       /* If the pool is not running and no other
337                        * thread is waiting for this thread pool to
338                        * finish and this is the last thread of this
339                        * pool, free the pool.
340                        */
341                       free_pool = TRUE;
342                     }
343                   else
344                     {
345                       /* If the pool is not running and no other
346                        * thread is waiting for this thread pool to
347                        * finish and this is not the last thread of
348                        * this pool and there are no tasks left in the
349                        * queue, wakeup the remaining threads.
350                        */
351                       if (g_async_queue_length_unlocked (pool->queue) ==
352                           - pool->num_threads)
353                         g_thread_pool_wakeup_and_stop_all (pool);
354                     }
355                 }
356               else if (pool->immediate ||
357                        g_async_queue_length_unlocked (pool->queue) <= 0)
358                 {
359                   /* If the pool is not running and another thread is
360                    * waiting for this thread pool to finish and there
361                    * are either no tasks left or the pool shall stop
362                    * immediately, inform the waiting thread of a change
363                    * of the thread pool state.
364                    */
365                   g_cond_broadcast (pool->cond);
366                 }
367             }
368
369           g_async_queue_unlock (pool->queue);
370
371           if (free_pool)
372             g_thread_pool_free_internal (pool);
373
374           if ((pool = g_thread_pool_wait_for_new_pool ()) == NULL)
375             break;
376
377           g_async_queue_lock (pool->queue);
378
379           DEBUG_MSG (("thread %p entering pool %p from global pool.",
380                       g_thread_self (), pool));
381
382           /* pool->num_threads++ is not done here, but in
383            * g_thread_pool_start_thread to make the new started
384            * thread known to the pool before itself can do it.
385            */
386         }
387     }
388
389   return NULL;
390 }
391
392 static gboolean
393 g_thread_pool_start_thread (GRealThreadPool  *pool,
394                             GError          **error)
395 {
396   gboolean success = FALSE;
397
398   if (pool->num_threads >= pool->max_threads && pool->max_threads != -1)
399     /* Enough threads are already running */
400     return TRUE;
401
402   g_async_queue_lock (unused_thread_queue);
403
404   if (g_async_queue_length_unlocked (unused_thread_queue) < 0)
405     {
406       g_async_queue_push_unlocked (unused_thread_queue, pool);
407       success = TRUE;
408     }
409
410   g_async_queue_unlock (unused_thread_queue);
411
412   if (!success)
413     {
414       GError *local_error = NULL;
415
416       /* No thread was found, we have to start a new one */
417       if (!g_thread_create (g_thread_pool_thread_proxy, pool, FALSE, &local_error))
418         {
419           g_propagate_error (error, local_error);
420           return FALSE;
421         }
422     }
423
424   /* See comment in g_thread_pool_thread_proxy as to why this is done
425    * here and not there
426    */
427   pool->num_threads++;
428
429   return TRUE;
430 }
431
432 /**
433  * g_thread_pool_new:
434  * @func: a function to execute in the threads of the new thread pool
435  * @user_data: user data that is handed over to @func every time it
436  *     is called
437  * @max_threads: the maximal number of threads to execute concurrently
438  *     in  the new thread pool, -1 means no limit
439  * @exclusive: should this thread pool be exclusive?
440  * @error: return location for error, or %NULL
441  *
442  * This function creates a new thread pool.
443  *
444  * Whenever you call g_thread_pool_push(), either a new thread is
445  * created or an unused one is reused. At most @max_threads threads
446  * are running concurrently for this thread pool. @max_threads = -1
447  * allows unlimited threads to be created for this thread pool. The
448  * newly created or reused thread now executes the function @func
449  * with the two arguments. The first one is the parameter to
450  * g_thread_pool_push() and the second one is @user_data.
451  *
452  * The parameter @exclusive determines whether the thread pool owns
453  * all threads exclusive or shares them with other thread pools.
454  * If @exclusive is %TRUE, @max_threads threads are started
455  * immediately and they will run exclusively for this thread pool
456  * until it is destroyed by g_thread_pool_free(). If @exclusive is
457  * %FALSE, threads are created when needed and shared between all
458  * non-exclusive thread pools. This implies that @max_threads may
459  * not be -1 for exclusive thread pools.
460  *
461  * @error can be %NULL to ignore errors, or non-%NULL to report
462  * errors. An error can only occur when @exclusive is set to %TRUE
463  * and not all @max_threads threads could be created.
464  *
465  * Return value: the new #GThreadPool
466  */
467 GThreadPool *
468 g_thread_pool_new (GFunc      func,
469                    gpointer   user_data,
470                    gint       max_threads,
471                    gboolean   exclusive,
472                    GError   **error)
473 {
474   GRealThreadPool *retval;
475   G_LOCK_DEFINE_STATIC (init);
476
477   g_return_val_if_fail (func, NULL);
478   g_return_val_if_fail (!exclusive || max_threads != -1, NULL);
479   g_return_val_if_fail (max_threads >= -1, NULL);
480   g_return_val_if_fail (g_thread_supported (), NULL);
481
482   retval = g_new (GRealThreadPool, 1);
483
484   retval->pool.func = func;
485   retval->pool.user_data = user_data;
486   retval->pool.exclusive = exclusive;
487   retval->queue = g_async_queue_new ();
488   retval->cond = NULL;
489   retval->max_threads = max_threads;
490   retval->num_threads = 0;
491   retval->running = TRUE;
492   retval->immediate = FALSE;
493   retval->waiting = FALSE;
494   retval->sort_func = NULL;
495   retval->sort_user_data = NULL;
496
497   G_LOCK (init);
498   if (!unused_thread_queue)
499       unused_thread_queue = g_async_queue_new ();
500   G_UNLOCK (init);
501
502   if (retval->pool.exclusive)
503     {
504       g_async_queue_lock (retval->queue);
505
506       while (retval->num_threads < retval->max_threads)
507         {
508           GError *local_error = NULL;
509
510           if (!g_thread_pool_start_thread (retval, &local_error))
511             {
512               g_propagate_error (error, local_error);
513               break;
514             }
515         }
516
517       g_async_queue_unlock (retval->queue);
518     }
519
520   return (GThreadPool*) retval;
521 }
522
523 /**
524  * g_thread_pool_push:
525  * @pool: a #GThreadPool
526  * @data: a new task for @pool
527  * @error: return location for error, or %NULL
528  *
529  * Inserts @data into the list of tasks to be executed by @pool.
530  *
531  * When the number of currently running threads is lower than the
532  * maximal allowed number of threads, a new thread is started (or
533  * reused) with the properties given to g_thread_pool_new().
534  * Otherwise, @data stays in the queue until a thread in this pool
535  * finishes its previous task and processes @data.
536  *
537  * @error can be %NULL to ignore errors, or non-%NULL to report
538  * errors. An error can only occur when a new thread couldn't be
539  * created. In that case @data is simply appended to the queue of
540  * work to do.
541  *
542  * Before version 2.32, this function did not return a success status.
543  *
544  * Return value: %TRUE on success, %FALSE if an error occurred
545  */
546 gboolean
547 g_thread_pool_push (GThreadPool  *pool,
548                     gpointer      data,
549                     GError      **error)
550 {
551   GRealThreadPool *real;
552   gboolean result;
553
554   real = (GRealThreadPool*) pool;
555
556   g_return_val_if_fail (real, FALSE);
557   g_return_val_if_fail (real->running, FALSE);
558
559   result = TRUE;
560
561   g_async_queue_lock (real->queue);
562
563   if (g_async_queue_length_unlocked (real->queue) >= 0)
564     {
565       /* No thread is waiting in the queue */
566       GError *local_error = NULL;
567
568       if (!g_thread_pool_start_thread (real, &local_error))
569         {
570           g_propagate_error (error, local_error);
571           result = FALSE;
572         }
573     }
574
575   g_thread_pool_queue_push_unlocked (real, data);
576   g_async_queue_unlock (real->queue);
577
578   return result;
579 }
580
581 /**
582  * g_thread_pool_set_max_threads:
583  * @pool: a #GThreadPool
584  * @max_threads: a new maximal number of threads for @pool,
585  *     or -1 for unlimited
586  * @error: return location for error, or %NULL
587  *
588  * Sets the maximal allowed number of threads for @pool.
589  * A value of -1 means that the maximal number of threads
590  * is unlimited. If @pool is an exclusive thread pool, setting
591  * the maximal number of threads to -1 is not allowed.
592  *
593  * Setting @max_threads to 0 means stopping all work for @pool.
594  * It is effectively frozen until @max_threads is set to a non-zero
595  * value again.
596  *
597  * A thread is never terminated while calling @func, as supplied by
598  * g_thread_pool_new(). Instead the maximal number of threads only
599  * has effect for the allocation of new threads in g_thread_pool_push().
600  * A new thread is allocated, whenever the number of currently
601  * running threads in @pool is smaller than the maximal number.
602  *
603  * @error can be %NULL to ignore errors, or non-%NULL to report
604  * errors. An error can only occur when a new thread couldn't be
605  * created.
606  *
607  * Before version 2.32, this function did not return a success status.
608  *
609  * Return value: %TRUE on success, %FALSE if an error occurred
610  */
611 gboolean
612 g_thread_pool_set_max_threads (GThreadPool  *pool,
613                                gint          max_threads,
614                                GError      **error)
615 {
616   GRealThreadPool *real;
617   gint to_start;
618   gboolean result;
619
620   real = (GRealThreadPool*) pool;
621
622   g_return_val_if_fail (real, FALSE);
623   g_return_val_if_fail (real->running, FALSE);
624   g_return_val_if_fail (!real->pool.exclusive || max_threads != -1, FALSE);
625   g_return_val_if_fail (max_threads >= -1, FALSE);
626
627   result = TRUE;
628
629   g_async_queue_lock (real->queue);
630
631   real->max_threads = max_threads;
632
633   if (pool->exclusive)
634     to_start = real->max_threads - real->num_threads;
635   else
636     to_start = g_async_queue_length_unlocked (real->queue);
637
638   for ( ; to_start > 0; to_start--)
639     {
640       GError *local_error = NULL;
641
642       if (!g_thread_pool_start_thread (real, &local_error))
643         {
644           g_propagate_error (error, local_error);
645           result = FALSE;
646           break;
647         }
648     }
649
650   g_async_queue_unlock (real->queue);
651
652   return result;
653 }
654
655 /**
656  * g_thread_pool_get_max_threads:
657  * @pool: a #GThreadPool
658  *
659  * Returns the maximal number of threads for @pool.
660  *
661  * Return value: the maximal number of threads
662  */
663 gint
664 g_thread_pool_get_max_threads (GThreadPool *pool)
665 {
666   GRealThreadPool *real;
667   gint retval;
668
669   real = (GRealThreadPool*) pool;
670
671   g_return_val_if_fail (real, 0);
672   g_return_val_if_fail (real->running, 0);
673
674   g_async_queue_lock (real->queue);
675   retval = real->max_threads;
676   g_async_queue_unlock (real->queue);
677
678   return retval;
679 }
680
681 /**
682  * g_thread_pool_get_num_threads:
683  * @pool: a #GThreadPool
684  *
685  * Returns the number of threads currently running in @pool.
686  *
687  * Return value: the number of threads currently running
688  */
689 guint
690 g_thread_pool_get_num_threads (GThreadPool *pool)
691 {
692   GRealThreadPool *real;
693   guint retval;
694
695   real = (GRealThreadPool*) pool;
696
697   g_return_val_if_fail (real, 0);
698   g_return_val_if_fail (real->running, 0);
699
700   g_async_queue_lock (real->queue);
701   retval = real->num_threads;
702   g_async_queue_unlock (real->queue);
703
704   return retval;
705 }
706
707 /**
708  * g_thread_pool_unprocessed:
709  * @pool: a #GThreadPool
710  *
711  * Returns the number of tasks still unprocessed in @pool.
712  *
713  * Return value: the number of unprocessed tasks
714  */
715 guint
716 g_thread_pool_unprocessed (GThreadPool *pool)
717 {
718   GRealThreadPool *real;
719   gint unprocessed;
720
721   real = (GRealThreadPool*) pool;
722
723   g_return_val_if_fail (real, 0);
724   g_return_val_if_fail (real->running, 0);
725
726   unprocessed = g_async_queue_length (real->queue);
727
728   return MAX (unprocessed, 0);
729 }
730
731 /**
732  * g_thread_pool_free:
733  * @pool: a #GThreadPool
734  * @immediate: should @pool shut down immediately?
735  * @wait_: should the function wait for all tasks to be finished?
736  *
737  * Frees all resources allocated for @pool.
738  *
739  * If @immediate is %TRUE, no new task is processed for @pool.
740  * Otherwise @pool is not freed before the last task is processed.
741  * Note however, that no thread of this pool is interrupted while
742  * processing a task. Instead at least all still running threads
743  * can finish their tasks before the @pool is freed.
744  *
745  * If @wait_ is %TRUE, the functions does not return before all
746  * tasks to be processed (dependent on @immediate, whether all
747  * or only the currently running) are ready. Otherwise the function
748  * returns immediately.
749  *
750  * After calling this function @pool must not be used anymore.
751  */
752 void
753 g_thread_pool_free (GThreadPool *pool,
754                     gboolean     immediate,
755                     gboolean     wait_)
756 {
757   GRealThreadPool *real;
758
759   real = (GRealThreadPool*) pool;
760
761   g_return_if_fail (real);
762   g_return_if_fail (real->running);
763
764   /* If there's no thread allowed here, there is not much sense in
765    * not stopping this pool immediately, when it's not empty
766    */
767   g_return_if_fail (immediate ||
768                     real->max_threads != 0 ||
769                     g_async_queue_length (real->queue) == 0);
770
771   g_async_queue_lock (real->queue);
772
773   real->running = FALSE;
774   real->immediate = immediate;
775   real->waiting = wait_;
776
777   if (wait_)
778     {
779       real->cond = g_cond_new ();
780
781       while (g_async_queue_length_unlocked (real->queue) != -real->num_threads &&
782              !(immediate && real->num_threads == 0))
783         g_cond_wait (real->cond, _g_async_queue_get_mutex (real->queue));
784     }
785
786   if (immediate || g_async_queue_length_unlocked (real->queue) == -real->num_threads)
787     {
788       /* No thread is currently doing something (and nothing is left
789        * to process in the queue)
790        */
791       if (real->num_threads == 0)
792         {
793           /* No threads left, we clean up */
794           g_async_queue_unlock (real->queue);
795           g_thread_pool_free_internal (real);
796           return;
797         }
798
799       g_thread_pool_wakeup_and_stop_all (real);
800     }
801
802   /* The last thread should cleanup the pool */
803   real->waiting = FALSE;
804   g_async_queue_unlock (real->queue);
805 }
806
807 static void
808 g_thread_pool_free_internal (GRealThreadPool* pool)
809 {
810   g_return_if_fail (pool);
811   g_return_if_fail (pool->running == FALSE);
812   g_return_if_fail (pool->num_threads == 0);
813
814   g_async_queue_unref (pool->queue);
815
816   if (pool->cond)
817     g_cond_free (pool->cond);
818
819   g_free (pool);
820 }
821
822 static void
823 g_thread_pool_wakeup_and_stop_all (GRealThreadPool *pool)
824 {
825   guint i;
826
827   g_return_if_fail (pool);
828   g_return_if_fail (pool->running == FALSE);
829   g_return_if_fail (pool->num_threads != 0);
830
831   pool->immediate = TRUE;
832
833   for (i = 0; i < pool->num_threads; i++)
834     g_thread_pool_queue_push_unlocked (pool, GUINT_TO_POINTER (1));
835 }
836
837 /**
838  * g_thread_pool_set_max_unused_threads:
839  * @max_threads: maximal number of unused threads
840  *
841  * Sets the maximal number of unused threads to @max_threads.
842  * If @max_threads is -1, no limit is imposed on the number
843  * of unused threads.
844  */
845 void
846 g_thread_pool_set_max_unused_threads (gint max_threads)
847 {
848   g_return_if_fail (max_threads >= -1);
849
850   g_atomic_int_set (&max_unused_threads, max_threads);
851
852   if (max_threads != -1)
853     {
854       max_threads -= g_atomic_int_get (&unused_threads);
855       if (max_threads < 0)
856         {
857           g_atomic_int_set (&kill_unused_threads, -max_threads);
858           g_atomic_int_inc (&wakeup_thread_serial);
859
860           g_async_queue_lock (unused_thread_queue);
861
862           do
863             {
864               g_async_queue_push_unlocked (unused_thread_queue,
865                                            wakeup_thread_marker);
866             }
867           while (++max_threads);
868
869           g_async_queue_unlock (unused_thread_queue);
870         }
871     }
872 }
873
874 /**
875  * g_thread_pool_get_max_unused_threads:
876  *
877  * Returns the maximal allowed number of unused threads.
878  *
879  * Return value: the maximal number of unused threads
880  */
881 gint
882 g_thread_pool_get_max_unused_threads (void)
883 {
884   return g_atomic_int_get (&max_unused_threads);
885 }
886
887 /**
888  * g_thread_pool_get_num_unused_threads:
889  *
890  * Returns the number of currently unused threads.
891  *
892  * Return value: the number of currently unused threads
893  */
894 guint
895 g_thread_pool_get_num_unused_threads (void)
896 {
897   return g_atomic_int_get (&unused_threads);
898 }
899
900 /**
901  * g_thread_pool_stop_unused_threads:
902  *
903  * Stops all currently unused threads. This does not change the
904  * maximal number of unused threads. This function can be used to
905  * regularly stop all unused threads e.g. from g_timeout_add().
906  */
907 void
908 g_thread_pool_stop_unused_threads (void)
909 {
910   guint oldval;
911
912   oldval = g_thread_pool_get_max_unused_threads ();
913
914   g_thread_pool_set_max_unused_threads (0);
915   g_thread_pool_set_max_unused_threads (oldval);
916 }
917
918 /**
919  * g_thread_pool_set_sort_function:
920  * @pool: a #GThreadPool
921  * @func: the #GCompareDataFunc used to sort the list of tasks.
922  *     This function is passed two tasks. It should return
923  *     0 if the order in which they are handled does not matter,
924  *     a negative value if the first task should be processed before
925  *     the second or a positive value if the second task should be
926  *     processed first.
927  * @user_data: user data passed to @func
928  *
929  * Sets the function used to sort the list of tasks. This allows the
930  * tasks to be processed by a priority determined by @func, and not
931  * just in the order in which they were added to the pool.
932  *
933  * Note, if the maximum number of threads is more than 1, the order
934  * that threads are executed cannot be guaranteed 100%. Threads are
935  * scheduled by the operating system and are executed at random. It
936  * cannot be assumed that threads are executed in the order they are
937  * created.
938  *
939  * Since: 2.10
940  */
941 void
942 g_thread_pool_set_sort_function (GThreadPool      *pool,
943                                  GCompareDataFunc  func,
944                                  gpointer          user_data)
945 {
946   GRealThreadPool *real;
947
948   real = (GRealThreadPool*) pool;
949
950   g_return_if_fail (real);
951   g_return_if_fail (real->running);
952
953   g_async_queue_lock (real->queue);
954
955   real->sort_func = func;
956   real->sort_user_data = user_data;
957
958   if (func)
959     g_async_queue_sort_unlocked (real->queue,
960                                  real->sort_func,
961                                  real->sort_user_data);
962
963   g_async_queue_unlock (real->queue);
964 }
965
966 /**
967  * g_thread_pool_set_max_idle_time:
968  * @interval: the maximum @interval (in milliseconds)
969  *     a thread can be idle
970  *
971  * This function will set the maximum @interval that a thread
972  * waiting in the pool for new tasks can be idle for before
973  * being stopped. This function is similar to calling
974  * g_thread_pool_stop_unused_threads() on a regular timeout,
975  * except this is done on a per thread basis.
976  *
977  * By setting @interval to 0, idle threads will not be stopped.
978  *
979  * This function makes use of g_async_queue_timed_pop () using
980  * @interval.
981  *
982  * Since: 2.10
983  */
984 void
985 g_thread_pool_set_max_idle_time (guint interval)
986 {
987   guint i;
988
989   g_atomic_int_set (&max_idle_time, interval);
990
991   i = g_atomic_int_get (&unused_threads);
992   if (i > 0)
993     {
994       g_atomic_int_inc (&wakeup_thread_serial);
995       g_async_queue_lock (unused_thread_queue);
996
997       do
998         {
999           g_async_queue_push_unlocked (unused_thread_queue,
1000                                        wakeup_thread_marker);
1001         }
1002       while (--i);
1003
1004       g_async_queue_unlock (unused_thread_queue);
1005     }
1006 }
1007
1008 /**
1009  * g_thread_pool_get_max_idle_time:
1010  *
1011  * This function will return the maximum @interval that a
1012  * thread will wait in the thread pool for new tasks before
1013  * being stopped.
1014  *
1015  * If this function returns 0, threads waiting in the thread
1016  * pool for new work are not stopped.
1017  *
1018  * Return value: the maximum @interval (milliseconds) to wait
1019  *     for new tasks in the thread pool before stopping the
1020  *     thread
1021  *
1022  * Since: 2.10
1023  */
1024 guint
1025 g_thread_pool_get_max_idle_time (void)
1026 {
1027   return g_atomic_int_get (&max_idle_time);
1028 }