more atomic ops pointer cast fixes. this time it'll work with atomic op
[platform/upstream/glib.git] / glib / gthreadpool.c
1 /* GLIB - Library of useful routines for C programming
2  * Copyright (C) 1995-1997  Peter Mattis, Spencer Kimball and Josh MacDonald
3  *
4  * GAsyncQueue: thread pool implementation.
5  * Copyright (C) 2000 Sebastian Wilhelmi; University of Karlsruhe
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Lesser General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20  * Boston, MA 02111-1307, USA.
21  */
22
23 /*
24  * MT safe
25  */
26
27 #include "config.h"
28
29 #include "glib.h"
30 #include "galias.h"
31
32 #define DEBUG_MSG(x)  
33 /* #define DEBUG_MSG(args) g_printerr args ; g_printerr ("\n");    */
34
35 typedef struct _GRealThreadPool GRealThreadPool;
36
37 struct _GRealThreadPool
38 {
39   GThreadPool pool;
40   GAsyncQueue* queue;
41   GCond* cond;
42   gint max_threads;
43   gint num_threads;
44   gboolean running;
45   gboolean immediate;
46   gboolean waiting;
47   GCompareDataFunc sort_func;
48   gpointer sort_user_data;
49 };
50
51 /* The following is just an address to mark the wakeup order for a
52  * thread, it could be any address (as long, as it isn't a valid
53  * GThreadPool address) */
54 static gconstpointer const wakeup_thread_marker = (gconstpointer) &g_thread_pool_new;
55 static gint wakeup_thread_serial = 0;
56
57 /* Here all unused threads are waiting  */
58 static GAsyncQueue *unused_thread_queue = NULL;
59 static gint unused_threads = 0;
60 static gint max_unused_threads = 0;
61 static gint kill_unused_threads = 0;
62 static guint max_idle_time = 0;
63
64 static void             g_thread_pool_queue_push_unlocked (GRealThreadPool  *pool,
65                                                            gpointer          data);
66 static void             g_thread_pool_free_internal       (GRealThreadPool  *pool);
67 static gpointer         g_thread_pool_thread_proxy        (gpointer          data);
68 static void             g_thread_pool_start_thread        (GRealThreadPool  *pool,
69                                                            GError          **error);
70 static void             g_thread_pool_wakeup_and_stop_all (GRealThreadPool  *pool);
71 static GRealThreadPool* g_thread_pool_wait_for_new_pool   (void);
72 static gpointer         g_thread_pool_wait_for_new_task   (GRealThreadPool  *pool);
73
74 static void
75 g_thread_pool_queue_push_unlocked (GRealThreadPool *pool,
76                                    gpointer         data)
77 {
78   if (pool->sort_func) 
79     g_async_queue_push_sorted_unlocked (pool->queue, 
80                                         data,
81                                         pool->sort_func, 
82                                         pool->sort_user_data);
83   else
84     g_async_queue_push_unlocked (pool->queue, data);
85 }
86
87 static GRealThreadPool*
88 g_thread_pool_wait_for_new_pool (void)
89 {
90   GRealThreadPool *pool;
91   gint local_wakeup_thread_serial;
92   guint local_max_unused_threads;
93   gint local_max_idle_time;
94   gint last_wakeup_thread_serial;
95   gboolean have_relayed_thread_marker = FALSE;
96
97   local_max_unused_threads = g_atomic_int_get (&max_unused_threads);
98   local_max_idle_time = g_atomic_int_get (&max_idle_time);
99   last_wakeup_thread_serial = g_atomic_int_get (&wakeup_thread_serial);
100
101   g_atomic_int_inc (&unused_threads);
102
103   do
104     {
105       if (g_atomic_int_get (&unused_threads) >= local_max_unused_threads)
106         {
107           /* If this is a superfluous thread, stop it. */
108           pool = NULL;
109         }
110       else if (local_max_idle_time > 0)
111         {
112           /* If a maximal idle time is given, wait for the given time. */
113           GTimeVal end_time;
114
115           g_get_current_time (&end_time);
116           g_time_val_add (&end_time, local_max_idle_time * 1000);
117
118           DEBUG_MSG (("thread %p waiting in global pool for %f seconds.",
119                       g_thread_self (), local_max_idle_time / 1000.0));
120
121           pool = g_async_queue_timed_pop (unused_thread_queue, &end_time);
122         }
123       else
124         {
125           /* If no maximal idle time is given, wait indefinitely. */
126           DEBUG_MSG (("thread %p waiting in global pool.",
127                       g_thread_self ()));
128           pool = g_async_queue_pop (unused_thread_queue);
129         }
130
131       if (pool == wakeup_thread_marker)
132         {
133           local_wakeup_thread_serial = g_atomic_int_get (&wakeup_thread_serial);
134           if (last_wakeup_thread_serial == local_wakeup_thread_serial)
135             {
136               if (!have_relayed_thread_marker)
137               {
138                 /* If this wakeup marker has been received for
139                  * the second time, relay it. 
140                  */
141                 DEBUG_MSG (("thread %p relaying wakeup message to "
142                             "waiting thread with lower serial.",
143                             g_thread_self ()));
144
145                 g_async_queue_push (unused_thread_queue, wakeup_thread_marker);
146                 have_relayed_thread_marker = TRUE;
147
148                 /* If a wakeup marker has been relayed, this thread
149                  * will get out of the way for 100 microseconds to
150                  * avoid receiving this marker again. */
151                 g_usleep (100);
152               }
153             }
154           else
155             {
156               if (g_atomic_int_exchange_and_add (&kill_unused_threads, -1) > 0)
157                 {
158                   pool = NULL;
159                   break;
160                 }
161
162               DEBUG_MSG (("thread %p updating to new limits.",
163                           g_thread_self ()));
164
165               local_max_unused_threads = g_atomic_int_get (&max_unused_threads);
166               local_max_idle_time = g_atomic_int_get (&max_idle_time);
167               last_wakeup_thread_serial = local_wakeup_thread_serial;
168
169               have_relayed_thread_marker = FALSE;
170             }
171         }
172     }
173   while (pool == wakeup_thread_marker);
174
175   g_atomic_int_add (&unused_threads, -1);
176
177   return pool;
178 }
179
180 static gpointer
181 g_thread_pool_wait_for_new_task (GRealThreadPool *pool)
182 {
183   gpointer task = NULL;
184
185   if (pool->running || (!pool->immediate &&
186                         g_async_queue_length_unlocked (pool->queue) > 0))
187     {
188       /* This thread pool is still active. */
189       if (pool->num_threads > pool->max_threads && pool->max_threads != -1)
190         {
191           /* This is a superfluous thread, so it goes to the global pool. */
192           DEBUG_MSG (("superfluous thread %p in pool %p.",
193                       g_thread_self (), pool));
194         }
195       else if (pool->pool.exclusive)
196         {
197           /* Exclusive threads stay attached to the pool. */
198           task = g_async_queue_pop_unlocked (pool->queue);
199
200           DEBUG_MSG (("thread %p in exclusive pool %p waits for task "
201                       "(%d running, %d unprocessed).",
202                       g_thread_self (), pool, pool->num_threads,
203                       g_async_queue_length_unlocked (pool->queue)));
204         }
205       else
206         {
207           /* A thread will wait for new tasks for at most 1/2
208            * second before going to the global pool.
209            */
210           GTimeVal end_time;
211
212           g_get_current_time (&end_time);
213           g_time_val_add (&end_time, G_USEC_PER_SEC / 2);       /* 1/2 second */
214
215           DEBUG_MSG (("thread %p in pool %p waits for up to a 1/2 second for task "
216                       "(%d running, %d unprocessed).",
217                       g_thread_self (), pool, pool->num_threads,
218                       g_async_queue_length_unlocked (pool->queue)));
219
220           task = g_async_queue_timed_pop_unlocked (pool->queue, &end_time);
221         }
222     }
223   else
224     {
225       /* This thread pool is inactive, it will no longer process tasks. */
226       DEBUG_MSG (("pool %p not active, thread %p will go to global pool "
227                   "(running: %s, immediate: %s, len: %d).",
228                   pool, g_thread_self (),
229                   pool->running ? "true" : "false",
230                   pool->immediate ? "true" : "false",
231                   g_async_queue_length_unlocked (pool->queue)));
232     }
233
234   return task;
235 }
236
237
238 static gpointer 
239 g_thread_pool_thread_proxy (gpointer data)
240 {
241   GRealThreadPool *pool;
242
243   pool = data;
244
245   DEBUG_MSG (("thread %p started for pool %p.", 
246               g_thread_self (), pool));
247
248   g_async_queue_lock (pool->queue);
249
250   while (TRUE)
251     {
252       gpointer task;
253
254       task = g_thread_pool_wait_for_new_task (pool);
255       if (task)
256         {
257           if (pool->running || !pool->immediate)
258             {
259               /* A task was received and the thread pool is active, so
260                * execute the function. 
261                */
262               g_async_queue_unlock (pool->queue);
263               DEBUG_MSG (("thread %p in pool %p calling func.", 
264                           g_thread_self (), pool));
265               pool->pool.func (task, pool->pool.user_data);
266               g_async_queue_lock (pool->queue);
267             }
268         }
269       else
270         {
271           /* No task was received, so this thread goes to the global
272            * pool. 
273            */
274           gboolean free_pool = FALSE;
275  
276           DEBUG_MSG (("thread %p leaving pool %p for global pool.", 
277                       g_thread_self (), pool));
278           pool->num_threads--;
279
280           if (!pool->running)
281             {
282               if (!pool->waiting)
283                 {
284                   if (pool->num_threads == 0)
285                     {
286                       /* If the pool is not running and no other
287                        * thread is waiting for this thread pool to
288                        * finish and this is the last thread of this
289                        * pool, free the pool.
290                        */
291                       free_pool = TRUE;
292                     }           
293                   else 
294                     {
295                       /* If the pool is not running and no other
296                        * thread is waiting for this thread pool to
297                        * finish and this is not the last thread of
298                        * this pool and there are no tasks left in the
299                        * queue, wakeup the remaining threads. 
300                        */
301                       if (g_async_queue_length_unlocked (pool->queue) == 
302                           - pool->num_threads)
303                         g_thread_pool_wakeup_and_stop_all (pool);
304                     }
305                 }
306               else if (pool->immediate || 
307                        g_async_queue_length_unlocked (pool->queue) <= 0)
308                 {
309                   /* If the pool is not running and another thread is
310                    * waiting for this thread pool to finish and there
311                    * are either no tasks left or the pool shall stop
312                    * immediatly, inform the waiting thread of a change
313                    * of the thread pool state. 
314                    */
315                   g_cond_broadcast (pool->cond);
316                 }
317             }
318
319           g_async_queue_unlock (pool->queue);
320
321           if (free_pool)
322             g_thread_pool_free_internal (pool);
323
324           if ((pool = g_thread_pool_wait_for_new_pool ()) == NULL) 
325             break;
326
327           g_async_queue_lock (pool->queue);
328           
329           DEBUG_MSG (("thread %p entering pool %p from global pool.", 
330                       g_thread_self (), pool));
331
332           /* pool->num_threads++ is not done here, but in
333            * g_thread_pool_start_thread to make the new started thread
334            * known to the pool, before itself can do it. 
335            */
336         }
337     }
338
339   return NULL;
340 }
341
342 static void
343 g_thread_pool_start_thread (GRealThreadPool  *pool, 
344                             GError          **error)
345 {
346   gboolean success = FALSE;
347   
348   if (pool->num_threads >= pool->max_threads && pool->max_threads != -1)
349     /* Enough threads are already running */
350     return;
351
352   g_async_queue_lock (unused_thread_queue);
353
354   if (g_async_queue_length_unlocked (unused_thread_queue) < 0)
355     {
356       g_async_queue_push_unlocked (unused_thread_queue, pool);
357       success = TRUE;
358     }
359
360   g_async_queue_unlock (unused_thread_queue);
361
362   if (!success)
363     {
364       GError *local_error = NULL;
365       /* No thread was found, we have to start a new one */
366       g_thread_create (g_thread_pool_thread_proxy, pool, FALSE, &local_error);
367       
368       if (local_error)
369         {
370           g_propagate_error (error, local_error);
371           return;
372         }
373     }
374
375   /* See comment in g_thread_pool_thread_proxy as to why this is done
376    * here and not there
377    */
378   pool->num_threads++;
379 }
380
381 /**
382  * g_thread_pool_new: 
383  * @func: a function to execute in the threads of the new thread pool
384  * @user_data: user data that is handed over to @func every time it 
385  *   is called
386  * @max_threads: the maximal number of threads to execute concurrently in 
387  *   the new thread pool, -1 means no limit
388  * @exclusive: should this thread pool be exclusive?
389  * @error: return location for error
390  *
391  * This function creates a new thread pool.
392  *
393  * Whenever you call g_thread_pool_push(), either a new thread is
394  * created or an unused one is reused. At most @max_threads threads
395  * are running concurrently for this thread pool. @max_threads = -1
396  * allows unlimited threads to be created for this thread pool. The
397  * newly created or reused thread now executes the function @func with
398  * the two arguments. The first one is the parameter to
399  * g_thread_pool_push() and the second one is @user_data.
400  *
401  * The parameter @exclusive determines, whether the thread pool owns
402  * all threads exclusive or whether the threads are shared
403  * globally. If @exclusive is %TRUE, @max_threads threads are started
404  * immediately and they will run exclusively for this thread pool until
405  * it is destroyed by g_thread_pool_free(). If @exclusive is %FALSE,
406  * threads are created, when needed and shared between all
407  * non-exclusive thread pools. This implies that @max_threads may not
408  * be -1 for exclusive thread pools.
409  *
410  * @error can be %NULL to ignore errors, or non-%NULL to report
411  * errors. An error can only occur when @exclusive is set to %TRUE and
412  * not all @max_threads threads could be created.
413  *
414  * Return value: the new #GThreadPool
415  **/
416 GThreadPool* 
417 g_thread_pool_new (GFunc            func,
418                    gpointer         user_data,
419                    gint             max_threads,
420                    gboolean         exclusive,
421                    GError         **error)
422 {
423   GRealThreadPool *retval;
424   G_LOCK_DEFINE_STATIC (init);
425
426   g_return_val_if_fail (func, NULL);
427   g_return_val_if_fail (!exclusive || max_threads != -1, NULL);
428   g_return_val_if_fail (max_threads >= -1, NULL);
429   g_return_val_if_fail (g_thread_supported (), NULL);
430
431   retval = g_new (GRealThreadPool, 1);
432
433   retval->pool.func = func;
434   retval->pool.user_data = user_data;
435   retval->pool.exclusive = exclusive;
436   retval->queue = g_async_queue_new ();
437   retval->cond = NULL;
438   retval->max_threads = max_threads;
439   retval->num_threads = 0;
440   retval->running = TRUE;
441   retval->sort_func = NULL;
442   retval->sort_user_data = NULL;
443
444   G_LOCK (init);
445   if (!unused_thread_queue)
446       unused_thread_queue = g_async_queue_new ();
447   G_UNLOCK (init);
448
449   if (retval->pool.exclusive)
450     {
451       g_async_queue_lock (retval->queue);
452   
453       while (retval->num_threads < retval->max_threads)
454         {
455           GError *local_error = NULL;
456           g_thread_pool_start_thread (retval, &local_error);
457           if (local_error)
458             {
459               g_propagate_error (error, local_error);
460               break;
461             }
462         }
463
464       g_async_queue_unlock (retval->queue);
465     }
466
467   return (GThreadPool*) retval;
468 }
469
470 /**
471  * g_thread_pool_push:
472  * @pool: a #GThreadPool
473  * @data: a new task for @pool
474  * @error: return location for error
475  * 
476  * Inserts @data into the list of tasks to be executed by @pool. When
477  * the number of currently running threads is lower than the maximal
478  * allowed number of threads, a new thread is started (or reused) with
479  * the properties given to g_thread_pool_new (). Otherwise @data stays
480  * in the queue until a thread in this pool finishes its previous task
481  * and processes @data. 
482  *
483  * @error can be %NULL to ignore errors, or non-%NULL to report
484  * errors. An error can only occur when a new thread couldn't be
485  * created. In that case @data is simply appended to the queue of work
486  * to do.  
487  **/
488 void 
489 g_thread_pool_push (GThreadPool  *pool,
490                     gpointer      data,
491                     GError      **error)
492 {
493   GRealThreadPool *real;
494
495   real = (GRealThreadPool*) pool;
496
497   g_return_if_fail (real);
498   g_return_if_fail (real->running);
499
500   g_async_queue_lock (real->queue);
501
502   if (g_async_queue_length_unlocked (real->queue) >= 0)
503     /* No thread is waiting in the queue */
504     g_thread_pool_start_thread (real, error);
505
506   g_thread_pool_queue_push_unlocked (real, data);
507   g_async_queue_unlock (real->queue);
508 }
509
510 /**
511  * g_thread_pool_set_max_threads:
512  * @pool: a #GThreadPool
513  * @max_threads: a new maximal number of threads for @pool
514  * @error: return location for error
515  * 
516  * Sets the maximal allowed number of threads for @pool. A value of -1
517  * means, that the maximal number of threads is unlimited.
518  *
519  * Setting @max_threads to 0 means stopping all work for @pool. It is
520  * effectively frozen until @max_threads is set to a non-zero value
521  * again.
522  * 
523  * A thread is never terminated while calling @func, as supplied by
524  * g_thread_pool_new (). Instead the maximal number of threads only
525  * has effect for the allocation of new threads in g_thread_pool_push(). 
526  * A new thread is allocated, whenever the number of currently
527  * running threads in @pool is smaller than the maximal number.
528  *
529  * @error can be %NULL to ignore errors, or non-%NULL to report
530  * errors. An error can only occur when a new thread couldn't be
531  * created. 
532  **/
533 void
534 g_thread_pool_set_max_threads (GThreadPool  *pool,
535                                gint          max_threads,
536                                GError      **error)
537 {
538   GRealThreadPool *real;
539   gint to_start;
540
541   real = (GRealThreadPool*) pool;
542
543   g_return_if_fail (real);
544   g_return_if_fail (real->running);
545   g_return_if_fail (!real->pool.exclusive || max_threads != -1);
546   g_return_if_fail (max_threads >= -1);
547
548   g_async_queue_lock (real->queue);
549
550   real->max_threads = max_threads;
551   
552   if (pool->exclusive)
553     to_start = real->max_threads - real->num_threads;
554   else
555     to_start = g_async_queue_length_unlocked (real->queue);
556   
557   for ( ; to_start > 0; to_start--)
558     {
559       GError *local_error = NULL;
560
561       g_thread_pool_start_thread (real, &local_error);
562       if (local_error)
563         {
564           g_propagate_error (error, local_error);
565           break;
566         }
567     }
568    
569   g_async_queue_unlock (real->queue);
570 }
571
572 /**
573  * g_thread_pool_get_max_threads:
574  * @pool: a #GThreadPool
575  *
576  * Returns the maximal number of threads for @pool.
577  *
578  * Return value: the maximal number of threads
579  **/
580 gint
581 g_thread_pool_get_max_threads (GThreadPool *pool)
582 {
583   GRealThreadPool *real;
584   gint retval;
585
586   real = (GRealThreadPool*) pool;
587
588   g_return_val_if_fail (real, 0);
589   g_return_val_if_fail (real->running, 0);
590
591   g_async_queue_lock (real->queue);
592   retval = real->max_threads;
593   g_async_queue_unlock (real->queue);
594
595   return retval;
596 }
597
598 /**
599  * g_thread_pool_get_num_threads:
600  * @pool: a #GThreadPool
601  *
602  * Returns the number of threads currently running in @pool.
603  *
604  * Return value: the number of threads currently running
605  **/
606 guint
607 g_thread_pool_get_num_threads (GThreadPool *pool)
608 {
609   GRealThreadPool *real;
610   guint retval;
611
612   real = (GRealThreadPool*) pool;
613
614   g_return_val_if_fail (real, 0);
615   g_return_val_if_fail (real->running, 0);
616
617   g_async_queue_lock (real->queue);
618   retval = real->num_threads;
619   g_async_queue_unlock (real->queue);
620
621   return retval;
622 }
623
624 /**
625  * g_thread_pool_unprocessed:
626  * @pool: a #GThreadPool
627  *
628  * Returns the number of tasks still unprocessed in @pool.
629  *
630  * Return value: the number of unprocessed tasks
631  **/
632 guint
633 g_thread_pool_unprocessed (GThreadPool *pool)
634 {
635   GRealThreadPool *real;
636   gint unprocessed;
637
638   real = (GRealThreadPool*) pool;
639
640   g_return_val_if_fail (real, 0);
641   g_return_val_if_fail (real->running, 0);
642
643   unprocessed = g_async_queue_length (real->queue);
644
645   return MAX (unprocessed, 0);
646 }
647
648 /**
649  * g_thread_pool_free:
650  * @pool: a #GThreadPool
651  * @immediate: should @pool shut down immediately?
652  * @wait_: should the function wait for all tasks to be finished?
653  *
654  * Frees all resources allocated for @pool.
655  *
656  * If @immediate is %TRUE, no new task is processed for
657  * @pool. Otherwise @pool is not freed before the last task is
658  * processed. Note however, that no thread of this pool is
659  * interrupted, while processing a task. Instead at least all still
660  * running threads can finish their tasks before the @pool is freed.
661  *
662  * If @wait_ is %TRUE, the functions does not return before all tasks
663  * to be processed (dependent on @immediate, whether all or only the
664  * currently running) are ready. Otherwise the function returns immediately.
665  *
666  * After calling this function @pool must not be used anymore. 
667  **/
668 void
669 g_thread_pool_free (GThreadPool *pool,
670                     gboolean     immediate,
671                     gboolean     wait_)
672 {
673   GRealThreadPool *real;
674
675   real = (GRealThreadPool*) pool;
676
677   g_return_if_fail (real);
678   g_return_if_fail (real->running);
679
680   /* If there's no thread allowed here, there is not much sense in
681    * not stopping this pool immediately, when it's not empty 
682    */
683   g_return_if_fail (immediate || 
684                     real->max_threads != 0 || 
685                     g_async_queue_length (real->queue) == 0);
686
687   g_async_queue_lock (real->queue);
688
689   real->running = FALSE;
690   real->immediate = immediate;
691   real->waiting = wait_;
692
693   if (wait_)
694     {
695       real->cond = g_cond_new ();
696
697       while (g_async_queue_length_unlocked (real->queue) != -real->num_threads &&
698              !(immediate && real->num_threads == 0))
699         g_cond_wait (real->cond, _g_async_queue_get_mutex (real->queue));
700     }
701
702   if (immediate || g_async_queue_length_unlocked (real->queue) == -real->num_threads)
703     {
704       /* No thread is currently doing something (and nothing is left
705        * to process in the queue) 
706        */
707       if (real->num_threads == 0) 
708         {
709           /* No threads left, we clean up */
710           g_async_queue_unlock (real->queue);
711           g_thread_pool_free_internal (real);
712           return;
713         }
714
715       g_thread_pool_wakeup_and_stop_all (real);
716     }
717   
718   /* The last thread should cleanup the pool */
719   real->waiting = FALSE; 
720   g_async_queue_unlock (real->queue);
721 }
722
723 static void
724 g_thread_pool_free_internal (GRealThreadPool* pool)
725 {
726   g_return_if_fail (pool);
727   g_return_if_fail (pool->running == FALSE);
728   g_return_if_fail (pool->num_threads == 0);
729
730   g_async_queue_unref (pool->queue);
731
732   if (pool->cond)
733     g_cond_free (pool->cond);
734
735   g_free (pool);
736 }
737
738 static void
739 g_thread_pool_wakeup_and_stop_all (GRealThreadPool* pool)
740 {
741   guint i;
742   
743   g_return_if_fail (pool);
744   g_return_if_fail (pool->running == FALSE);
745   g_return_if_fail (pool->num_threads != 0);
746
747   pool->immediate = TRUE; 
748
749   for (i = 0; i < pool->num_threads; i++)
750     g_thread_pool_queue_push_unlocked (pool, GUINT_TO_POINTER (1));
751 }
752
753 /**
754  * g_thread_pool_set_max_unused_threads:
755  * @max_threads: maximal number of unused threads
756  *
757  * Sets the maximal number of unused threads to @max_threads. If
758  * @max_threads is -1, no limit is imposed on the number of unused
759  * threads.
760  **/
761 void
762 g_thread_pool_set_max_unused_threads (gint max_threads)
763 {
764   g_return_if_fail (max_threads >= -1);  
765
766   g_atomic_int_set (&max_unused_threads, max_threads);
767
768   if (max_threads != -1)
769     {
770       max_threads -= g_atomic_int_get (&unused_threads);
771       if (max_threads < 0)
772         {
773           g_atomic_int_set (&kill_unused_threads, -max_threads);
774           g_atomic_int_inc (&wakeup_thread_serial);
775
776           g_async_queue_lock (unused_thread_queue);
777
778           do
779             {
780               g_async_queue_push_unlocked (unused_thread_queue,
781                                            wakeup_thread_marker);
782             }
783           while (++max_threads);
784
785           g_async_queue_unlock (unused_thread_queue);
786         }
787     }
788 }
789
790 /**
791  * g_thread_pool_get_max_unused_threads:
792  * 
793  * Returns the maximal allowed number of unused threads.
794  *
795  * Return value: the maximal number of unused threads
796  **/
797 gint
798 g_thread_pool_get_max_unused_threads (void)
799 {
800   return g_atomic_int_get (&max_unused_threads);
801 }
802
803 /**
804  * g_thread_pool_get_num_unused_threads:
805  * 
806  * Returns the number of currently unused threads.
807  *
808  * Return value: the number of currently unused threads
809  **/
810 guint 
811 g_thread_pool_get_num_unused_threads (void)
812 {
813   return g_atomic_int_get (&unused_threads);
814 }
815
816 /**
817  * g_thread_pool_stop_unused_threads:
818  * 
819  * Stops all currently unused threads. This does not change the
820  * maximal number of unused threads. This function can be used to
821  * regularly stop all unused threads e.g. from g_timeout_add().
822  **/
823 void
824 g_thread_pool_stop_unused_threads (void)
825
826   guint oldval;
827
828   oldval = g_thread_pool_get_max_unused_threads ();
829
830   g_thread_pool_set_max_unused_threads (0);
831   g_thread_pool_set_max_unused_threads (oldval);
832 }
833
834 /**
835  * g_thread_pool_set_sort_function:
836  * @pool: a #GThreadPool
837  * @func: the #GCompareDataFunc used to sort the list of tasks. 
838  *     This function is passed two tasks. It should return
839  *     0 if the order in which they are handled does not matter, 
840  *     a negative value if the first task should be processed before
841  *     the second or a positive value if the second task should be 
842  *     processed first.
843  * @user_data: user data passed to @func.
844  *
845  * Sets the function used to sort the list of tasks. This allows the
846  * tasks to be processed by a priority determined by @func, and not
847  * just in the order in which they were added to the pool.
848  *
849  * Note, if the maximum number of threads is more than 1, the order
850  * that threads are executed can not be guranteed 100%. Threads are
851  * scheduled by the operating system and are executed at random. It
852  * cannot be assumed that threads are executed in the order they are
853  * created. 
854  *
855  * Since: 2.10
856  **/
857 void 
858 g_thread_pool_set_sort_function (GThreadPool      *pool,
859                                  GCompareDataFunc  func,
860                                  gpointer          user_data)
861
862   GRealThreadPool *real;
863
864   real = (GRealThreadPool*) pool;
865
866   g_return_if_fail (real);
867   g_return_if_fail (real->running);
868
869   g_async_queue_lock (real->queue);
870
871   real->sort_func = func;
872   real->sort_user_data = user_data;
873   
874   if (func) 
875     g_async_queue_sort_unlocked (real->queue, 
876                                  real->sort_func,
877                                  real->sort_user_data);
878
879   g_async_queue_unlock (real->queue);
880 }
881
882 /**
883  * g_thread_pool_set_max_idle_time:
884  * @interval: the maximum @interval (1/1000ths of a second) a thread
885  *     can be idle. 
886  *
887  * This function will set the maximum @interval that a thread waiting
888  * in the pool for new tasks can be idle for before being
889  * stopped. This function is similar to calling
890  * g_thread_pool_stop_unused_threads() on a regular timeout, except,
891  * this is done on a per thread basis.    
892  *
893  * By setting @interval to 0, idle threads will not be stopped.
894  *  
895  * This function makes use of g_async_queue_timed_pop () using
896  * @interval.
897  *
898  * Since: 2.10
899  **/
900 void
901 g_thread_pool_set_max_idle_time (guint interval)
902
903   guint i;
904
905   g_atomic_int_set (&max_idle_time, interval);
906
907   i = g_atomic_int_get (&unused_threads);
908   if (i > 0)
909     {
910       g_atomic_int_inc (&wakeup_thread_serial);
911       g_async_queue_lock (unused_thread_queue);
912
913       do
914         {
915           g_async_queue_push_unlocked (unused_thread_queue,
916                                        wakeup_thread_marker);
917         }
918       while (--i);
919
920       g_async_queue_unlock (unused_thread_queue);
921     }
922 }
923
924 /**
925  * g_thread_pool_get_max_idle_time:
926  * 
927  * This function will return the maximum @interval that a thread will
928  * wait in the thread pool for new tasks before being stopped.
929  *
930  * If this function returns 0, threads waiting in the thread pool for
931  * new work are not stopped.
932  *
933  * Return value: the maximum @interval to wait for new tasks in the
934  *     thread pool before stopping the thread (1/1000ths of a second).
935  *  
936  * Since: 2.10
937  **/
938 guint
939 g_thread_pool_get_max_idle_time (void)
940
941   return g_atomic_int_get (&max_idle_time);
942 }
943
944 #define __G_THREADPOOL_C__
945 #include "galiasdef.c"