5aa05f7a8501284f79c6c96994a1a4b485a6a31c
[platform/upstream/glib.git] / glib / gthreadpool.c
1 /* GLIB - Library of useful routines for C programming
2  * Copyright (C) 1995-1997  Peter Mattis, Spencer Kimball and Josh MacDonald
3  *
4  * GAsyncQueue: thread pool implementation.
5  * Copyright (C) 2000 Sebastian Wilhelmi; University of Karlsruhe
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Lesser General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20  * Boston, MA 02111-1307, USA.
21  */
22
23 /*
24  * MT safe
25  */
26
27 #include "config.h"
28
29 #include "glib.h"
30 #include "galias.h"
31
32
33 typedef struct _GRealThreadPool GRealThreadPool;
34
35 struct _GRealThreadPool
36 {
37   GThreadPool pool;
38   GAsyncQueue* queue;
39   gint max_threads;
40   gint num_threads;
41   gboolean running;
42   gboolean immediate;
43   gboolean waiting;
44   GCompareDataFunc sort_func;
45   gpointer sort_user_data;
46 };
47
48 /* The following is just an address to mark the stop order for a
49  * thread, it could be any address (as long, as it isn't a valid
50  * GThreadPool address) */
51 static const gpointer stop_this_thread_marker = (gpointer) &g_thread_pool_new;
52
53 /* Here all unused threads are waiting  */
54 static GAsyncQueue *unused_thread_queue;
55 static gint unused_threads = 0;
56 static gint max_unused_threads = 0;
57 G_LOCK_DEFINE_STATIC (unused_threads);
58
59 static GMutex *inform_mutex = NULL;
60 static GCond *inform_cond = NULL;
61
62
63 static void     g_thread_pool_queue_push_unlocked (GRealThreadPool  *pool,
64                                                    gpointer          data);
65 static void     g_thread_pool_free_internal       (GRealThreadPool  *pool);
66 static gpointer g_thread_pool_thread_proxy        (gpointer          data);
67 static void     g_thread_pool_start_thread        (GRealThreadPool  *pool,
68                                                    GError          **error);
69 static void     g_thread_pool_wakeup_and_stop_all (GRealThreadPool  *pool);
70
71
72 #define g_thread_should_run(pool, len) \
73   ((pool)->running || (!(pool)->immediate && (len) > 0))
74
75
76 static void
77 g_thread_pool_queue_push_unlocked (GRealThreadPool *pool,
78                                    gpointer         data)
79 {
80   if (pool->sort_func) 
81     g_async_queue_push_sorted_unlocked (pool->queue, 
82                                         data,
83                                         pool->sort_func, 
84                                         pool->sort_user_data);
85   else
86     g_async_queue_push_unlocked (pool->queue, data);
87 }
88
89 static gpointer 
90 g_thread_pool_thread_proxy (gpointer data)
91 {
92   GRealThreadPool *pool = data;
93   gboolean watcher = FALSE;
94
95   g_async_queue_lock (pool->queue);
96   while (TRUE)
97     {
98       gpointer task; 
99       gboolean goto_global_pool = !pool->pool.exclusive;
100       gint len = g_async_queue_length_unlocked (pool->queue);
101       
102       if (g_thread_should_run (pool, len))
103         {
104           if (watcher)
105             {
106               /* This thread is actually not needed here, but it waits
107                * for some time anyway. If during that time a new
108                * request arrives, this saves process
109                * swicthes. Otherwise the thread will go to the global
110                * pool afterwards */
111               GTimeVal end_time;
112               g_get_current_time (&end_time);
113               g_time_val_add (&end_time, G_USEC_PER_SEC / 2); /* 1/2 second */
114               task = g_async_queue_timed_pop_unlocked (pool->queue, &end_time);
115             }
116           else
117             task = g_async_queue_pop_unlocked (pool->queue);
118           
119           if (task)
120             {
121               watcher = FALSE;
122               if (pool->num_threads > pool->max_threads && 
123                   pool->max_threads != -1)
124                 /* We are in fact a superfluous threads, so we go to
125                  * the global pool and just hand the data further to
126                  * the next one waiting in the queue */
127                 {
128                   g_thread_pool_queue_push_unlocked (pool, task);
129                   goto_global_pool = TRUE;
130                 }
131               else if (pool->running || !pool->immediate)
132                 {
133                   g_async_queue_unlock (pool->queue);
134                   pool->pool.func (task, pool->pool.user_data);
135                   g_async_queue_lock (pool->queue);
136                 }
137             }
138           len = g_async_queue_length_unlocked (pool->queue);
139         }
140
141       if (!g_thread_should_run (pool, len))
142         {
143           g_cond_broadcast (inform_cond);
144           goto_global_pool = TRUE;
145         }
146       else if (len > 0)
147         {
148           /* At this pool there are no threads waiting, but tasks are. */
149           goto_global_pool = FALSE; 
150         }
151       else if (len == 0 && !watcher && !pool->pool.exclusive)
152         {
153           /* Here neither threads nor tasks are queued and we didn't
154            * just return from a timed wait. We now wait for a limited
155            * time at this pool for new tasks to avoid costly context
156            * switches. */
157           goto_global_pool = FALSE;
158           watcher = TRUE;
159         }
160
161       if (goto_global_pool)
162         {
163           pool->num_threads--;
164
165           if (!pool->running && !pool->waiting)
166             {
167               if (pool->num_threads == 0)
168                 {
169                   g_async_queue_unlock (pool->queue);
170                   g_thread_pool_free_internal (pool);
171                 }               
172               else 
173                 {
174                   if (len == - pool->num_threads)
175                     g_thread_pool_wakeup_and_stop_all (pool);
176
177                   g_async_queue_unlock (pool->queue);
178                 }
179             }
180           else
181             g_async_queue_unlock (pool->queue);
182           
183           g_async_queue_lock (unused_thread_queue);
184
185           G_LOCK (unused_threads);
186           if ((unused_threads >= max_unused_threads && 
187                max_unused_threads != -1))
188             {
189               G_UNLOCK (unused_threads);
190               g_async_queue_unlock (unused_thread_queue);
191               /* Stop this thread */
192               return NULL;      
193             }
194           unused_threads++;
195           G_UNLOCK (unused_threads);
196
197           pool = g_async_queue_pop_unlocked (unused_thread_queue);
198
199           G_LOCK (unused_threads);
200           unused_threads--;
201           G_UNLOCK (unused_threads);
202
203           g_async_queue_unlock (unused_thread_queue);
204           
205           if (pool == stop_this_thread_marker)
206             /* Stop this thread */
207             return NULL;
208           
209           g_async_queue_lock (pool->queue);
210
211           /* pool->num_threads++ is not done here, but in
212            * g_thread_pool_start_thread to make the new started thread
213            * known to the pool, before itself can do it. */
214         }
215     }
216   return NULL;
217 }
218
219 static void
220 g_thread_pool_start_thread (GRealThreadPool  *pool, 
221                             GError          **error)
222 {
223   gboolean success = FALSE;
224   
225   if (pool->num_threads >= pool->max_threads && pool->max_threads != -1)
226     /* Enough threads are already running */
227     return;
228
229   g_async_queue_lock (unused_thread_queue);
230
231   if (g_async_queue_length_unlocked (unused_thread_queue) < 0)
232     {
233       g_async_queue_push_unlocked (unused_thread_queue, pool);
234       success = TRUE;
235     }
236
237   g_async_queue_unlock (unused_thread_queue);
238
239   if (!success)
240     {
241       GError *local_error = NULL;
242       /* No thread was found, we have to start a new one */
243       g_thread_create (g_thread_pool_thread_proxy, pool, FALSE, &local_error);
244       
245       if (local_error)
246         {
247           g_propagate_error (error, local_error);
248           return;
249         }
250     }
251
252   /* See comment in g_thread_pool_thread_proxy as to why this is done
253    * here and not there */
254   pool->num_threads++;
255 }
256
257 /**
258  * g_thread_pool_new: 
259  * @func: a function to execute in the threads of the new thread pool
260  * @user_data: user data that is handed over to @func every time it 
261  *   is called
262  * @max_threads: the maximal number of threads to execute concurrently in 
263  *   the new thread pool, -1 means no limit
264  * @exclusive: should this thread pool be exclusive?
265  * @error: return location for error
266  *
267  * This function creates a new thread pool.
268  *
269  * Whenever you call g_thread_pool_push(), either a new thread is
270  * created or an unused one is reused. At most @max_threads threads
271  * are running concurrently for this thread pool. @max_threads = -1
272  * allows unlimited threads to be created for this thread pool. The
273  * newly created or reused thread now executes the function @func with
274  * the two arguments. The first one is the parameter to
275  * g_thread_pool_push() and the second one is @user_data.
276  *
277  * The parameter @exclusive determines, whether the thread pool owns
278  * all threads exclusive or whether the threads are shared
279  * globally. If @exclusive is %TRUE, @max_threads threads are started
280  * immediately and they will run exclusively for this thread pool until
281  * it is destroyed by g_thread_pool_free(). If @exclusive is %FALSE,
282  * threads are created, when needed and shared between all
283  * non-exclusive thread pools. This implies that @max_threads may not
284  * be -1 for exclusive thread pools.
285  *
286  * @error can be %NULL to ignore errors, or non-%NULL to report
287  * errors. An error can only occur when @exclusive is set to %TRUE and
288  * not all @max_threads threads could be created.
289  *
290  * Return value: the new #GThreadPool
291  **/
292 GThreadPool* 
293 g_thread_pool_new (GFunc            func,
294                    gpointer         user_data,
295                    gint             max_threads,
296                    gboolean         exclusive,
297                    GError         **error)
298 {
299   GRealThreadPool *retval;
300   G_LOCK_DEFINE_STATIC (init);
301
302   g_return_val_if_fail (func, NULL);
303   g_return_val_if_fail (!exclusive || max_threads != -1, NULL);
304   g_return_val_if_fail (max_threads >= -1, NULL);
305   g_return_val_if_fail (g_thread_supported (), NULL);
306
307   retval = g_new (GRealThreadPool, 1);
308
309   retval->pool.func = func;
310   retval->pool.user_data = user_data;
311   retval->pool.exclusive = exclusive;
312   retval->queue = g_async_queue_new ();
313   retval->max_threads = max_threads;
314   retval->num_threads = 0;
315   retval->running = TRUE;
316   retval->sort_func = NULL;
317   retval->sort_user_data = NULL;
318
319   G_LOCK (init);
320   
321   if (!inform_mutex)
322     {
323       inform_mutex = g_mutex_new ();
324       inform_cond = g_cond_new ();
325       unused_thread_queue = g_async_queue_new ();
326     }
327
328   G_UNLOCK (init);
329
330   if (retval->pool.exclusive)
331     {
332       g_async_queue_lock (retval->queue);
333   
334       while (retval->num_threads < retval->max_threads)
335         {
336           GError *local_error = NULL;
337           g_thread_pool_start_thread (retval, &local_error);
338           if (local_error)
339             {
340               g_propagate_error (error, local_error);
341               break;
342             }
343         }
344
345       g_async_queue_unlock (retval->queue);
346     }
347
348   return (GThreadPool*) retval;
349 }
350
351 /**
352  * g_thread_pool_push:
353  * @pool: a #GThreadPool
354  * @data: a new task for @pool
355  * @error: return location for error
356  * 
357  * Inserts @data into the list of tasks to be executed by @pool. When
358  * the number of currently running threads is lower than the maximal
359  * allowed number of threads, a new thread is started (or reused) with
360  * the properties given to g_thread_pool_new (). Otherwise @data stays
361  * in the queue until a thread in this pool finishes its previous task
362  * and processes @data. 
363  *
364  * @error can be %NULL to ignore errors, or non-%NULL to report
365  * errors. An error can only occur when a new thread couldn't be
366  * created. In that case @data is simply appended to the queue of work
367  * to do.  
368  **/
369 void 
370 g_thread_pool_push (GThreadPool     *pool,
371                     gpointer         data,
372                     GError         **error)
373 {
374   GRealThreadPool *real = (GRealThreadPool*) pool;
375
376   g_return_if_fail (real);
377
378   g_async_queue_lock (real->queue);
379   
380   if (!real->running)
381     {
382       g_async_queue_unlock (real->queue);
383       g_return_if_fail (real->running);
384     }
385
386   if (g_async_queue_length_unlocked (real->queue) >= 0)
387     /* No thread is waiting in the queue */
388     g_thread_pool_start_thread (real, error);
389
390   g_thread_pool_queue_push_unlocked (real, data);
391   g_async_queue_unlock (real->queue);
392 }
393
394 /**
395  * g_thread_pool_set_max_threads:
396  * @pool: a #GThreadPool
397  * @max_threads: a new maximal number of threads for @pool
398  * @error: return location for error
399  * 
400  * Sets the maximal allowed number of threads for @pool. A value of -1
401  * means, that the maximal number of threads is unlimited.
402  *
403  * Setting @max_threads to 0 means stopping all work for @pool. It is
404  * effectively frozen until @max_threads is set to a non-zero value
405  * again.
406  * 
407  * A thread is never terminated while calling @func, as supplied by
408  * g_thread_pool_new (). Instead the maximal number of threads only
409  * has effect for the allocation of new threads in g_thread_pool_push(). 
410  * A new thread is allocated, whenever the number of currently
411  * running threads in @pool is smaller than the maximal number.
412  *
413  * @error can be %NULL to ignore errors, or non-%NULL to report
414  * errors. An error can only occur when a new thread couldn't be
415  * created. 
416  **/
417 void
418 g_thread_pool_set_max_threads (GThreadPool     *pool,
419                                gint             max_threads,
420                                GError         **error)
421 {
422   GRealThreadPool *real = (GRealThreadPool*) pool;
423   gint to_start;
424
425   g_return_if_fail (real);
426   g_return_if_fail (real->running);
427   g_return_if_fail (!real->pool.exclusive || max_threads != -1);
428   g_return_if_fail (max_threads >= -1);
429
430   g_async_queue_lock (real->queue);
431
432   real->max_threads = max_threads;
433   
434   if (pool->exclusive)
435     to_start = real->max_threads - real->num_threads;
436   else
437     to_start = g_async_queue_length_unlocked (real->queue);
438   
439   for ( ; to_start > 0; to_start--)
440     {
441       GError *local_error = NULL;
442       g_thread_pool_start_thread (real, &local_error);
443       if (local_error)
444         {
445           g_propagate_error (error, local_error);
446           break;
447         }
448     }
449    
450   g_async_queue_unlock (real->queue);
451 }
452
453 /**
454  * g_thread_pool_get_max_threads:
455  * @pool: a #GThreadPool
456  *
457  * Returns the maximal number of threads for @pool.
458  *
459  * Return value: the maximal number of threads
460  **/
461 gint
462 g_thread_pool_get_max_threads (GThreadPool     *pool)
463 {
464   GRealThreadPool *real = (GRealThreadPool*) pool;
465   gint retval;
466
467   g_return_val_if_fail (real, 0);
468   g_return_val_if_fail (real->running, 0);
469
470   g_async_queue_lock (real->queue);
471
472   retval = real->max_threads;
473     
474   g_async_queue_unlock (real->queue);
475
476   return retval;
477 }
478
479 /**
480  * g_thread_pool_get_num_threads:
481  * @pool: a #GThreadPool
482  *
483  * Returns the number of threads currently running in @pool.
484  *
485  * Return value: the number of threads currently running
486  **/
487 guint
488 g_thread_pool_get_num_threads (GThreadPool     *pool)
489 {
490   GRealThreadPool *real = (GRealThreadPool*) pool;
491   guint retval;
492
493   g_return_val_if_fail (real, 0);
494   g_return_val_if_fail (real->running, 0);
495
496   g_async_queue_lock (real->queue);
497
498   retval = real->num_threads;
499     
500   g_async_queue_unlock (real->queue);
501
502   return retval;
503 }
504
505 /**
506  * g_thread_pool_unprocessed:
507  * @pool: a #GThreadPool
508  *
509  * Returns the number of tasks still unprocessed in @pool.
510  *
511  * Return value: the number of unprocessed tasks
512  **/
513 guint
514 g_thread_pool_unprocessed (GThreadPool     *pool)
515 {
516   GRealThreadPool *real = (GRealThreadPool*) pool;
517   gint unprocessed;
518
519   g_return_val_if_fail (real, 0);
520   g_return_val_if_fail (real->running, 0);
521
522   unprocessed = g_async_queue_length (real->queue);
523
524   return MAX (unprocessed, 0);
525 }
526
527 /**
528  * g_thread_pool_free:
529  * @pool: a #GThreadPool
530  * @immediate: should @pool shut down immediately?
531  * @wait: should the function wait for all tasks to be finished?
532  *
533  * Frees all resources allocated for @pool.
534  *
535  * If @immediate is %TRUE, no new task is processed for
536  * @pool. Otherwise @pool is not freed before the last task is
537  * processed. Note however, that no thread of this pool is
538  * interrupted, while processing a task. Instead at least all still
539  * running threads can finish their tasks before the @pool is freed.
540  *
541  * If @wait is %TRUE, the functions does not return before all tasks
542  * to be processed (dependent on @immediate, whether all or only the
543  * currently running) are ready. Otherwise the function returns immediately.
544  *
545  * After calling this function @pool must not be used anymore. 
546  **/
547 void
548 g_thread_pool_free (GThreadPool     *pool,
549                     gboolean         immediate,
550                     gboolean         wait)
551 {
552   GRealThreadPool *real = (GRealThreadPool*) pool;
553
554   g_return_if_fail (real);
555   g_return_if_fail (real->running);
556   /* It there's no thread allowed here, there is not much sense in
557    * not stopping this pool immediately, when it's not empty */
558   g_return_if_fail (immediate || real->max_threads != 0 || 
559                     g_async_queue_length (real->queue) == 0);
560
561   g_async_queue_lock (real->queue);
562
563   real->running = FALSE;
564   real->immediate = immediate;
565   real->waiting = wait;
566
567   if (wait)
568     {
569       g_mutex_lock (inform_mutex);
570       while (g_async_queue_length_unlocked (real->queue) != -real->num_threads &&
571              !(immediate && real->num_threads == 0))
572         {
573           g_async_queue_unlock (real->queue); 
574           g_cond_wait (inform_cond, inform_mutex); 
575           g_async_queue_lock (real->queue); 
576         }
577       g_mutex_unlock (inform_mutex); 
578     }
579
580   if (immediate ||
581       g_async_queue_length_unlocked (real->queue) == -real->num_threads)
582     {
583       /* No thread is currently doing something (and nothing is left
584        * to process in the queue) */
585       if (real->num_threads == 0) /* No threads left, we clean up */
586         {
587           g_async_queue_unlock (real->queue);
588           g_thread_pool_free_internal (real);
589           return;
590         }
591
592       g_thread_pool_wakeup_and_stop_all (real);
593     }
594   
595   real->waiting = FALSE; /* The last thread should cleanup the pool */
596   g_async_queue_unlock (real->queue);
597 }
598
599 static void
600 g_thread_pool_free_internal (GRealThreadPool* pool)
601 {
602   g_return_if_fail (pool);
603   g_return_if_fail (!pool->running);
604   g_return_if_fail (pool->num_threads == 0);
605
606   g_async_queue_unref (pool->queue);
607
608   g_free (pool);
609 }
610
611 static void
612 g_thread_pool_wakeup_and_stop_all (GRealThreadPool* pool)
613 {
614   guint i;
615   
616   g_return_if_fail (pool);
617   g_return_if_fail (!pool->running);
618   g_return_if_fail (pool->num_threads != 0);
619   g_return_if_fail (g_async_queue_length_unlocked (pool->queue) == 
620                     -pool->num_threads);
621
622   pool->immediate = TRUE; 
623   for (i = 0; i < pool->num_threads; i++)
624     g_thread_pool_queue_push_unlocked (pool, GUINT_TO_POINTER (1));
625 }
626
627 /**
628  * g_thread_pool_set_max_unused_threads:
629  * @max_threads: maximal number of unused threads
630  *
631  * Sets the maximal number of unused threads to @max_threads. If
632  * @max_threads is -1, no limit is imposed on the number of unused
633  * threads.
634  **/
635 void
636 g_thread_pool_set_max_unused_threads (gint max_threads)
637 {
638   g_return_if_fail (max_threads >= -1);  
639
640   G_LOCK (unused_threads);
641   
642   max_unused_threads = max_threads;
643
644   if (max_unused_threads < unused_threads && max_unused_threads != -1)
645     {
646       guint i;
647
648       g_async_queue_lock (unused_thread_queue);
649       for (i = unused_threads - max_unused_threads; i > 0; i--)
650         g_async_queue_push_unlocked (unused_thread_queue, 
651                                      stop_this_thread_marker);
652       g_async_queue_unlock (unused_thread_queue);
653     }
654     
655   G_UNLOCK (unused_threads);
656 }
657
658 /**
659  * g_thread_pool_get_max_unused_threads:
660  * 
661  * Returns the maximal allowed number of unused threads.
662  *
663  * Return value: the maximal number of unused threads
664  **/
665 gint
666 g_thread_pool_get_max_unused_threads (void)
667 {
668   gint retval;
669   
670   G_LOCK (unused_threads);
671   retval = max_unused_threads;
672   G_UNLOCK (unused_threads);
673
674   return retval;
675 }
676
677 /**
678  * g_thread_pool_get_num_unused_threads:
679  * 
680  * Returns the number of currently unused threads.
681  *
682  * Return value: the number of currently unused threads
683  **/
684 guint g_thread_pool_get_num_unused_threads (void)
685 {
686   guint retval;
687   
688   G_LOCK (unused_threads);
689   retval = unused_threads;
690   G_UNLOCK (unused_threads);
691
692   return retval;
693 }
694
695 /**
696  * g_thread_pool_stop_unused_threads:
697  * 
698  * Stops all currently unused threads. This does not change the
699  * maximal number of unused threads. This function can be used to
700  * regularly stop all unused threads e.g. from g_timeout_add().
701  **/
702 void g_thread_pool_stop_unused_threads (void)
703
704   guint oldval = g_thread_pool_get_max_unused_threads ();
705   g_thread_pool_set_max_unused_threads (0);
706   g_thread_pool_set_max_unused_threads (oldval);
707 }
708
709 /**
710  * g_thread_pool_set_sort_function:
711  * @pool: a #GThreadPool
712  * @func: the #GCompareDataFunc used to sort the list of tasks. 
713  *     This function is passed two tasks. It should return
714  *     0 if the order in which they are handled does not matter, 
715  *     a negative value if the first task should be processed before
716  *     the second or a positive value if the second task should be 
717  *     processed first.
718  * @user_data: user data passed to @func.
719  *
720  * Sets the function used to sort the list of tasks. This allows the
721  * tasks to be processed by a priority determined by @func, and not
722  * just in the order in which they were added to the pool.
723  *
724  * Since: 2.10
725  **/
726 void g_thread_pool_set_sort_function (GThreadPool      *pool,
727                                       GCompareDataFunc  func,
728                                       gpointer          user_data)
729
730   GRealThreadPool *real = (GRealThreadPool*) pool;
731
732   g_return_if_fail (real);
733   g_return_if_fail (real->running);
734
735   g_async_queue_lock (real->queue);
736
737   real->sort_func = func;
738   real->sort_user_data = user_data;
739   
740   if (func) 
741     g_async_queue_sort_unlocked (real->queue, 
742                                  real->sort_func,
743                                  real->sort_user_data);
744
745   g_async_queue_unlock (real->queue);
746 }
747
748 #define __G_THREADPOOL_C__
749 #include "galiasdef.c"