[HQ](Debian) Package version up
[external/glib2.0.git] / glib / gthreadpool.c
1 /* GLIB - Library of useful routines for C programming
2  * Copyright (C) 1995-1997  Peter Mattis, Spencer Kimball and Josh MacDonald
3  *
4  * GAsyncQueue: thread pool implementation.
5  * Copyright (C) 2000 Sebastian Wilhelmi; University of Karlsruhe
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Lesser General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20  * Boston, MA 02111-1307, USA.
21  */
22
23 /*
24  * MT safe
25  */
26
27 #include "config.h"
28
29 #include "glib.h"
30 #include "galias.h"
31
32 /**
33  * SECTION: thread_pools
34  * @title: Thread Pools
35  * @short_description: pools of threads to execute work concurrently
36  * @see_also: <para> <variablelist> <varlistentry>
37  *            <term>#GThread</term> <listitem><para>GLib thread
38  *            system.</para></listitem> </varlistentry> </variablelist>
39  *            </para>
40  *
41  * Sometimes you wish to asynchronously fork out the execution of work
42  * and continue working in your own thread. If that will happen often,
43  * the overhead of starting and destroying a thread each time might be
44  * too high. In such cases reusing already started threads seems like a
45  * good idea. And it indeed is, but implementing this can be tedious
46  * and error-prone.
47  *
48  * Therefore GLib provides thread pools for your convenience. An added
49  * advantage is, that the threads can be shared between the different
50  * subsystems of your program, when they are using GLib.
51  *
52  * To create a new thread pool, you use g_thread_pool_new(). It is
53  * destroyed by g_thread_pool_free().
54  *
55  * If you want to execute a certain task within a thread pool, you call
56  * g_thread_pool_push().
57  *
58  * To get the current number of running threads you call
59  * g_thread_pool_get_num_threads(). To get the number of still
60  * unprocessed tasks you call g_thread_pool_unprocessed(). To control
61  * the maximal number of threads for a thread pool, you use
62  * g_thread_pool_get_max_threads() and g_thread_pool_set_max_threads().
63  *
64  * Finally you can control the number of unused threads, that are kept
65  * alive by GLib for future use. The current number can be fetched with
66  * g_thread_pool_get_num_unused_threads(). The maximal number can be
67  * controlled by g_thread_pool_get_max_unused_threads() and
68  * g_thread_pool_set_max_unused_threads(). All currently unused threads
69  * can be stopped by calling g_thread_pool_stop_unused_threads().
70  **/
71
72 #define DEBUG_MSG(x)  
73 /* #define DEBUG_MSG(args) g_printerr args ; g_printerr ("\n");    */
74
75 typedef struct _GRealThreadPool GRealThreadPool;
76
77 /**
78  * GThreadPool:
79  * @func: the function to execute in the threads of this pool
80  * @user_data: the user data for the threads of this pool
81  * @exclusive: are all threads exclusive to this pool
82  *
83  * The #GThreadPool struct represents a thread pool. It has three
84  * public read-only members, but the underlying struct is bigger, so
85  * you must not copy this struct.
86  **/
87 struct _GRealThreadPool
88 {
89   GThreadPool pool;
90   GAsyncQueue* queue;
91   GCond* cond;
92   gint max_threads;
93   gint num_threads;
94   gboolean running;
95   gboolean immediate;
96   gboolean waiting;
97   GCompareDataFunc sort_func;
98   gpointer sort_user_data;
99 };
100
101 /* The following is just an address to mark the wakeup order for a
102  * thread, it could be any address (as long, as it isn't a valid
103  * GThreadPool address) */
104 static const gpointer wakeup_thread_marker = (gpointer) &g_thread_pool_new;
105 static gint wakeup_thread_serial = 0;
106
107 /* Here all unused threads are waiting  */
108 static GAsyncQueue *unused_thread_queue = NULL;
109 static gint unused_threads = 0;
110 static gint max_unused_threads = 0;
111 static gint kill_unused_threads = 0;
112 static guint max_idle_time = 0;
113
114 static void             g_thread_pool_queue_push_unlocked (GRealThreadPool  *pool,
115                                                            gpointer          data);
116 static void             g_thread_pool_free_internal       (GRealThreadPool  *pool);
117 static gpointer         g_thread_pool_thread_proxy        (gpointer          data);
118 static void             g_thread_pool_start_thread        (GRealThreadPool  *pool,
119                                                            GError          **error);
120 static void             g_thread_pool_wakeup_and_stop_all (GRealThreadPool  *pool);
121 static GRealThreadPool* g_thread_pool_wait_for_new_pool   (void);
122 static gpointer         g_thread_pool_wait_for_new_task   (GRealThreadPool  *pool);
123
124 static void
125 g_thread_pool_queue_push_unlocked (GRealThreadPool *pool,
126                                    gpointer         data)
127 {
128   if (pool->sort_func) 
129     g_async_queue_push_sorted_unlocked (pool->queue, 
130                                         data,
131                                         pool->sort_func, 
132                                         pool->sort_user_data);
133   else
134     g_async_queue_push_unlocked (pool->queue, data);
135 }
136
137 static GRealThreadPool*
138 g_thread_pool_wait_for_new_pool (void)
139 {
140   GRealThreadPool *pool;
141   gint local_wakeup_thread_serial;
142   guint local_max_unused_threads;
143   gint local_max_idle_time;
144   gint last_wakeup_thread_serial;
145   gboolean have_relayed_thread_marker = FALSE;
146
147   local_max_unused_threads = g_atomic_int_get (&max_unused_threads);
148   local_max_idle_time = g_atomic_int_get (&max_idle_time);
149   last_wakeup_thread_serial = g_atomic_int_get (&wakeup_thread_serial);
150
151   g_atomic_int_inc (&unused_threads);
152
153   do
154     {
155       if (g_atomic_int_get (&unused_threads) >= local_max_unused_threads)
156         {
157           /* If this is a superfluous thread, stop it. */
158           pool = NULL;
159         }
160       else if (local_max_idle_time > 0)
161         {
162           /* If a maximal idle time is given, wait for the given time. */
163           GTimeVal end_time;
164
165           g_get_current_time (&end_time);
166           g_time_val_add (&end_time, local_max_idle_time * 1000);
167
168           DEBUG_MSG (("thread %p waiting in global pool for %f seconds.",
169                       g_thread_self (), local_max_idle_time / 1000.0));
170
171           pool = g_async_queue_timed_pop (unused_thread_queue, &end_time);
172         }
173       else
174         {
175           /* If no maximal idle time is given, wait indefinitely. */
176           DEBUG_MSG (("thread %p waiting in global pool.",
177                       g_thread_self ()));
178           pool = g_async_queue_pop (unused_thread_queue);
179         }
180
181       if (pool == wakeup_thread_marker)
182         {
183           local_wakeup_thread_serial = g_atomic_int_get (&wakeup_thread_serial);
184           if (last_wakeup_thread_serial == local_wakeup_thread_serial)
185             {
186               if (!have_relayed_thread_marker)
187               {
188                 /* If this wakeup marker has been received for
189                  * the second time, relay it. 
190                  */
191                 DEBUG_MSG (("thread %p relaying wakeup message to "
192                             "waiting thread with lower serial.",
193                             g_thread_self ()));
194
195                 g_async_queue_push (unused_thread_queue, wakeup_thread_marker);
196                 have_relayed_thread_marker = TRUE;
197
198                 /* If a wakeup marker has been relayed, this thread
199                  * will get out of the way for 100 microseconds to
200                  * avoid receiving this marker again. */
201                 g_usleep (100);
202               }
203             }
204           else
205             {
206               if (g_atomic_int_exchange_and_add (&kill_unused_threads, -1) > 0)
207                 {
208                   pool = NULL;
209                   break;
210                 }
211
212               DEBUG_MSG (("thread %p updating to new limits.",
213                           g_thread_self ()));
214
215               local_max_unused_threads = g_atomic_int_get (&max_unused_threads);
216               local_max_idle_time = g_atomic_int_get (&max_idle_time);
217               last_wakeup_thread_serial = local_wakeup_thread_serial;
218
219               have_relayed_thread_marker = FALSE;
220             }
221         }
222     }
223   while (pool == wakeup_thread_marker);
224
225   g_atomic_int_add (&unused_threads, -1);
226
227   return pool;
228 }
229
230 static gpointer
231 g_thread_pool_wait_for_new_task (GRealThreadPool *pool)
232 {
233   gpointer task = NULL;
234
235   if (pool->running || (!pool->immediate &&
236                         g_async_queue_length_unlocked (pool->queue) > 0))
237     {
238       /* This thread pool is still active. */
239       if (pool->num_threads > pool->max_threads && pool->max_threads != -1)
240         {
241           /* This is a superfluous thread, so it goes to the global pool. */
242           DEBUG_MSG (("superfluous thread %p in pool %p.",
243                       g_thread_self (), pool));
244         }
245       else if (pool->pool.exclusive)
246         {
247           /* Exclusive threads stay attached to the pool. */
248           task = g_async_queue_pop_unlocked (pool->queue);
249
250           DEBUG_MSG (("thread %p in exclusive pool %p waits for task "
251                       "(%d running, %d unprocessed).",
252                       g_thread_self (), pool, pool->num_threads,
253                       g_async_queue_length_unlocked (pool->queue)));
254         }
255       else
256         {
257           /* A thread will wait for new tasks for at most 1/2
258            * second before going to the global pool.
259            */
260           GTimeVal end_time;
261
262           g_get_current_time (&end_time);
263           g_time_val_add (&end_time, G_USEC_PER_SEC / 2);       /* 1/2 second */
264
265           DEBUG_MSG (("thread %p in pool %p waits for up to a 1/2 second for task "
266                       "(%d running, %d unprocessed).",
267                       g_thread_self (), pool, pool->num_threads,
268                       g_async_queue_length_unlocked (pool->queue)));
269
270           task = g_async_queue_timed_pop_unlocked (pool->queue, &end_time);
271         }
272     }
273   else
274     {
275       /* This thread pool is inactive, it will no longer process tasks. */
276       DEBUG_MSG (("pool %p not active, thread %p will go to global pool "
277                   "(running: %s, immediate: %s, len: %d).",
278                   pool, g_thread_self (),
279                   pool->running ? "true" : "false",
280                   pool->immediate ? "true" : "false",
281                   g_async_queue_length_unlocked (pool->queue)));
282     }
283
284   return task;
285 }
286
287
288 static gpointer 
289 g_thread_pool_thread_proxy (gpointer data)
290 {
291   GRealThreadPool *pool;
292
293   pool = data;
294
295   DEBUG_MSG (("thread %p started for pool %p.", 
296               g_thread_self (), pool));
297
298   g_async_queue_lock (pool->queue);
299
300   while (TRUE)
301     {
302       gpointer task;
303
304       task = g_thread_pool_wait_for_new_task (pool);
305       if (task)
306         {
307           if (pool->running || !pool->immediate)
308             {
309               /* A task was received and the thread pool is active, so
310                * execute the function. 
311                */
312               g_async_queue_unlock (pool->queue);
313               DEBUG_MSG (("thread %p in pool %p calling func.", 
314                           g_thread_self (), pool));
315               pool->pool.func (task, pool->pool.user_data);
316               g_async_queue_lock (pool->queue);
317             }
318         }
319       else
320         {
321           /* No task was received, so this thread goes to the global
322            * pool. 
323            */
324           gboolean free_pool = FALSE;
325  
326           DEBUG_MSG (("thread %p leaving pool %p for global pool.", 
327                       g_thread_self (), pool));
328           pool->num_threads--;
329
330           if (!pool->running)
331             {
332               if (!pool->waiting)
333                 {
334                   if (pool->num_threads == 0)
335                     {
336                       /* If the pool is not running and no other
337                        * thread is waiting for this thread pool to
338                        * finish and this is the last thread of this
339                        * pool, free the pool.
340                        */
341                       free_pool = TRUE;
342                     }           
343                   else 
344                     {
345                       /* If the pool is not running and no other
346                        * thread is waiting for this thread pool to
347                        * finish and this is not the last thread of
348                        * this pool and there are no tasks left in the
349                        * queue, wakeup the remaining threads. 
350                        */
351                       if (g_async_queue_length_unlocked (pool->queue) == 
352                           - pool->num_threads)
353                         g_thread_pool_wakeup_and_stop_all (pool);
354                     }
355                 }
356               else if (pool->immediate || 
357                        g_async_queue_length_unlocked (pool->queue) <= 0)
358                 {
359                   /* If the pool is not running and another thread is
360                    * waiting for this thread pool to finish and there
361                    * are either no tasks left or the pool shall stop
362                    * immediatly, inform the waiting thread of a change
363                    * of the thread pool state. 
364                    */
365                   g_cond_broadcast (pool->cond);
366                 }
367             }
368
369           g_async_queue_unlock (pool->queue);
370
371           if (free_pool)
372             g_thread_pool_free_internal (pool);
373
374           if ((pool = g_thread_pool_wait_for_new_pool ()) == NULL) 
375             break;
376
377           g_async_queue_lock (pool->queue);
378           
379           DEBUG_MSG (("thread %p entering pool %p from global pool.", 
380                       g_thread_self (), pool));
381
382           /* pool->num_threads++ is not done here, but in
383            * g_thread_pool_start_thread to make the new started thread
384            * known to the pool, before itself can do it. 
385            */
386         }
387     }
388
389   return NULL;
390 }
391
392 static void
393 g_thread_pool_start_thread (GRealThreadPool  *pool, 
394                             GError          **error)
395 {
396   gboolean success = FALSE;
397   
398   if (pool->num_threads >= pool->max_threads && pool->max_threads != -1)
399     /* Enough threads are already running */
400     return;
401
402   g_async_queue_lock (unused_thread_queue);
403
404   if (g_async_queue_length_unlocked (unused_thread_queue) < 0)
405     {
406       g_async_queue_push_unlocked (unused_thread_queue, pool);
407       success = TRUE;
408     }
409
410   g_async_queue_unlock (unused_thread_queue);
411
412   if (!success)
413     {
414       GError *local_error = NULL;
415       /* No thread was found, we have to start a new one */
416       g_thread_create (g_thread_pool_thread_proxy, pool, FALSE, &local_error);
417       
418       if (local_error)
419         {
420           g_propagate_error (error, local_error);
421           return;
422         }
423     }
424
425   /* See comment in g_thread_pool_thread_proxy as to why this is done
426    * here and not there
427    */
428   pool->num_threads++;
429 }
430
431 /**
432  * g_thread_pool_new: 
433  * @func: a function to execute in the threads of the new thread pool
434  * @user_data: user data that is handed over to @func every time it 
435  *   is called
436  * @max_threads: the maximal number of threads to execute concurrently in 
437  *   the new thread pool, -1 means no limit
438  * @exclusive: should this thread pool be exclusive?
439  * @error: return location for error
440  *
441  * This function creates a new thread pool.
442  *
443  * Whenever you call g_thread_pool_push(), either a new thread is
444  * created or an unused one is reused. At most @max_threads threads
445  * are running concurrently for this thread pool. @max_threads = -1
446  * allows unlimited threads to be created for this thread pool. The
447  * newly created or reused thread now executes the function @func with
448  * the two arguments. The first one is the parameter to
449  * g_thread_pool_push() and the second one is @user_data.
450  *
451  * The parameter @exclusive determines, whether the thread pool owns
452  * all threads exclusive or whether the threads are shared
453  * globally. If @exclusive is %TRUE, @max_threads threads are started
454  * immediately and they will run exclusively for this thread pool until
455  * it is destroyed by g_thread_pool_free(). If @exclusive is %FALSE,
456  * threads are created, when needed and shared between all
457  * non-exclusive thread pools. This implies that @max_threads may not
458  * be -1 for exclusive thread pools.
459  *
460  * @error can be %NULL to ignore errors, or non-%NULL to report
461  * errors. An error can only occur when @exclusive is set to %TRUE and
462  * not all @max_threads threads could be created.
463  *
464  * Return value: the new #GThreadPool
465  **/
466 GThreadPool* 
467 g_thread_pool_new (GFunc            func,
468                    gpointer         user_data,
469                    gint             max_threads,
470                    gboolean         exclusive,
471                    GError         **error)
472 {
473   GRealThreadPool *retval;
474   G_LOCK_DEFINE_STATIC (init);
475
476   g_return_val_if_fail (func, NULL);
477   g_return_val_if_fail (!exclusive || max_threads != -1, NULL);
478   g_return_val_if_fail (max_threads >= -1, NULL);
479   g_return_val_if_fail (g_thread_supported (), NULL);
480
481   retval = g_new (GRealThreadPool, 1);
482
483   retval->pool.func = func;
484   retval->pool.user_data = user_data;
485   retval->pool.exclusive = exclusive;
486   retval->queue = g_async_queue_new ();
487   retval->cond = NULL;
488   retval->max_threads = max_threads;
489   retval->num_threads = 0;
490   retval->running = TRUE;
491   retval->sort_func = NULL;
492   retval->sort_user_data = NULL;
493
494   G_LOCK (init);
495   if (!unused_thread_queue)
496       unused_thread_queue = g_async_queue_new ();
497   G_UNLOCK (init);
498
499   if (retval->pool.exclusive)
500     {
501       g_async_queue_lock (retval->queue);
502   
503       while (retval->num_threads < retval->max_threads)
504         {
505           GError *local_error = NULL;
506           g_thread_pool_start_thread (retval, &local_error);
507           if (local_error)
508             {
509               g_propagate_error (error, local_error);
510               break;
511             }
512         }
513
514       g_async_queue_unlock (retval->queue);
515     }
516
517   return (GThreadPool*) retval;
518 }
519
520 /**
521  * g_thread_pool_push:
522  * @pool: a #GThreadPool
523  * @data: a new task for @pool
524  * @error: return location for error
525  * 
526  * Inserts @data into the list of tasks to be executed by @pool. When
527  * the number of currently running threads is lower than the maximal
528  * allowed number of threads, a new thread is started (or reused) with
529  * the properties given to g_thread_pool_new (). Otherwise @data stays
530  * in the queue until a thread in this pool finishes its previous task
531  * and processes @data. 
532  *
533  * @error can be %NULL to ignore errors, or non-%NULL to report
534  * errors. An error can only occur when a new thread couldn't be
535  * created. In that case @data is simply appended to the queue of work
536  * to do.  
537  **/
538 void 
539 g_thread_pool_push (GThreadPool  *pool,
540                     gpointer      data,
541                     GError      **error)
542 {
543   GRealThreadPool *real;
544
545   real = (GRealThreadPool*) pool;
546
547   g_return_if_fail (real);
548   g_return_if_fail (real->running);
549
550   g_async_queue_lock (real->queue);
551
552   if (g_async_queue_length_unlocked (real->queue) >= 0)
553     /* No thread is waiting in the queue */
554     g_thread_pool_start_thread (real, error);
555
556   g_thread_pool_queue_push_unlocked (real, data);
557   g_async_queue_unlock (real->queue);
558 }
559
560 /**
561  * g_thread_pool_set_max_threads:
562  * @pool: a #GThreadPool
563  * @max_threads: a new maximal number of threads for @pool
564  * @error: return location for error
565  * 
566  * Sets the maximal allowed number of threads for @pool. A value of -1
567  * means, that the maximal number of threads is unlimited.
568  *
569  * Setting @max_threads to 0 means stopping all work for @pool. It is
570  * effectively frozen until @max_threads is set to a non-zero value
571  * again.
572  * 
573  * A thread is never terminated while calling @func, as supplied by
574  * g_thread_pool_new (). Instead the maximal number of threads only
575  * has effect for the allocation of new threads in g_thread_pool_push(). 
576  * A new thread is allocated, whenever the number of currently
577  * running threads in @pool is smaller than the maximal number.
578  *
579  * @error can be %NULL to ignore errors, or non-%NULL to report
580  * errors. An error can only occur when a new thread couldn't be
581  * created. 
582  **/
583 void
584 g_thread_pool_set_max_threads (GThreadPool  *pool,
585                                gint          max_threads,
586                                GError      **error)
587 {
588   GRealThreadPool *real;
589   gint to_start;
590
591   real = (GRealThreadPool*) pool;
592
593   g_return_if_fail (real);
594   g_return_if_fail (real->running);
595   g_return_if_fail (!real->pool.exclusive || max_threads != -1);
596   g_return_if_fail (max_threads >= -1);
597
598   g_async_queue_lock (real->queue);
599
600   real->max_threads = max_threads;
601   
602   if (pool->exclusive)
603     to_start = real->max_threads - real->num_threads;
604   else
605     to_start = g_async_queue_length_unlocked (real->queue);
606   
607   for ( ; to_start > 0; to_start--)
608     {
609       GError *local_error = NULL;
610
611       g_thread_pool_start_thread (real, &local_error);
612       if (local_error)
613         {
614           g_propagate_error (error, local_error);
615           break;
616         }
617     }
618    
619   g_async_queue_unlock (real->queue);
620 }
621
622 /**
623  * g_thread_pool_get_max_threads:
624  * @pool: a #GThreadPool
625  *
626  * Returns the maximal number of threads for @pool.
627  *
628  * Return value: the maximal number of threads
629  **/
630 gint
631 g_thread_pool_get_max_threads (GThreadPool *pool)
632 {
633   GRealThreadPool *real;
634   gint retval;
635
636   real = (GRealThreadPool*) pool;
637
638   g_return_val_if_fail (real, 0);
639   g_return_val_if_fail (real->running, 0);
640
641   g_async_queue_lock (real->queue);
642   retval = real->max_threads;
643   g_async_queue_unlock (real->queue);
644
645   return retval;
646 }
647
648 /**
649  * g_thread_pool_get_num_threads:
650  * @pool: a #GThreadPool
651  *
652  * Returns the number of threads currently running in @pool.
653  *
654  * Return value: the number of threads currently running
655  **/
656 guint
657 g_thread_pool_get_num_threads (GThreadPool *pool)
658 {
659   GRealThreadPool *real;
660   guint retval;
661
662   real = (GRealThreadPool*) pool;
663
664   g_return_val_if_fail (real, 0);
665   g_return_val_if_fail (real->running, 0);
666
667   g_async_queue_lock (real->queue);
668   retval = real->num_threads;
669   g_async_queue_unlock (real->queue);
670
671   return retval;
672 }
673
674 /**
675  * g_thread_pool_unprocessed:
676  * @pool: a #GThreadPool
677  *
678  * Returns the number of tasks still unprocessed in @pool.
679  *
680  * Return value: the number of unprocessed tasks
681  **/
682 guint
683 g_thread_pool_unprocessed (GThreadPool *pool)
684 {
685   GRealThreadPool *real;
686   gint unprocessed;
687
688   real = (GRealThreadPool*) pool;
689
690   g_return_val_if_fail (real, 0);
691   g_return_val_if_fail (real->running, 0);
692
693   unprocessed = g_async_queue_length (real->queue);
694
695   return MAX (unprocessed, 0);
696 }
697
698 /**
699  * g_thread_pool_free:
700  * @pool: a #GThreadPool
701  * @immediate: should @pool shut down immediately?
702  * @wait_: should the function wait for all tasks to be finished?
703  *
704  * Frees all resources allocated for @pool.
705  *
706  * If @immediate is %TRUE, no new task is processed for
707  * @pool. Otherwise @pool is not freed before the last task is
708  * processed. Note however, that no thread of this pool is
709  * interrupted, while processing a task. Instead at least all still
710  * running threads can finish their tasks before the @pool is freed.
711  *
712  * If @wait_ is %TRUE, the functions does not return before all tasks
713  * to be processed (dependent on @immediate, whether all or only the
714  * currently running) are ready. Otherwise the function returns immediately.
715  *
716  * After calling this function @pool must not be used anymore. 
717  **/
718 void
719 g_thread_pool_free (GThreadPool *pool,
720                     gboolean     immediate,
721                     gboolean     wait_)
722 {
723   GRealThreadPool *real;
724
725   real = (GRealThreadPool*) pool;
726
727   g_return_if_fail (real);
728   g_return_if_fail (real->running);
729
730   /* If there's no thread allowed here, there is not much sense in
731    * not stopping this pool immediately, when it's not empty 
732    */
733   g_return_if_fail (immediate || 
734                     real->max_threads != 0 || 
735                     g_async_queue_length (real->queue) == 0);
736
737   g_async_queue_lock (real->queue);
738
739   real->running = FALSE;
740   real->immediate = immediate;
741   real->waiting = wait_;
742
743   if (wait_)
744     {
745       real->cond = g_cond_new ();
746
747       while (g_async_queue_length_unlocked (real->queue) != -real->num_threads &&
748              !(immediate && real->num_threads == 0))
749         g_cond_wait (real->cond, _g_async_queue_get_mutex (real->queue));
750     }
751
752   if (immediate || g_async_queue_length_unlocked (real->queue) == -real->num_threads)
753     {
754       /* No thread is currently doing something (and nothing is left
755        * to process in the queue) 
756        */
757       if (real->num_threads == 0) 
758         {
759           /* No threads left, we clean up */
760           g_async_queue_unlock (real->queue);
761           g_thread_pool_free_internal (real);
762           return;
763         }
764
765       g_thread_pool_wakeup_and_stop_all (real);
766     }
767   
768   /* The last thread should cleanup the pool */
769   real->waiting = FALSE; 
770   g_async_queue_unlock (real->queue);
771 }
772
773 static void
774 g_thread_pool_free_internal (GRealThreadPool* pool)
775 {
776   g_return_if_fail (pool);
777   g_return_if_fail (pool->running == FALSE);
778   g_return_if_fail (pool->num_threads == 0);
779
780   g_async_queue_unref (pool->queue);
781
782   if (pool->cond)
783     g_cond_free (pool->cond);
784
785   g_free (pool);
786 }
787
788 static void
789 g_thread_pool_wakeup_and_stop_all (GRealThreadPool* pool)
790 {
791   guint i;
792   
793   g_return_if_fail (pool);
794   g_return_if_fail (pool->running == FALSE);
795   g_return_if_fail (pool->num_threads != 0);
796
797   pool->immediate = TRUE; 
798
799   for (i = 0; i < pool->num_threads; i++)
800     g_thread_pool_queue_push_unlocked (pool, GUINT_TO_POINTER (1));
801 }
802
803 /**
804  * g_thread_pool_set_max_unused_threads:
805  * @max_threads: maximal number of unused threads
806  *
807  * Sets the maximal number of unused threads to @max_threads. If
808  * @max_threads is -1, no limit is imposed on the number of unused
809  * threads.
810  **/
811 void
812 g_thread_pool_set_max_unused_threads (gint max_threads)
813 {
814   g_return_if_fail (max_threads >= -1);  
815
816   g_atomic_int_set (&max_unused_threads, max_threads);
817
818   if (max_threads != -1)
819     {
820       max_threads -= g_atomic_int_get (&unused_threads);
821       if (max_threads < 0)
822         {
823           g_atomic_int_set (&kill_unused_threads, -max_threads);
824           g_atomic_int_inc (&wakeup_thread_serial);
825
826           g_async_queue_lock (unused_thread_queue);
827
828           do
829             {
830               g_async_queue_push_unlocked (unused_thread_queue,
831                                            wakeup_thread_marker);
832             }
833           while (++max_threads);
834
835           g_async_queue_unlock (unused_thread_queue);
836         }
837     }
838 }
839
840 /**
841  * g_thread_pool_get_max_unused_threads:
842  * 
843  * Returns the maximal allowed number of unused threads.
844  *
845  * Return value: the maximal number of unused threads
846  **/
847 gint
848 g_thread_pool_get_max_unused_threads (void)
849 {
850   return g_atomic_int_get (&max_unused_threads);
851 }
852
853 /**
854  * g_thread_pool_get_num_unused_threads:
855  * 
856  * Returns the number of currently unused threads.
857  *
858  * Return value: the number of currently unused threads
859  **/
860 guint 
861 g_thread_pool_get_num_unused_threads (void)
862 {
863   return g_atomic_int_get (&unused_threads);
864 }
865
866 /**
867  * g_thread_pool_stop_unused_threads:
868  * 
869  * Stops all currently unused threads. This does not change the
870  * maximal number of unused threads. This function can be used to
871  * regularly stop all unused threads e.g. from g_timeout_add().
872  **/
873 void
874 g_thread_pool_stop_unused_threads (void)
875
876   guint oldval;
877
878   oldval = g_thread_pool_get_max_unused_threads ();
879
880   g_thread_pool_set_max_unused_threads (0);
881   g_thread_pool_set_max_unused_threads (oldval);
882 }
883
884 /**
885  * g_thread_pool_set_sort_function:
886  * @pool: a #GThreadPool
887  * @func: the #GCompareDataFunc used to sort the list of tasks. 
888  *     This function is passed two tasks. It should return
889  *     0 if the order in which they are handled does not matter, 
890  *     a negative value if the first task should be processed before
891  *     the second or a positive value if the second task should be 
892  *     processed first.
893  * @user_data: user data passed to @func.
894  *
895  * Sets the function used to sort the list of tasks. This allows the
896  * tasks to be processed by a priority determined by @func, and not
897  * just in the order in which they were added to the pool.
898  *
899  * Note, if the maximum number of threads is more than 1, the order
900  * that threads are executed can not be guranteed 100%. Threads are
901  * scheduled by the operating system and are executed at random. It
902  * cannot be assumed that threads are executed in the order they are
903  * created. 
904  *
905  * Since: 2.10
906  **/
907 void 
908 g_thread_pool_set_sort_function (GThreadPool      *pool,
909                                  GCompareDataFunc  func,
910                                  gpointer          user_data)
911
912   GRealThreadPool *real;
913
914   real = (GRealThreadPool*) pool;
915
916   g_return_if_fail (real);
917   g_return_if_fail (real->running);
918
919   g_async_queue_lock (real->queue);
920
921   real->sort_func = func;
922   real->sort_user_data = user_data;
923   
924   if (func) 
925     g_async_queue_sort_unlocked (real->queue, 
926                                  real->sort_func,
927                                  real->sort_user_data);
928
929   g_async_queue_unlock (real->queue);
930 }
931
932 /**
933  * g_thread_pool_set_max_idle_time:
934  * @interval: the maximum @interval (1/1000ths of a second) a thread
935  *     can be idle. 
936  *
937  * This function will set the maximum @interval that a thread waiting
938  * in the pool for new tasks can be idle for before being
939  * stopped. This function is similar to calling
940  * g_thread_pool_stop_unused_threads() on a regular timeout, except,
941  * this is done on a per thread basis.    
942  *
943  * By setting @interval to 0, idle threads will not be stopped.
944  *  
945  * This function makes use of g_async_queue_timed_pop () using
946  * @interval.
947  *
948  * Since: 2.10
949  **/
950 void
951 g_thread_pool_set_max_idle_time (guint interval)
952
953   guint i;
954
955   g_atomic_int_set (&max_idle_time, interval);
956
957   i = g_atomic_int_get (&unused_threads);
958   if (i > 0)
959     {
960       g_atomic_int_inc (&wakeup_thread_serial);
961       g_async_queue_lock (unused_thread_queue);
962
963       do
964         {
965           g_async_queue_push_unlocked (unused_thread_queue,
966                                        wakeup_thread_marker);
967         }
968       while (--i);
969
970       g_async_queue_unlock (unused_thread_queue);
971     }
972 }
973
974 /**
975  * g_thread_pool_get_max_idle_time:
976  * 
977  * This function will return the maximum @interval that a thread will
978  * wait in the thread pool for new tasks before being stopped.
979  *
980  * If this function returns 0, threads waiting in the thread pool for
981  * new work are not stopped.
982  *
983  * Return value: the maximum @interval to wait for new tasks in the
984  *     thread pool before stopping the thread (1/1000ths of a second).
985  *  
986  * Since: 2.10
987  **/
988 guint
989 g_thread_pool_get_max_idle_time (void)
990
991   return g_atomic_int_get (&max_idle_time);
992 }
993
994 #define __G_THREADPOOL_C__
995 #include "galiasdef.c"