G_VARIANT_TYPE_VARDICT: Add 'Since:' tag
[platform/upstream/glib.git] / glib / gthreadpool.c
1 /* GLIB - Library of useful routines for C programming
2  * Copyright (C) 1995-1997  Peter Mattis, Spencer Kimball and Josh MacDonald
3  *
4  * GAsyncQueue: thread pool implementation.
5  * Copyright (C) 2000 Sebastian Wilhelmi; University of Karlsruhe
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Lesser General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20  * Boston, MA 02111-1307, USA.
21  */
22
23 /*
24  * MT safe
25  */
26
27 #include "config.h"
28
29 #include "gthreadpool.h"
30
31 #include "gasyncqueue.h"
32 #include "gmain.h"
33 #include "gtestutils.h"
34 #include "gtimer.h"
35
36 /**
37  * SECTION:thread_pools
38  * @title: Thread Pools
39  * @short_description: pools of threads to execute work concurrently
40  * @see_also: <para> <variablelist> <varlistentry>
41  *            <term>#GThread</term> <listitem><para>GLib thread
42  *            system.</para></listitem> </varlistentry> </variablelist>
43  *            </para>
44  *
45  * Sometimes you wish to asynchronously fork out the execution of work
46  * and continue working in your own thread. If that will happen often,
47  * the overhead of starting and destroying a thread each time might be
48  * too high. In such cases reusing already started threads seems like a
49  * good idea. And it indeed is, but implementing this can be tedious
50  * and error-prone.
51  *
52  * Therefore GLib provides thread pools for your convenience. An added
53  * advantage is, that the threads can be shared between the different
54  * subsystems of your program, when they are using GLib.
55  *
56  * To create a new thread pool, you use g_thread_pool_new(). It is
57  * destroyed by g_thread_pool_free().
58  *
59  * If you want to execute a certain task within a thread pool, you call
60  * g_thread_pool_push().
61  *
62  * To get the current number of running threads you call
63  * g_thread_pool_get_num_threads(). To get the number of still
64  * unprocessed tasks you call g_thread_pool_unprocessed(). To control
65  * the maximal number of threads for a thread pool, you use
66  * g_thread_pool_get_max_threads() and g_thread_pool_set_max_threads().
67  *
68  * Finally you can control the number of unused threads, that are kept
69  * alive by GLib for future use. The current number can be fetched with
70  * g_thread_pool_get_num_unused_threads(). The maximal number can be
71  * controlled by g_thread_pool_get_max_unused_threads() and
72  * g_thread_pool_set_max_unused_threads(). All currently unused threads
73  * can be stopped by calling g_thread_pool_stop_unused_threads().
74  **/
75
76 #define DEBUG_MSG(x)  
77 /* #define DEBUG_MSG(args) g_printerr args ; g_printerr ("\n");    */
78
79 typedef struct _GRealThreadPool GRealThreadPool;
80
81 /**
82  * GThreadPool:
83  * @func: the function to execute in the threads of this pool
84  * @user_data: the user data for the threads of this pool
85  * @exclusive: are all threads exclusive to this pool
86  *
87  * The #GThreadPool struct represents a thread pool. It has three
88  * public read-only members, but the underlying struct is bigger, so
89  * you must not copy this struct.
90  **/
91 struct _GRealThreadPool
92 {
93   GThreadPool pool;
94   GAsyncQueue* queue;
95   GCond* cond;
96   gint max_threads;
97   gint num_threads;
98   gboolean running;
99   gboolean immediate;
100   gboolean waiting;
101   GCompareDataFunc sort_func;
102   gpointer sort_user_data;
103 };
104
105 /* The following is just an address to mark the wakeup order for a
106  * thread, it could be any address (as long, as it isn't a valid
107  * GThreadPool address) */
108 static const gpointer wakeup_thread_marker = (gpointer) &g_thread_pool_new;
109 static gint wakeup_thread_serial = 0;
110
111 /* Here all unused threads are waiting  */
112 static GAsyncQueue *unused_thread_queue = NULL;
113 static gint unused_threads = 0;
114 static gint max_unused_threads = 0;
115 static gint kill_unused_threads = 0;
116 static guint max_idle_time = 0;
117
118 static void             g_thread_pool_queue_push_unlocked (GRealThreadPool  *pool,
119                                                            gpointer          data);
120 static void             g_thread_pool_free_internal       (GRealThreadPool  *pool);
121 static gpointer         g_thread_pool_thread_proxy        (gpointer          data);
122 static void             g_thread_pool_start_thread        (GRealThreadPool  *pool,
123                                                            GError          **error);
124 static void             g_thread_pool_wakeup_and_stop_all (GRealThreadPool  *pool);
125 static GRealThreadPool* g_thread_pool_wait_for_new_pool   (void);
126 static gpointer         g_thread_pool_wait_for_new_task   (GRealThreadPool  *pool);
127
128 static void
129 g_thread_pool_queue_push_unlocked (GRealThreadPool *pool,
130                                    gpointer         data)
131 {
132   if (pool->sort_func) 
133     g_async_queue_push_sorted_unlocked (pool->queue, 
134                                         data,
135                                         pool->sort_func, 
136                                         pool->sort_user_data);
137   else
138     g_async_queue_push_unlocked (pool->queue, data);
139 }
140
141 static GRealThreadPool*
142 g_thread_pool_wait_for_new_pool (void)
143 {
144   GRealThreadPool *pool;
145   gint local_wakeup_thread_serial;
146   guint local_max_unused_threads;
147   gint local_max_idle_time;
148   gint last_wakeup_thread_serial;
149   gboolean have_relayed_thread_marker = FALSE;
150
151   local_max_unused_threads = g_atomic_int_get (&max_unused_threads);
152   local_max_idle_time = g_atomic_int_get (&max_idle_time);
153   last_wakeup_thread_serial = g_atomic_int_get (&wakeup_thread_serial);
154
155   g_atomic_int_inc (&unused_threads);
156
157   do
158     {
159       if (g_atomic_int_get (&unused_threads) >= local_max_unused_threads)
160         {
161           /* If this is a superfluous thread, stop it. */
162           pool = NULL;
163         }
164       else if (local_max_idle_time > 0)
165         {
166           /* If a maximal idle time is given, wait for the given time. */
167           GTimeVal end_time;
168
169           g_get_current_time (&end_time);
170           g_time_val_add (&end_time, local_max_idle_time * 1000);
171
172           DEBUG_MSG (("thread %p waiting in global pool for %f seconds.",
173                       g_thread_self (), local_max_idle_time / 1000.0));
174
175           pool = g_async_queue_timed_pop (unused_thread_queue, &end_time);
176         }
177       else
178         {
179           /* If no maximal idle time is given, wait indefinitely. */
180           DEBUG_MSG (("thread %p waiting in global pool.",
181                       g_thread_self ()));
182           pool = g_async_queue_pop (unused_thread_queue);
183         }
184
185       if (pool == wakeup_thread_marker)
186         {
187           local_wakeup_thread_serial = g_atomic_int_get (&wakeup_thread_serial);
188           if (last_wakeup_thread_serial == local_wakeup_thread_serial)
189             {
190               if (!have_relayed_thread_marker)
191               {
192                 /* If this wakeup marker has been received for
193                  * the second time, relay it. 
194                  */
195                 DEBUG_MSG (("thread %p relaying wakeup message to "
196                             "waiting thread with lower serial.",
197                             g_thread_self ()));
198
199                 g_async_queue_push (unused_thread_queue, wakeup_thread_marker);
200                 have_relayed_thread_marker = TRUE;
201
202                 /* If a wakeup marker has been relayed, this thread
203                  * will get out of the way for 100 microseconds to
204                  * avoid receiving this marker again. */
205                 g_usleep (100);
206               }
207             }
208           else
209             {
210               if (g_atomic_int_exchange_and_add (&kill_unused_threads, -1) > 0)
211                 {
212                   pool = NULL;
213                   break;
214                 }
215
216               DEBUG_MSG (("thread %p updating to new limits.",
217                           g_thread_self ()));
218
219               local_max_unused_threads = g_atomic_int_get (&max_unused_threads);
220               local_max_idle_time = g_atomic_int_get (&max_idle_time);
221               last_wakeup_thread_serial = local_wakeup_thread_serial;
222
223               have_relayed_thread_marker = FALSE;
224             }
225         }
226     }
227   while (pool == wakeup_thread_marker);
228
229   g_atomic_int_add (&unused_threads, -1);
230
231   return pool;
232 }
233
234 static gpointer
235 g_thread_pool_wait_for_new_task (GRealThreadPool *pool)
236 {
237   gpointer task = NULL;
238
239   if (pool->running || (!pool->immediate &&
240                         g_async_queue_length_unlocked (pool->queue) > 0))
241     {
242       /* This thread pool is still active. */
243       if (pool->num_threads > pool->max_threads && pool->max_threads != -1)
244         {
245           /* This is a superfluous thread, so it goes to the global pool. */
246           DEBUG_MSG (("superfluous thread %p in pool %p.",
247                       g_thread_self (), pool));
248         }
249       else if (pool->pool.exclusive)
250         {
251           /* Exclusive threads stay attached to the pool. */
252           task = g_async_queue_pop_unlocked (pool->queue);
253
254           DEBUG_MSG (("thread %p in exclusive pool %p waits for task "
255                       "(%d running, %d unprocessed).",
256                       g_thread_self (), pool, pool->num_threads,
257                       g_async_queue_length_unlocked (pool->queue)));
258         }
259       else
260         {
261           /* A thread will wait for new tasks for at most 1/2
262            * second before going to the global pool.
263            */
264           GTimeVal end_time;
265
266           g_get_current_time (&end_time);
267           g_time_val_add (&end_time, G_USEC_PER_SEC / 2);       /* 1/2 second */
268
269           DEBUG_MSG (("thread %p in pool %p waits for up to a 1/2 second for task "
270                       "(%d running, %d unprocessed).",
271                       g_thread_self (), pool, pool->num_threads,
272                       g_async_queue_length_unlocked (pool->queue)));
273
274           task = g_async_queue_timed_pop_unlocked (pool->queue, &end_time);
275         }
276     }
277   else
278     {
279       /* This thread pool is inactive, it will no longer process tasks. */
280       DEBUG_MSG (("pool %p not active, thread %p will go to global pool "
281                   "(running: %s, immediate: %s, len: %d).",
282                   pool, g_thread_self (),
283                   pool->running ? "true" : "false",
284                   pool->immediate ? "true" : "false",
285                   g_async_queue_length_unlocked (pool->queue)));
286     }
287
288   return task;
289 }
290
291
292 static gpointer 
293 g_thread_pool_thread_proxy (gpointer data)
294 {
295   GRealThreadPool *pool;
296
297   pool = data;
298
299   DEBUG_MSG (("thread %p started for pool %p.", 
300               g_thread_self (), pool));
301
302   g_async_queue_lock (pool->queue);
303
304   while (TRUE)
305     {
306       gpointer task;
307
308       task = g_thread_pool_wait_for_new_task (pool);
309       if (task)
310         {
311           if (pool->running || !pool->immediate)
312             {
313               /* A task was received and the thread pool is active, so
314                * execute the function. 
315                */
316               g_async_queue_unlock (pool->queue);
317               DEBUG_MSG (("thread %p in pool %p calling func.", 
318                           g_thread_self (), pool));
319               pool->pool.func (task, pool->pool.user_data);
320               g_async_queue_lock (pool->queue);
321             }
322         }
323       else
324         {
325           /* No task was received, so this thread goes to the global
326            * pool. 
327            */
328           gboolean free_pool = FALSE;
329  
330           DEBUG_MSG (("thread %p leaving pool %p for global pool.", 
331                       g_thread_self (), pool));
332           pool->num_threads--;
333
334           if (!pool->running)
335             {
336               if (!pool->waiting)
337                 {
338                   if (pool->num_threads == 0)
339                     {
340                       /* If the pool is not running and no other
341                        * thread is waiting for this thread pool to
342                        * finish and this is the last thread of this
343                        * pool, free the pool.
344                        */
345                       free_pool = TRUE;
346                     }           
347                   else 
348                     {
349                       /* If the pool is not running and no other
350                        * thread is waiting for this thread pool to
351                        * finish and this is not the last thread of
352                        * this pool and there are no tasks left in the
353                        * queue, wakeup the remaining threads. 
354                        */
355                       if (g_async_queue_length_unlocked (pool->queue) == 
356                           - pool->num_threads)
357                         g_thread_pool_wakeup_and_stop_all (pool);
358                     }
359                 }
360               else if (pool->immediate || 
361                        g_async_queue_length_unlocked (pool->queue) <= 0)
362                 {
363                   /* If the pool is not running and another thread is
364                    * waiting for this thread pool to finish and there
365                    * are either no tasks left or the pool shall stop
366                    * immediatly, inform the waiting thread of a change
367                    * of the thread pool state. 
368                    */
369                   g_cond_broadcast (pool->cond);
370                 }
371             }
372
373           g_async_queue_unlock (pool->queue);
374
375           if (free_pool)
376             g_thread_pool_free_internal (pool);
377
378           if ((pool = g_thread_pool_wait_for_new_pool ()) == NULL) 
379             break;
380
381           g_async_queue_lock (pool->queue);
382           
383           DEBUG_MSG (("thread %p entering pool %p from global pool.", 
384                       g_thread_self (), pool));
385
386           /* pool->num_threads++ is not done here, but in
387            * g_thread_pool_start_thread to make the new started thread
388            * known to the pool, before itself can do it. 
389            */
390         }
391     }
392
393   return NULL;
394 }
395
396 static void
397 g_thread_pool_start_thread (GRealThreadPool  *pool, 
398                             GError          **error)
399 {
400   gboolean success = FALSE;
401   
402   if (pool->num_threads >= pool->max_threads && pool->max_threads != -1)
403     /* Enough threads are already running */
404     return;
405
406   g_async_queue_lock (unused_thread_queue);
407
408   if (g_async_queue_length_unlocked (unused_thread_queue) < 0)
409     {
410       g_async_queue_push_unlocked (unused_thread_queue, pool);
411       success = TRUE;
412     }
413
414   g_async_queue_unlock (unused_thread_queue);
415
416   if (!success)
417     {
418       GError *local_error = NULL;
419       /* No thread was found, we have to start a new one */
420       g_thread_create (g_thread_pool_thread_proxy, pool, FALSE, &local_error);
421       
422       if (local_error)
423         {
424           g_propagate_error (error, local_error);
425           return;
426         }
427     }
428
429   /* See comment in g_thread_pool_thread_proxy as to why this is done
430    * here and not there
431    */
432   pool->num_threads++;
433 }
434
435 /**
436  * g_thread_pool_new: 
437  * @func: a function to execute in the threads of the new thread pool
438  * @user_data: user data that is handed over to @func every time it 
439  *   is called
440  * @max_threads: the maximal number of threads to execute concurrently in 
441  *   the new thread pool, -1 means no limit
442  * @exclusive: should this thread pool be exclusive?
443  * @error: return location for error
444  *
445  * This function creates a new thread pool.
446  *
447  * Whenever you call g_thread_pool_push(), either a new thread is
448  * created or an unused one is reused. At most @max_threads threads
449  * are running concurrently for this thread pool. @max_threads = -1
450  * allows unlimited threads to be created for this thread pool. The
451  * newly created or reused thread now executes the function @func with
452  * the two arguments. The first one is the parameter to
453  * g_thread_pool_push() and the second one is @user_data.
454  *
455  * The parameter @exclusive determines, whether the thread pool owns
456  * all threads exclusive or whether the threads are shared
457  * globally. If @exclusive is %TRUE, @max_threads threads are started
458  * immediately and they will run exclusively for this thread pool until
459  * it is destroyed by g_thread_pool_free(). If @exclusive is %FALSE,
460  * threads are created, when needed and shared between all
461  * non-exclusive thread pools. This implies that @max_threads may not
462  * be -1 for exclusive thread pools.
463  *
464  * @error can be %NULL to ignore errors, or non-%NULL to report
465  * errors. An error can only occur when @exclusive is set to %TRUE and
466  * not all @max_threads threads could be created.
467  *
468  * Return value: the new #GThreadPool
469  **/
470 GThreadPool* 
471 g_thread_pool_new (GFunc            func,
472                    gpointer         user_data,
473                    gint             max_threads,
474                    gboolean         exclusive,
475                    GError         **error)
476 {
477   GRealThreadPool *retval;
478   G_LOCK_DEFINE_STATIC (init);
479
480   g_return_val_if_fail (func, NULL);
481   g_return_val_if_fail (!exclusive || max_threads != -1, NULL);
482   g_return_val_if_fail (max_threads >= -1, NULL);
483   g_return_val_if_fail (g_thread_supported (), NULL);
484
485   retval = g_new (GRealThreadPool, 1);
486
487   retval->pool.func = func;
488   retval->pool.user_data = user_data;
489   retval->pool.exclusive = exclusive;
490   retval->queue = g_async_queue_new ();
491   retval->cond = NULL;
492   retval->max_threads = max_threads;
493   retval->num_threads = 0;
494   retval->running = TRUE;
495   retval->sort_func = NULL;
496   retval->sort_user_data = NULL;
497
498   G_LOCK (init);
499   if (!unused_thread_queue)
500       unused_thread_queue = g_async_queue_new ();
501   G_UNLOCK (init);
502
503   if (retval->pool.exclusive)
504     {
505       g_async_queue_lock (retval->queue);
506   
507       while (retval->num_threads < retval->max_threads)
508         {
509           GError *local_error = NULL;
510           g_thread_pool_start_thread (retval, &local_error);
511           if (local_error)
512             {
513               g_propagate_error (error, local_error);
514               break;
515             }
516         }
517
518       g_async_queue_unlock (retval->queue);
519     }
520
521   return (GThreadPool*) retval;
522 }
523
524 /**
525  * g_thread_pool_push:
526  * @pool: a #GThreadPool
527  * @data: a new task for @pool
528  * @error: return location for error
529  * 
530  * Inserts @data into the list of tasks to be executed by @pool. When
531  * the number of currently running threads is lower than the maximal
532  * allowed number of threads, a new thread is started (or reused) with
533  * the properties given to g_thread_pool_new (). Otherwise @data stays
534  * in the queue until a thread in this pool finishes its previous task
535  * and processes @data. 
536  *
537  * @error can be %NULL to ignore errors, or non-%NULL to report
538  * errors. An error can only occur when a new thread couldn't be
539  * created. In that case @data is simply appended to the queue of work
540  * to do.  
541  **/
542 void 
543 g_thread_pool_push (GThreadPool  *pool,
544                     gpointer      data,
545                     GError      **error)
546 {
547   GRealThreadPool *real;
548
549   real = (GRealThreadPool*) pool;
550
551   g_return_if_fail (real);
552   g_return_if_fail (real->running);
553
554   g_async_queue_lock (real->queue);
555
556   if (g_async_queue_length_unlocked (real->queue) >= 0)
557     /* No thread is waiting in the queue */
558     g_thread_pool_start_thread (real, error);
559
560   g_thread_pool_queue_push_unlocked (real, data);
561   g_async_queue_unlock (real->queue);
562 }
563
564 /**
565  * g_thread_pool_set_max_threads:
566  * @pool: a #GThreadPool
567  * @max_threads: a new maximal number of threads for @pool
568  * @error: return location for error
569  * 
570  * Sets the maximal allowed number of threads for @pool. A value of -1
571  * means, that the maximal number of threads is unlimited.
572  *
573  * Setting @max_threads to 0 means stopping all work for @pool. It is
574  * effectively frozen until @max_threads is set to a non-zero value
575  * again.
576  * 
577  * A thread is never terminated while calling @func, as supplied by
578  * g_thread_pool_new (). Instead the maximal number of threads only
579  * has effect for the allocation of new threads in g_thread_pool_push(). 
580  * A new thread is allocated, whenever the number of currently
581  * running threads in @pool is smaller than the maximal number.
582  *
583  * @error can be %NULL to ignore errors, or non-%NULL to report
584  * errors. An error can only occur when a new thread couldn't be
585  * created. 
586  **/
587 void
588 g_thread_pool_set_max_threads (GThreadPool  *pool,
589                                gint          max_threads,
590                                GError      **error)
591 {
592   GRealThreadPool *real;
593   gint to_start;
594
595   real = (GRealThreadPool*) pool;
596
597   g_return_if_fail (real);
598   g_return_if_fail (real->running);
599   g_return_if_fail (!real->pool.exclusive || max_threads != -1);
600   g_return_if_fail (max_threads >= -1);
601
602   g_async_queue_lock (real->queue);
603
604   real->max_threads = max_threads;
605   
606   if (pool->exclusive)
607     to_start = real->max_threads - real->num_threads;
608   else
609     to_start = g_async_queue_length_unlocked (real->queue);
610   
611   for ( ; to_start > 0; to_start--)
612     {
613       GError *local_error = NULL;
614
615       g_thread_pool_start_thread (real, &local_error);
616       if (local_error)
617         {
618           g_propagate_error (error, local_error);
619           break;
620         }
621     }
622    
623   g_async_queue_unlock (real->queue);
624 }
625
626 /**
627  * g_thread_pool_get_max_threads:
628  * @pool: a #GThreadPool
629  *
630  * Returns the maximal number of threads for @pool.
631  *
632  * Return value: the maximal number of threads
633  **/
634 gint
635 g_thread_pool_get_max_threads (GThreadPool *pool)
636 {
637   GRealThreadPool *real;
638   gint retval;
639
640   real = (GRealThreadPool*) pool;
641
642   g_return_val_if_fail (real, 0);
643   g_return_val_if_fail (real->running, 0);
644
645   g_async_queue_lock (real->queue);
646   retval = real->max_threads;
647   g_async_queue_unlock (real->queue);
648
649   return retval;
650 }
651
652 /**
653  * g_thread_pool_get_num_threads:
654  * @pool: a #GThreadPool
655  *
656  * Returns the number of threads currently running in @pool.
657  *
658  * Return value: the number of threads currently running
659  **/
660 guint
661 g_thread_pool_get_num_threads (GThreadPool *pool)
662 {
663   GRealThreadPool *real;
664   guint retval;
665
666   real = (GRealThreadPool*) pool;
667
668   g_return_val_if_fail (real, 0);
669   g_return_val_if_fail (real->running, 0);
670
671   g_async_queue_lock (real->queue);
672   retval = real->num_threads;
673   g_async_queue_unlock (real->queue);
674
675   return retval;
676 }
677
678 /**
679  * g_thread_pool_unprocessed:
680  * @pool: a #GThreadPool
681  *
682  * Returns the number of tasks still unprocessed in @pool.
683  *
684  * Return value: the number of unprocessed tasks
685  **/
686 guint
687 g_thread_pool_unprocessed (GThreadPool *pool)
688 {
689   GRealThreadPool *real;
690   gint unprocessed;
691
692   real = (GRealThreadPool*) pool;
693
694   g_return_val_if_fail (real, 0);
695   g_return_val_if_fail (real->running, 0);
696
697   unprocessed = g_async_queue_length (real->queue);
698
699   return MAX (unprocessed, 0);
700 }
701
702 /**
703  * g_thread_pool_free:
704  * @pool: a #GThreadPool
705  * @immediate: should @pool shut down immediately?
706  * @wait_: should the function wait for all tasks to be finished?
707  *
708  * Frees all resources allocated for @pool.
709  *
710  * If @immediate is %TRUE, no new task is processed for
711  * @pool. Otherwise @pool is not freed before the last task is
712  * processed. Note however, that no thread of this pool is
713  * interrupted, while processing a task. Instead at least all still
714  * running threads can finish their tasks before the @pool is freed.
715  *
716  * If @wait_ is %TRUE, the functions does not return before all tasks
717  * to be processed (dependent on @immediate, whether all or only the
718  * currently running) are ready. Otherwise the function returns immediately.
719  *
720  * After calling this function @pool must not be used anymore. 
721  **/
722 void
723 g_thread_pool_free (GThreadPool *pool,
724                     gboolean     immediate,
725                     gboolean     wait_)
726 {
727   GRealThreadPool *real;
728
729   real = (GRealThreadPool*) pool;
730
731   g_return_if_fail (real);
732   g_return_if_fail (real->running);
733
734   /* If there's no thread allowed here, there is not much sense in
735    * not stopping this pool immediately, when it's not empty 
736    */
737   g_return_if_fail (immediate || 
738                     real->max_threads != 0 || 
739                     g_async_queue_length (real->queue) == 0);
740
741   g_async_queue_lock (real->queue);
742
743   real->running = FALSE;
744   real->immediate = immediate;
745   real->waiting = wait_;
746
747   if (wait_)
748     {
749       real->cond = g_cond_new ();
750
751       while (g_async_queue_length_unlocked (real->queue) != -real->num_threads &&
752              !(immediate && real->num_threads == 0))
753         g_cond_wait (real->cond, _g_async_queue_get_mutex (real->queue));
754     }
755
756   if (immediate || g_async_queue_length_unlocked (real->queue) == -real->num_threads)
757     {
758       /* No thread is currently doing something (and nothing is left
759        * to process in the queue) 
760        */
761       if (real->num_threads == 0) 
762         {
763           /* No threads left, we clean up */
764           g_async_queue_unlock (real->queue);
765           g_thread_pool_free_internal (real);
766           return;
767         }
768
769       g_thread_pool_wakeup_and_stop_all (real);
770     }
771   
772   /* The last thread should cleanup the pool */
773   real->waiting = FALSE; 
774   g_async_queue_unlock (real->queue);
775 }
776
777 static void
778 g_thread_pool_free_internal (GRealThreadPool* pool)
779 {
780   g_return_if_fail (pool);
781   g_return_if_fail (pool->running == FALSE);
782   g_return_if_fail (pool->num_threads == 0);
783
784   g_async_queue_unref (pool->queue);
785
786   if (pool->cond)
787     g_cond_free (pool->cond);
788
789   g_free (pool);
790 }
791
792 static void
793 g_thread_pool_wakeup_and_stop_all (GRealThreadPool* pool)
794 {
795   guint i;
796   
797   g_return_if_fail (pool);
798   g_return_if_fail (pool->running == FALSE);
799   g_return_if_fail (pool->num_threads != 0);
800
801   pool->immediate = TRUE; 
802
803   for (i = 0; i < pool->num_threads; i++)
804     g_thread_pool_queue_push_unlocked (pool, GUINT_TO_POINTER (1));
805 }
806
807 /**
808  * g_thread_pool_set_max_unused_threads:
809  * @max_threads: maximal number of unused threads
810  *
811  * Sets the maximal number of unused threads to @max_threads. If
812  * @max_threads is -1, no limit is imposed on the number of unused
813  * threads.
814  **/
815 void
816 g_thread_pool_set_max_unused_threads (gint max_threads)
817 {
818   g_return_if_fail (max_threads >= -1);  
819
820   g_atomic_int_set (&max_unused_threads, max_threads);
821
822   if (max_threads != -1)
823     {
824       max_threads -= g_atomic_int_get (&unused_threads);
825       if (max_threads < 0)
826         {
827           g_atomic_int_set (&kill_unused_threads, -max_threads);
828           g_atomic_int_inc (&wakeup_thread_serial);
829
830           g_async_queue_lock (unused_thread_queue);
831
832           do
833             {
834               g_async_queue_push_unlocked (unused_thread_queue,
835                                            wakeup_thread_marker);
836             }
837           while (++max_threads);
838
839           g_async_queue_unlock (unused_thread_queue);
840         }
841     }
842 }
843
844 /**
845  * g_thread_pool_get_max_unused_threads:
846  * 
847  * Returns the maximal allowed number of unused threads.
848  *
849  * Return value: the maximal number of unused threads
850  **/
851 gint
852 g_thread_pool_get_max_unused_threads (void)
853 {
854   return g_atomic_int_get (&max_unused_threads);
855 }
856
857 /**
858  * g_thread_pool_get_num_unused_threads:
859  * 
860  * Returns the number of currently unused threads.
861  *
862  * Return value: the number of currently unused threads
863  **/
864 guint 
865 g_thread_pool_get_num_unused_threads (void)
866 {
867   return g_atomic_int_get (&unused_threads);
868 }
869
870 /**
871  * g_thread_pool_stop_unused_threads:
872  * 
873  * Stops all currently unused threads. This does not change the
874  * maximal number of unused threads. This function can be used to
875  * regularly stop all unused threads e.g. from g_timeout_add().
876  **/
877 void
878 g_thread_pool_stop_unused_threads (void)
879
880   guint oldval;
881
882   oldval = g_thread_pool_get_max_unused_threads ();
883
884   g_thread_pool_set_max_unused_threads (0);
885   g_thread_pool_set_max_unused_threads (oldval);
886 }
887
888 /**
889  * g_thread_pool_set_sort_function:
890  * @pool: a #GThreadPool
891  * @func: the #GCompareDataFunc used to sort the list of tasks. 
892  *     This function is passed two tasks. It should return
893  *     0 if the order in which they are handled does not matter, 
894  *     a negative value if the first task should be processed before
895  *     the second or a positive value if the second task should be 
896  *     processed first.
897  * @user_data: user data passed to @func.
898  *
899  * Sets the function used to sort the list of tasks. This allows the
900  * tasks to be processed by a priority determined by @func, and not
901  * just in the order in which they were added to the pool.
902  *
903  * Note, if the maximum number of threads is more than 1, the order
904  * that threads are executed cannot be guranteed 100%. Threads are
905  * scheduled by the operating system and are executed at random. It
906  * cannot be assumed that threads are executed in the order they are
907  * created. 
908  *
909  * Since: 2.10
910  **/
911 void 
912 g_thread_pool_set_sort_function (GThreadPool      *pool,
913                                  GCompareDataFunc  func,
914                                  gpointer          user_data)
915
916   GRealThreadPool *real;
917
918   real = (GRealThreadPool*) pool;
919
920   g_return_if_fail (real);
921   g_return_if_fail (real->running);
922
923   g_async_queue_lock (real->queue);
924
925   real->sort_func = func;
926   real->sort_user_data = user_data;
927   
928   if (func) 
929     g_async_queue_sort_unlocked (real->queue, 
930                                  real->sort_func,
931                                  real->sort_user_data);
932
933   g_async_queue_unlock (real->queue);
934 }
935
936 /**
937  * g_thread_pool_set_max_idle_time:
938  * @interval: the maximum @interval (1/1000ths of a second) a thread
939  *     can be idle. 
940  *
941  * This function will set the maximum @interval that a thread waiting
942  * in the pool for new tasks can be idle for before being
943  * stopped. This function is similar to calling
944  * g_thread_pool_stop_unused_threads() on a regular timeout, except,
945  * this is done on a per thread basis.    
946  *
947  * By setting @interval to 0, idle threads will not be stopped.
948  *  
949  * This function makes use of g_async_queue_timed_pop () using
950  * @interval.
951  *
952  * Since: 2.10
953  **/
954 void
955 g_thread_pool_set_max_idle_time (guint interval)
956
957   guint i;
958
959   g_atomic_int_set (&max_idle_time, interval);
960
961   i = g_atomic_int_get (&unused_threads);
962   if (i > 0)
963     {
964       g_atomic_int_inc (&wakeup_thread_serial);
965       g_async_queue_lock (unused_thread_queue);
966
967       do
968         {
969           g_async_queue_push_unlocked (unused_thread_queue,
970                                        wakeup_thread_marker);
971         }
972       while (--i);
973
974       g_async_queue_unlock (unused_thread_queue);
975     }
976 }
977
978 /**
979  * g_thread_pool_get_max_idle_time:
980  * 
981  * This function will return the maximum @interval that a thread will
982  * wait in the thread pool for new tasks before being stopped.
983  *
984  * If this function returns 0, threads waiting in the thread pool for
985  * new work are not stopped.
986  *
987  * Return value: the maximum @interval to wait for new tasks in the
988  *     thread pool before stopping the thread (1/1000ths of a second).
989  *  
990  * Since: 2.10
991  **/
992 guint
993 g_thread_pool_get_max_idle_time (void)
994
995   return g_atomic_int_get (&max_idle_time);
996 }