glib/: fully remove galias hacks
[platform/upstream/glib.git] / glib / gthreadpool.c
1 /* GLIB - Library of useful routines for C programming
2  * Copyright (C) 1995-1997  Peter Mattis, Spencer Kimball and Josh MacDonald
3  *
4  * GAsyncQueue: thread pool implementation.
5  * Copyright (C) 2000 Sebastian Wilhelmi; University of Karlsruhe
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Lesser General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20  * Boston, MA 02111-1307, USA.
21  */
22
23 /*
24  * MT safe
25  */
26
27 #include "config.h"
28
29 #include "glib.h"
30
31 /**
32  * SECTION: thread_pools
33  * @title: Thread Pools
34  * @short_description: pools of threads to execute work concurrently
35  * @see_also: <para> <variablelist> <varlistentry>
36  *            <term>#GThread</term> <listitem><para>GLib thread
37  *            system.</para></listitem> </varlistentry> </variablelist>
38  *            </para>
39  *
40  * Sometimes you wish to asynchronously fork out the execution of work
41  * and continue working in your own thread. If that will happen often,
42  * the overhead of starting and destroying a thread each time might be
43  * too high. In such cases reusing already started threads seems like a
44  * good idea. And it indeed is, but implementing this can be tedious
45  * and error-prone.
46  *
47  * Therefore GLib provides thread pools for your convenience. An added
48  * advantage is, that the threads can be shared between the different
49  * subsystems of your program, when they are using GLib.
50  *
51  * To create a new thread pool, you use g_thread_pool_new(). It is
52  * destroyed by g_thread_pool_free().
53  *
54  * If you want to execute a certain task within a thread pool, you call
55  * g_thread_pool_push().
56  *
57  * To get the current number of running threads you call
58  * g_thread_pool_get_num_threads(). To get the number of still
59  * unprocessed tasks you call g_thread_pool_unprocessed(). To control
60  * the maximal number of threads for a thread pool, you use
61  * g_thread_pool_get_max_threads() and g_thread_pool_set_max_threads().
62  *
63  * Finally you can control the number of unused threads, that are kept
64  * alive by GLib for future use. The current number can be fetched with
65  * g_thread_pool_get_num_unused_threads(). The maximal number can be
66  * controlled by g_thread_pool_get_max_unused_threads() and
67  * g_thread_pool_set_max_unused_threads(). All currently unused threads
68  * can be stopped by calling g_thread_pool_stop_unused_threads().
69  **/
70
71 #define DEBUG_MSG(x)  
72 /* #define DEBUG_MSG(args) g_printerr args ; g_printerr ("\n");    */
73
74 typedef struct _GRealThreadPool GRealThreadPool;
75
76 /**
77  * GThreadPool:
78  * @func: the function to execute in the threads of this pool
79  * @user_data: the user data for the threads of this pool
80  * @exclusive: are all threads exclusive to this pool
81  *
82  * The #GThreadPool struct represents a thread pool. It has three
83  * public read-only members, but the underlying struct is bigger, so
84  * you must not copy this struct.
85  **/
86 struct _GRealThreadPool
87 {
88   GThreadPool pool;
89   GAsyncQueue* queue;
90   GCond* cond;
91   gint max_threads;
92   gint num_threads;
93   gboolean running;
94   gboolean immediate;
95   gboolean waiting;
96   GCompareDataFunc sort_func;
97   gpointer sort_user_data;
98 };
99
100 /* The following is just an address to mark the wakeup order for a
101  * thread, it could be any address (as long, as it isn't a valid
102  * GThreadPool address) */
103 static const gpointer wakeup_thread_marker = (gpointer) &g_thread_pool_new;
104 static gint wakeup_thread_serial = 0;
105
106 /* Here all unused threads are waiting  */
107 static GAsyncQueue *unused_thread_queue = NULL;
108 static gint unused_threads = 0;
109 static gint max_unused_threads = 0;
110 static gint kill_unused_threads = 0;
111 static guint max_idle_time = 0;
112
113 static void             g_thread_pool_queue_push_unlocked (GRealThreadPool  *pool,
114                                                            gpointer          data);
115 static void             g_thread_pool_free_internal       (GRealThreadPool  *pool);
116 static gpointer         g_thread_pool_thread_proxy        (gpointer          data);
117 static void             g_thread_pool_start_thread        (GRealThreadPool  *pool,
118                                                            GError          **error);
119 static void             g_thread_pool_wakeup_and_stop_all (GRealThreadPool  *pool);
120 static GRealThreadPool* g_thread_pool_wait_for_new_pool   (void);
121 static gpointer         g_thread_pool_wait_for_new_task   (GRealThreadPool  *pool);
122
123 static void
124 g_thread_pool_queue_push_unlocked (GRealThreadPool *pool,
125                                    gpointer         data)
126 {
127   if (pool->sort_func) 
128     g_async_queue_push_sorted_unlocked (pool->queue, 
129                                         data,
130                                         pool->sort_func, 
131                                         pool->sort_user_data);
132   else
133     g_async_queue_push_unlocked (pool->queue, data);
134 }
135
136 static GRealThreadPool*
137 g_thread_pool_wait_for_new_pool (void)
138 {
139   GRealThreadPool *pool;
140   gint local_wakeup_thread_serial;
141   guint local_max_unused_threads;
142   gint local_max_idle_time;
143   gint last_wakeup_thread_serial;
144   gboolean have_relayed_thread_marker = FALSE;
145
146   local_max_unused_threads = g_atomic_int_get (&max_unused_threads);
147   local_max_idle_time = g_atomic_int_get (&max_idle_time);
148   last_wakeup_thread_serial = g_atomic_int_get (&wakeup_thread_serial);
149
150   g_atomic_int_inc (&unused_threads);
151
152   do
153     {
154       if (g_atomic_int_get (&unused_threads) >= local_max_unused_threads)
155         {
156           /* If this is a superfluous thread, stop it. */
157           pool = NULL;
158         }
159       else if (local_max_idle_time > 0)
160         {
161           /* If a maximal idle time is given, wait for the given time. */
162           GTimeVal end_time;
163
164           g_get_current_time (&end_time);
165           g_time_val_add (&end_time, local_max_idle_time * 1000);
166
167           DEBUG_MSG (("thread %p waiting in global pool for %f seconds.",
168                       g_thread_self (), local_max_idle_time / 1000.0));
169
170           pool = g_async_queue_timed_pop (unused_thread_queue, &end_time);
171         }
172       else
173         {
174           /* If no maximal idle time is given, wait indefinitely. */
175           DEBUG_MSG (("thread %p waiting in global pool.",
176                       g_thread_self ()));
177           pool = g_async_queue_pop (unused_thread_queue);
178         }
179
180       if (pool == wakeup_thread_marker)
181         {
182           local_wakeup_thread_serial = g_atomic_int_get (&wakeup_thread_serial);
183           if (last_wakeup_thread_serial == local_wakeup_thread_serial)
184             {
185               if (!have_relayed_thread_marker)
186               {
187                 /* If this wakeup marker has been received for
188                  * the second time, relay it. 
189                  */
190                 DEBUG_MSG (("thread %p relaying wakeup message to "
191                             "waiting thread with lower serial.",
192                             g_thread_self ()));
193
194                 g_async_queue_push (unused_thread_queue, wakeup_thread_marker);
195                 have_relayed_thread_marker = TRUE;
196
197                 /* If a wakeup marker has been relayed, this thread
198                  * will get out of the way for 100 microseconds to
199                  * avoid receiving this marker again. */
200                 g_usleep (100);
201               }
202             }
203           else
204             {
205               if (g_atomic_int_exchange_and_add (&kill_unused_threads, -1) > 0)
206                 {
207                   pool = NULL;
208                   break;
209                 }
210
211               DEBUG_MSG (("thread %p updating to new limits.",
212                           g_thread_self ()));
213
214               local_max_unused_threads = g_atomic_int_get (&max_unused_threads);
215               local_max_idle_time = g_atomic_int_get (&max_idle_time);
216               last_wakeup_thread_serial = local_wakeup_thread_serial;
217
218               have_relayed_thread_marker = FALSE;
219             }
220         }
221     }
222   while (pool == wakeup_thread_marker);
223
224   g_atomic_int_add (&unused_threads, -1);
225
226   return pool;
227 }
228
229 static gpointer
230 g_thread_pool_wait_for_new_task (GRealThreadPool *pool)
231 {
232   gpointer task = NULL;
233
234   if (pool->running || (!pool->immediate &&
235                         g_async_queue_length_unlocked (pool->queue) > 0))
236     {
237       /* This thread pool is still active. */
238       if (pool->num_threads > pool->max_threads && pool->max_threads != -1)
239         {
240           /* This is a superfluous thread, so it goes to the global pool. */
241           DEBUG_MSG (("superfluous thread %p in pool %p.",
242                       g_thread_self (), pool));
243         }
244       else if (pool->pool.exclusive)
245         {
246           /* Exclusive threads stay attached to the pool. */
247           task = g_async_queue_pop_unlocked (pool->queue);
248
249           DEBUG_MSG (("thread %p in exclusive pool %p waits for task "
250                       "(%d running, %d unprocessed).",
251                       g_thread_self (), pool, pool->num_threads,
252                       g_async_queue_length_unlocked (pool->queue)));
253         }
254       else
255         {
256           /* A thread will wait for new tasks for at most 1/2
257            * second before going to the global pool.
258            */
259           GTimeVal end_time;
260
261           g_get_current_time (&end_time);
262           g_time_val_add (&end_time, G_USEC_PER_SEC / 2);       /* 1/2 second */
263
264           DEBUG_MSG (("thread %p in pool %p waits for up to a 1/2 second for task "
265                       "(%d running, %d unprocessed).",
266                       g_thread_self (), pool, pool->num_threads,
267                       g_async_queue_length_unlocked (pool->queue)));
268
269           task = g_async_queue_timed_pop_unlocked (pool->queue, &end_time);
270         }
271     }
272   else
273     {
274       /* This thread pool is inactive, it will no longer process tasks. */
275       DEBUG_MSG (("pool %p not active, thread %p will go to global pool "
276                   "(running: %s, immediate: %s, len: %d).",
277                   pool, g_thread_self (),
278                   pool->running ? "true" : "false",
279                   pool->immediate ? "true" : "false",
280                   g_async_queue_length_unlocked (pool->queue)));
281     }
282
283   return task;
284 }
285
286
287 static gpointer 
288 g_thread_pool_thread_proxy (gpointer data)
289 {
290   GRealThreadPool *pool;
291
292   pool = data;
293
294   DEBUG_MSG (("thread %p started for pool %p.", 
295               g_thread_self (), pool));
296
297   g_async_queue_lock (pool->queue);
298
299   while (TRUE)
300     {
301       gpointer task;
302
303       task = g_thread_pool_wait_for_new_task (pool);
304       if (task)
305         {
306           if (pool->running || !pool->immediate)
307             {
308               /* A task was received and the thread pool is active, so
309                * execute the function. 
310                */
311               g_async_queue_unlock (pool->queue);
312               DEBUG_MSG (("thread %p in pool %p calling func.", 
313                           g_thread_self (), pool));
314               pool->pool.func (task, pool->pool.user_data);
315               g_async_queue_lock (pool->queue);
316             }
317         }
318       else
319         {
320           /* No task was received, so this thread goes to the global
321            * pool. 
322            */
323           gboolean free_pool = FALSE;
324  
325           DEBUG_MSG (("thread %p leaving pool %p for global pool.", 
326                       g_thread_self (), pool));
327           pool->num_threads--;
328
329           if (!pool->running)
330             {
331               if (!pool->waiting)
332                 {
333                   if (pool->num_threads == 0)
334                     {
335                       /* If the pool is not running and no other
336                        * thread is waiting for this thread pool to
337                        * finish and this is the last thread of this
338                        * pool, free the pool.
339                        */
340                       free_pool = TRUE;
341                     }           
342                   else 
343                     {
344                       /* If the pool is not running and no other
345                        * thread is waiting for this thread pool to
346                        * finish and this is not the last thread of
347                        * this pool and there are no tasks left in the
348                        * queue, wakeup the remaining threads. 
349                        */
350                       if (g_async_queue_length_unlocked (pool->queue) == 
351                           - pool->num_threads)
352                         g_thread_pool_wakeup_and_stop_all (pool);
353                     }
354                 }
355               else if (pool->immediate || 
356                        g_async_queue_length_unlocked (pool->queue) <= 0)
357                 {
358                   /* If the pool is not running and another thread is
359                    * waiting for this thread pool to finish and there
360                    * are either no tasks left or the pool shall stop
361                    * immediatly, inform the waiting thread of a change
362                    * of the thread pool state. 
363                    */
364                   g_cond_broadcast (pool->cond);
365                 }
366             }
367
368           g_async_queue_unlock (pool->queue);
369
370           if (free_pool)
371             g_thread_pool_free_internal (pool);
372
373           if ((pool = g_thread_pool_wait_for_new_pool ()) == NULL) 
374             break;
375
376           g_async_queue_lock (pool->queue);
377           
378           DEBUG_MSG (("thread %p entering pool %p from global pool.", 
379                       g_thread_self (), pool));
380
381           /* pool->num_threads++ is not done here, but in
382            * g_thread_pool_start_thread to make the new started thread
383            * known to the pool, before itself can do it. 
384            */
385         }
386     }
387
388   return NULL;
389 }
390
391 static void
392 g_thread_pool_start_thread (GRealThreadPool  *pool, 
393                             GError          **error)
394 {
395   gboolean success = FALSE;
396   
397   if (pool->num_threads >= pool->max_threads && pool->max_threads != -1)
398     /* Enough threads are already running */
399     return;
400
401   g_async_queue_lock (unused_thread_queue);
402
403   if (g_async_queue_length_unlocked (unused_thread_queue) < 0)
404     {
405       g_async_queue_push_unlocked (unused_thread_queue, pool);
406       success = TRUE;
407     }
408
409   g_async_queue_unlock (unused_thread_queue);
410
411   if (!success)
412     {
413       GError *local_error = NULL;
414       /* No thread was found, we have to start a new one */
415       g_thread_create (g_thread_pool_thread_proxy, pool, FALSE, &local_error);
416       
417       if (local_error)
418         {
419           g_propagate_error (error, local_error);
420           return;
421         }
422     }
423
424   /* See comment in g_thread_pool_thread_proxy as to why this is done
425    * here and not there
426    */
427   pool->num_threads++;
428 }
429
430 /**
431  * g_thread_pool_new: 
432  * @func: a function to execute in the threads of the new thread pool
433  * @user_data: user data that is handed over to @func every time it 
434  *   is called
435  * @max_threads: the maximal number of threads to execute concurrently in 
436  *   the new thread pool, -1 means no limit
437  * @exclusive: should this thread pool be exclusive?
438  * @error: return location for error
439  *
440  * This function creates a new thread pool.
441  *
442  * Whenever you call g_thread_pool_push(), either a new thread is
443  * created or an unused one is reused. At most @max_threads threads
444  * are running concurrently for this thread pool. @max_threads = -1
445  * allows unlimited threads to be created for this thread pool. The
446  * newly created or reused thread now executes the function @func with
447  * the two arguments. The first one is the parameter to
448  * g_thread_pool_push() and the second one is @user_data.
449  *
450  * The parameter @exclusive determines, whether the thread pool owns
451  * all threads exclusive or whether the threads are shared
452  * globally. If @exclusive is %TRUE, @max_threads threads are started
453  * immediately and they will run exclusively for this thread pool until
454  * it is destroyed by g_thread_pool_free(). If @exclusive is %FALSE,
455  * threads are created, when needed and shared between all
456  * non-exclusive thread pools. This implies that @max_threads may not
457  * be -1 for exclusive thread pools.
458  *
459  * @error can be %NULL to ignore errors, or non-%NULL to report
460  * errors. An error can only occur when @exclusive is set to %TRUE and
461  * not all @max_threads threads could be created.
462  *
463  * Return value: the new #GThreadPool
464  **/
465 GThreadPool* 
466 g_thread_pool_new (GFunc            func,
467                    gpointer         user_data,
468                    gint             max_threads,
469                    gboolean         exclusive,
470                    GError         **error)
471 {
472   GRealThreadPool *retval;
473   G_LOCK_DEFINE_STATIC (init);
474
475   g_return_val_if_fail (func, NULL);
476   g_return_val_if_fail (!exclusive || max_threads != -1, NULL);
477   g_return_val_if_fail (max_threads >= -1, NULL);
478   g_return_val_if_fail (g_thread_supported (), NULL);
479
480   retval = g_new (GRealThreadPool, 1);
481
482   retval->pool.func = func;
483   retval->pool.user_data = user_data;
484   retval->pool.exclusive = exclusive;
485   retval->queue = g_async_queue_new ();
486   retval->cond = NULL;
487   retval->max_threads = max_threads;
488   retval->num_threads = 0;
489   retval->running = TRUE;
490   retval->sort_func = NULL;
491   retval->sort_user_data = NULL;
492
493   G_LOCK (init);
494   if (!unused_thread_queue)
495       unused_thread_queue = g_async_queue_new ();
496   G_UNLOCK (init);
497
498   if (retval->pool.exclusive)
499     {
500       g_async_queue_lock (retval->queue);
501   
502       while (retval->num_threads < retval->max_threads)
503         {
504           GError *local_error = NULL;
505           g_thread_pool_start_thread (retval, &local_error);
506           if (local_error)
507             {
508               g_propagate_error (error, local_error);
509               break;
510             }
511         }
512
513       g_async_queue_unlock (retval->queue);
514     }
515
516   return (GThreadPool*) retval;
517 }
518
519 /**
520  * g_thread_pool_push:
521  * @pool: a #GThreadPool
522  * @data: a new task for @pool
523  * @error: return location for error
524  * 
525  * Inserts @data into the list of tasks to be executed by @pool. When
526  * the number of currently running threads is lower than the maximal
527  * allowed number of threads, a new thread is started (or reused) with
528  * the properties given to g_thread_pool_new (). Otherwise @data stays
529  * in the queue until a thread in this pool finishes its previous task
530  * and processes @data. 
531  *
532  * @error can be %NULL to ignore errors, or non-%NULL to report
533  * errors. An error can only occur when a new thread couldn't be
534  * created. In that case @data is simply appended to the queue of work
535  * to do.  
536  **/
537 void 
538 g_thread_pool_push (GThreadPool  *pool,
539                     gpointer      data,
540                     GError      **error)
541 {
542   GRealThreadPool *real;
543
544   real = (GRealThreadPool*) pool;
545
546   g_return_if_fail (real);
547   g_return_if_fail (real->running);
548
549   g_async_queue_lock (real->queue);
550
551   if (g_async_queue_length_unlocked (real->queue) >= 0)
552     /* No thread is waiting in the queue */
553     g_thread_pool_start_thread (real, error);
554
555   g_thread_pool_queue_push_unlocked (real, data);
556   g_async_queue_unlock (real->queue);
557 }
558
559 /**
560  * g_thread_pool_set_max_threads:
561  * @pool: a #GThreadPool
562  * @max_threads: a new maximal number of threads for @pool
563  * @error: return location for error
564  * 
565  * Sets the maximal allowed number of threads for @pool. A value of -1
566  * means, that the maximal number of threads is unlimited.
567  *
568  * Setting @max_threads to 0 means stopping all work for @pool. It is
569  * effectively frozen until @max_threads is set to a non-zero value
570  * again.
571  * 
572  * A thread is never terminated while calling @func, as supplied by
573  * g_thread_pool_new (). Instead the maximal number of threads only
574  * has effect for the allocation of new threads in g_thread_pool_push(). 
575  * A new thread is allocated, whenever the number of currently
576  * running threads in @pool is smaller than the maximal number.
577  *
578  * @error can be %NULL to ignore errors, or non-%NULL to report
579  * errors. An error can only occur when a new thread couldn't be
580  * created. 
581  **/
582 void
583 g_thread_pool_set_max_threads (GThreadPool  *pool,
584                                gint          max_threads,
585                                GError      **error)
586 {
587   GRealThreadPool *real;
588   gint to_start;
589
590   real = (GRealThreadPool*) pool;
591
592   g_return_if_fail (real);
593   g_return_if_fail (real->running);
594   g_return_if_fail (!real->pool.exclusive || max_threads != -1);
595   g_return_if_fail (max_threads >= -1);
596
597   g_async_queue_lock (real->queue);
598
599   real->max_threads = max_threads;
600   
601   if (pool->exclusive)
602     to_start = real->max_threads - real->num_threads;
603   else
604     to_start = g_async_queue_length_unlocked (real->queue);
605   
606   for ( ; to_start > 0; to_start--)
607     {
608       GError *local_error = NULL;
609
610       g_thread_pool_start_thread (real, &local_error);
611       if (local_error)
612         {
613           g_propagate_error (error, local_error);
614           break;
615         }
616     }
617    
618   g_async_queue_unlock (real->queue);
619 }
620
621 /**
622  * g_thread_pool_get_max_threads:
623  * @pool: a #GThreadPool
624  *
625  * Returns the maximal number of threads for @pool.
626  *
627  * Return value: the maximal number of threads
628  **/
629 gint
630 g_thread_pool_get_max_threads (GThreadPool *pool)
631 {
632   GRealThreadPool *real;
633   gint retval;
634
635   real = (GRealThreadPool*) pool;
636
637   g_return_val_if_fail (real, 0);
638   g_return_val_if_fail (real->running, 0);
639
640   g_async_queue_lock (real->queue);
641   retval = real->max_threads;
642   g_async_queue_unlock (real->queue);
643
644   return retval;
645 }
646
647 /**
648  * g_thread_pool_get_num_threads:
649  * @pool: a #GThreadPool
650  *
651  * Returns the number of threads currently running in @pool.
652  *
653  * Return value: the number of threads currently running
654  **/
655 guint
656 g_thread_pool_get_num_threads (GThreadPool *pool)
657 {
658   GRealThreadPool *real;
659   guint retval;
660
661   real = (GRealThreadPool*) pool;
662
663   g_return_val_if_fail (real, 0);
664   g_return_val_if_fail (real->running, 0);
665
666   g_async_queue_lock (real->queue);
667   retval = real->num_threads;
668   g_async_queue_unlock (real->queue);
669
670   return retval;
671 }
672
673 /**
674  * g_thread_pool_unprocessed:
675  * @pool: a #GThreadPool
676  *
677  * Returns the number of tasks still unprocessed in @pool.
678  *
679  * Return value: the number of unprocessed tasks
680  **/
681 guint
682 g_thread_pool_unprocessed (GThreadPool *pool)
683 {
684   GRealThreadPool *real;
685   gint unprocessed;
686
687   real = (GRealThreadPool*) pool;
688
689   g_return_val_if_fail (real, 0);
690   g_return_val_if_fail (real->running, 0);
691
692   unprocessed = g_async_queue_length (real->queue);
693
694   return MAX (unprocessed, 0);
695 }
696
697 /**
698  * g_thread_pool_free:
699  * @pool: a #GThreadPool
700  * @immediate: should @pool shut down immediately?
701  * @wait_: should the function wait for all tasks to be finished?
702  *
703  * Frees all resources allocated for @pool.
704  *
705  * If @immediate is %TRUE, no new task is processed for
706  * @pool. Otherwise @pool is not freed before the last task is
707  * processed. Note however, that no thread of this pool is
708  * interrupted, while processing a task. Instead at least all still
709  * running threads can finish their tasks before the @pool is freed.
710  *
711  * If @wait_ is %TRUE, the functions does not return before all tasks
712  * to be processed (dependent on @immediate, whether all or only the
713  * currently running) are ready. Otherwise the function returns immediately.
714  *
715  * After calling this function @pool must not be used anymore. 
716  **/
717 void
718 g_thread_pool_free (GThreadPool *pool,
719                     gboolean     immediate,
720                     gboolean     wait_)
721 {
722   GRealThreadPool *real;
723
724   real = (GRealThreadPool*) pool;
725
726   g_return_if_fail (real);
727   g_return_if_fail (real->running);
728
729   /* If there's no thread allowed here, there is not much sense in
730    * not stopping this pool immediately, when it's not empty 
731    */
732   g_return_if_fail (immediate || 
733                     real->max_threads != 0 || 
734                     g_async_queue_length (real->queue) == 0);
735
736   g_async_queue_lock (real->queue);
737
738   real->running = FALSE;
739   real->immediate = immediate;
740   real->waiting = wait_;
741
742   if (wait_)
743     {
744       real->cond = g_cond_new ();
745
746       while (g_async_queue_length_unlocked (real->queue) != -real->num_threads &&
747              !(immediate && real->num_threads == 0))
748         g_cond_wait (real->cond, _g_async_queue_get_mutex (real->queue));
749     }
750
751   if (immediate || g_async_queue_length_unlocked (real->queue) == -real->num_threads)
752     {
753       /* No thread is currently doing something (and nothing is left
754        * to process in the queue) 
755        */
756       if (real->num_threads == 0) 
757         {
758           /* No threads left, we clean up */
759           g_async_queue_unlock (real->queue);
760           g_thread_pool_free_internal (real);
761           return;
762         }
763
764       g_thread_pool_wakeup_and_stop_all (real);
765     }
766   
767   /* The last thread should cleanup the pool */
768   real->waiting = FALSE; 
769   g_async_queue_unlock (real->queue);
770 }
771
772 static void
773 g_thread_pool_free_internal (GRealThreadPool* pool)
774 {
775   g_return_if_fail (pool);
776   g_return_if_fail (pool->running == FALSE);
777   g_return_if_fail (pool->num_threads == 0);
778
779   g_async_queue_unref (pool->queue);
780
781   if (pool->cond)
782     g_cond_free (pool->cond);
783
784   g_free (pool);
785 }
786
787 static void
788 g_thread_pool_wakeup_and_stop_all (GRealThreadPool* pool)
789 {
790   guint i;
791   
792   g_return_if_fail (pool);
793   g_return_if_fail (pool->running == FALSE);
794   g_return_if_fail (pool->num_threads != 0);
795
796   pool->immediate = TRUE; 
797
798   for (i = 0; i < pool->num_threads; i++)
799     g_thread_pool_queue_push_unlocked (pool, GUINT_TO_POINTER (1));
800 }
801
802 /**
803  * g_thread_pool_set_max_unused_threads:
804  * @max_threads: maximal number of unused threads
805  *
806  * Sets the maximal number of unused threads to @max_threads. If
807  * @max_threads is -1, no limit is imposed on the number of unused
808  * threads.
809  **/
810 void
811 g_thread_pool_set_max_unused_threads (gint max_threads)
812 {
813   g_return_if_fail (max_threads >= -1);  
814
815   g_atomic_int_set (&max_unused_threads, max_threads);
816
817   if (max_threads != -1)
818     {
819       max_threads -= g_atomic_int_get (&unused_threads);
820       if (max_threads < 0)
821         {
822           g_atomic_int_set (&kill_unused_threads, -max_threads);
823           g_atomic_int_inc (&wakeup_thread_serial);
824
825           g_async_queue_lock (unused_thread_queue);
826
827           do
828             {
829               g_async_queue_push_unlocked (unused_thread_queue,
830                                            wakeup_thread_marker);
831             }
832           while (++max_threads);
833
834           g_async_queue_unlock (unused_thread_queue);
835         }
836     }
837 }
838
839 /**
840  * g_thread_pool_get_max_unused_threads:
841  * 
842  * Returns the maximal allowed number of unused threads.
843  *
844  * Return value: the maximal number of unused threads
845  **/
846 gint
847 g_thread_pool_get_max_unused_threads (void)
848 {
849   return g_atomic_int_get (&max_unused_threads);
850 }
851
852 /**
853  * g_thread_pool_get_num_unused_threads:
854  * 
855  * Returns the number of currently unused threads.
856  *
857  * Return value: the number of currently unused threads
858  **/
859 guint 
860 g_thread_pool_get_num_unused_threads (void)
861 {
862   return g_atomic_int_get (&unused_threads);
863 }
864
865 /**
866  * g_thread_pool_stop_unused_threads:
867  * 
868  * Stops all currently unused threads. This does not change the
869  * maximal number of unused threads. This function can be used to
870  * regularly stop all unused threads e.g. from g_timeout_add().
871  **/
872 void
873 g_thread_pool_stop_unused_threads (void)
874
875   guint oldval;
876
877   oldval = g_thread_pool_get_max_unused_threads ();
878
879   g_thread_pool_set_max_unused_threads (0);
880   g_thread_pool_set_max_unused_threads (oldval);
881 }
882
883 /**
884  * g_thread_pool_set_sort_function:
885  * @pool: a #GThreadPool
886  * @func: the #GCompareDataFunc used to sort the list of tasks. 
887  *     This function is passed two tasks. It should return
888  *     0 if the order in which they are handled does not matter, 
889  *     a negative value if the first task should be processed before
890  *     the second or a positive value if the second task should be 
891  *     processed first.
892  * @user_data: user data passed to @func.
893  *
894  * Sets the function used to sort the list of tasks. This allows the
895  * tasks to be processed by a priority determined by @func, and not
896  * just in the order in which they were added to the pool.
897  *
898  * Note, if the maximum number of threads is more than 1, the order
899  * that threads are executed can not be guranteed 100%. Threads are
900  * scheduled by the operating system and are executed at random. It
901  * cannot be assumed that threads are executed in the order they are
902  * created. 
903  *
904  * Since: 2.10
905  **/
906 void 
907 g_thread_pool_set_sort_function (GThreadPool      *pool,
908                                  GCompareDataFunc  func,
909                                  gpointer          user_data)
910
911   GRealThreadPool *real;
912
913   real = (GRealThreadPool*) pool;
914
915   g_return_if_fail (real);
916   g_return_if_fail (real->running);
917
918   g_async_queue_lock (real->queue);
919
920   real->sort_func = func;
921   real->sort_user_data = user_data;
922   
923   if (func) 
924     g_async_queue_sort_unlocked (real->queue, 
925                                  real->sort_func,
926                                  real->sort_user_data);
927
928   g_async_queue_unlock (real->queue);
929 }
930
931 /**
932  * g_thread_pool_set_max_idle_time:
933  * @interval: the maximum @interval (1/1000ths of a second) a thread
934  *     can be idle. 
935  *
936  * This function will set the maximum @interval that a thread waiting
937  * in the pool for new tasks can be idle for before being
938  * stopped. This function is similar to calling
939  * g_thread_pool_stop_unused_threads() on a regular timeout, except,
940  * this is done on a per thread basis.    
941  *
942  * By setting @interval to 0, idle threads will not be stopped.
943  *  
944  * This function makes use of g_async_queue_timed_pop () using
945  * @interval.
946  *
947  * Since: 2.10
948  **/
949 void
950 g_thread_pool_set_max_idle_time (guint interval)
951
952   guint i;
953
954   g_atomic_int_set (&max_idle_time, interval);
955
956   i = g_atomic_int_get (&unused_threads);
957   if (i > 0)
958     {
959       g_atomic_int_inc (&wakeup_thread_serial);
960       g_async_queue_lock (unused_thread_queue);
961
962       do
963         {
964           g_async_queue_push_unlocked (unused_thread_queue,
965                                        wakeup_thread_marker);
966         }
967       while (--i);
968
969       g_async_queue_unlock (unused_thread_queue);
970     }
971 }
972
973 /**
974  * g_thread_pool_get_max_idle_time:
975  * 
976  * This function will return the maximum @interval that a thread will
977  * wait in the thread pool for new tasks before being stopped.
978  *
979  * If this function returns 0, threads waiting in the thread pool for
980  * new work are not stopped.
981  *
982  * Return value: the maximum @interval to wait for new tasks in the
983  *     thread pool before stopping the thread (1/1000ths of a second).
984  *  
985  * Since: 2.10
986  **/
987 guint
988 g_thread_pool_get_max_idle_time (void)
989
990   return g_atomic_int_get (&max_idle_time);
991 }