af62d89b2bfc254f0bcc248394b5aa90d2be64b5
[platform/upstream/glib.git] / glib / gthreadpool.c
1 /* GLIB - Library of useful routines for C programming
2  * Copyright (C) 1995-1997  Peter Mattis, Spencer Kimball and Josh MacDonald
3  *
4  * GAsyncQueue: thread pool implementation.
5  * Copyright (C) 2000 Sebastian Wilhelmi; University of Karlsruhe
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Lesser General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20  * Boston, MA 02111-1307, USA.
21  */
22
23 /*
24  * MT safe
25  */
26
27 #include "glib.h"
28
29 typedef struct _GRealThreadPool GRealThreadPool;
30
31 struct _GRealThreadPool
32 {
33   GThreadPool pool;
34   GAsyncQueue* queue;
35   gint max_threads;
36   gint num_threads;
37   gboolean running;
38   gboolean immediate;
39   gboolean waiting;
40 };
41
42 /* The following is just an address to mark the stop order for a
43  * thread, it could be any address (as long, as it isn't a valid
44  * GThreadPool address) */
45 static const gpointer stop_this_thread_marker = (gpointer) &g_thread_pool_new;
46
47 /* Here all unused threads are waiting, depending on their priority */
48 static GAsyncQueue *unused_thread_queue[G_THREAD_PRIORITY_URGENT + 1][2];
49 static gint unused_threads = 0;
50 static gint max_unused_threads = 0;
51 G_LOCK_DEFINE_STATIC (unused_threads);
52
53 static GMutex *inform_mutex = NULL;
54 static GCond *inform_cond = NULL;
55
56 static void g_thread_pool_free_internal (GRealThreadPool* pool);
57 static void g_thread_pool_thread_proxy (gpointer data);
58 static void g_thread_pool_start_thread (GRealThreadPool* pool, GError **error);
59 static void g_thread_pool_wakeup_and_stop_all (GRealThreadPool* pool);
60
61 #define g_thread_should_run(pool, len) \
62   ((pool)->running || (!(pool)->immediate && (len) > 0))
63
64 static void 
65 g_thread_pool_thread_proxy (gpointer data)
66 {
67   GRealThreadPool *pool = data;
68   gboolean watcher = FALSE;
69
70   g_async_queue_lock (pool->queue);
71   while (TRUE)
72     {
73       gpointer task; 
74       gboolean goto_global_pool = 
75         !pool->pool.exclusive && pool->pool.stack_size == 0;
76       gint len = g_async_queue_length_unlocked (pool->queue);
77       
78       if (g_thread_should_run (pool, len))
79         {
80           if (watcher)
81             {
82               /* This thread is actually not needed here, but it waits
83                * for some time anyway. If during that time a new
84                * request arrives, this saves process
85                * swicthes. Otherwise the thread will go to the global
86                * pool afterwards */
87               GTimeVal end_time;
88               g_get_current_time (&end_time);
89               end_time.tv_usec += G_USEC_PER_SEC / 2; /* Halv a second */
90               if (end_time.tv_usec >= G_USEC_PER_SEC)
91                 {
92                   end_time.tv_usec -= G_USEC_PER_SEC;
93                   end_time.tv_sec += 1;
94                 }
95          
96               task = g_async_queue_timed_pop_unlocked (pool->queue, &end_time);
97             }
98           else
99             {
100               task = g_async_queue_pop_unlocked (pool->queue);
101             }
102
103           if (task)
104             {
105               watcher = FALSE;
106               if (pool->num_threads > pool->max_threads && 
107                   pool->max_threads != -1)
108                 /* We are in fact a superfluous threads, so we go to
109                  * the global pool and just hand the data further to
110                  * the next one waiting in the queue */
111                 {
112                   g_async_queue_push_unlocked (pool->queue, task);
113                   goto_global_pool = TRUE;
114                 }
115               else if (pool->running || !pool->immediate)
116                 {
117                   g_async_queue_unlock (pool->queue);
118                   pool->pool.thread_func (task, pool->pool.user_data);
119                   g_async_queue_lock (pool->queue);
120                 }
121             }
122           len = g_async_queue_length_unlocked (pool->queue);
123         }
124
125       if (!g_thread_should_run (pool, len))
126         {
127           g_cond_broadcast (inform_cond);
128           goto_global_pool = TRUE;
129         }
130       else if (len > 0)
131         {
132           /* At this pool there are no threads waiting, but tasks are. */
133           goto_global_pool = FALSE; 
134         }
135       else if (len == 0 && !watcher && !pool->pool.exclusive)
136         {
137           /* Here neither threads nor tasks are queued and we didn't
138            * just return from a timed wait. We now wait for a limited
139            * time at this pool for new tasks to avoid costly context
140            * switches. */
141           goto_global_pool = FALSE;
142           watcher = TRUE;
143         }
144
145       
146       if (goto_global_pool)
147         {
148           GAsyncQueue *unused_queue = 
149             unused_thread_queue[pool->pool.priority][pool->pool.bound ? 1 : 0];
150           pool->num_threads--; 
151
152           if (!pool->running && !pool->waiting)
153             {
154               if (pool->num_threads == 0)
155                 {
156                   g_async_queue_unlock (pool->queue);
157                   g_thread_pool_free_internal (pool);
158                 }               
159               else if (len == - pool->num_threads)
160                 {
161                   g_thread_pool_wakeup_and_stop_all (pool);
162                   g_async_queue_unlock (pool->queue);
163                 }
164             }
165           else
166             g_async_queue_unlock (pool->queue);
167           
168           g_async_queue_lock (unused_queue);
169
170           G_LOCK (unused_threads);
171           if ((unused_threads >= max_unused_threads && 
172                max_unused_threads != -1) || pool->pool.stack_size != 0)
173             {
174               G_UNLOCK (unused_threads);
175               g_async_queue_unlock (unused_queue);
176               /* Stop this thread */
177               return;      
178             }
179           unused_threads++;
180           G_UNLOCK (unused_threads);
181
182           pool = g_async_queue_pop_unlocked (unused_queue);
183
184           G_LOCK (unused_threads);
185           unused_threads--;
186           G_UNLOCK (unused_threads);
187
188           g_async_queue_unlock (unused_queue);
189           
190           if (pool == stop_this_thread_marker)
191             /* Stop this thread */
192             return;
193           
194           g_async_queue_lock (pool->queue);
195
196           /* pool->num_threads++ is not done here, but in
197            * g_thread_pool_start_thread to make the new started thread
198            * known to the pool, before itself can do it. */
199         }
200     }
201 }
202
203 static void
204 g_thread_pool_start_thread (GRealThreadPool  *pool, 
205                             GError          **error)
206 {
207   gboolean success = FALSE;
208   GThreadPriority priority = pool->pool.priority;
209   guint bound = pool->pool.bound ? 1 : 0;
210   GAsyncQueue *queue = unused_thread_queue[priority][bound];
211   
212   if (pool->num_threads >= pool->max_threads && pool->max_threads != -1)
213     /* Enough threads are already running */
214     return;
215
216   g_async_queue_lock (queue);
217
218   if (g_async_queue_length_unlocked (queue) < 0)
219     {
220       /* First we try a thread with the right priority */
221       g_async_queue_push_unlocked (queue, pool);
222       success = TRUE;
223     }
224
225   g_async_queue_unlock (queue);
226
227   /* We will not search for threads with other priorities, because changing
228    * priority is quite unportable */
229   
230   if (!success)
231     {
232       GError *local_error = NULL;
233       /* No thread was found, we have to start a new one */
234       g_thread_create (g_thread_pool_thread_proxy, pool, 
235                        pool->pool.stack_size, FALSE, 
236                        bound, priority, &local_error);
237       
238       if (local_error)
239         {
240           g_propagate_error (error, local_error);
241           return;
242         }
243     }
244
245   /* See comment in g_thread_pool_thread_proxy as to why this is done
246    * here and not there */
247   pool->num_threads++;
248 }
249
250 /**
251  * g_thread_pool_new: 
252  * @thread_func: a function to execute in the threads of the new thread pool
253  * @max_threads: the maximal number of threads to execute concurrently in 
254  *   the new thread pool, -1 means no limit
255  * @stack_size: the stack size for the threads of the new thread pool,
256  *   0 means using the standard
257  * @bound: should the threads of the new thread pool be bound?
258  * @priority: a priority for the threads of the new thread pool
259  * @exclusive: should this thread pool be exclusive?
260  * @user_data: user data that is handed over to @thread_func every time it 
261  *   is called
262  * @error: return location for error
263  *
264  * This function creates a new thread pool. All threads created within
265  * this thread pool will have the priority @priority and the stack
266  * size @stack_size and will be bound if and only if @bound is
267  * true. 
268  *
269  * Whenever you call g_thread_pool_push(), either a new thread is
270  * created or an unused one is reused. At most @max_threads threads
271  * are running concurrently for this thread pool. @max_threads = -1
272  * allows unlimited threads to be created for this thread pool. The
273  * newly created or reused thread now executes the function
274  * @thread_func with the two arguments. The first one is the parameter
275  * to g_thread_pool_push() and the second one is @user_data.
276  *
277  * The parameter @exclusive determines, whether the thread pool owns
278  * all threads exclusive or whether the threads are shared
279  * globally. If @exclusive is @TRUE, @max_threads threads are started
280  * immediately and they will run exclusively for this thread pool until
281  * it is destroyed by g_thread_pool_free(). If @exclusive is @FALSE,
282  * threads are created, when needed and shared between all
283  * non-exclusive thread pools. This implies that @max_threads may not
284  * be -1 for exclusive thread pools.
285  *
286  * Note, that only threads from a thread pool with a @stack_size of 0
287  * (which means using the standard stack size) will be globally
288  * reused. Threads from a thread pool with a non-zero stack size will
289  * stay only in this thread pool until it is freed and can thus not be
290  * controlled by the g_thread_pool_set_unused_threads() function.
291  *
292  * @error can be NULL to ignore errors, or non-NULL to report
293  * errors. An error can only occur, when @exclusive is set to @TRUE and
294  * not all @max_threads threads could be created.
295  *
296  * Return value: the new #GThreadPool
297  **/
298 GThreadPool* 
299 g_thread_pool_new (GFunc            thread_func,
300                    gint             max_threads,
301                    gulong           stack_size,
302                    gboolean         bound,
303                    GThreadPriority  priority,
304                    gboolean         exclusive,
305                    gpointer         user_data,
306                    GError         **error)
307 {
308   GRealThreadPool *retval;
309   G_LOCK_DEFINE_STATIC (init);
310
311   g_return_val_if_fail (thread_func, NULL);
312   g_return_val_if_fail (!exclusive || max_threads != -1, NULL);
313   g_return_val_if_fail (max_threads >= -1, NULL);
314   g_return_val_if_fail (g_thread_supported (), NULL);
315
316   retval = g_new (GRealThreadPool, 1);
317
318   retval->pool.thread_func = thread_func;
319   retval->pool.stack_size = stack_size;
320   retval->pool.bound = bound;
321   retval->pool.priority = priority;
322   retval->pool.exclusive = exclusive;
323   retval->pool.user_data = user_data;
324   retval->queue = g_async_queue_new ();
325   retval->max_threads = max_threads;
326   retval->num_threads = 0;
327   retval->running = TRUE;
328
329   G_LOCK (init);
330   
331   if (!inform_mutex)
332     {
333       inform_mutex = g_mutex_new ();
334       inform_cond = g_cond_new ();
335       for (priority = G_THREAD_PRIORITY_LOW; 
336            priority < G_THREAD_PRIORITY_URGENT + 1; priority++)
337         {
338           unused_thread_queue[priority][0] = g_async_queue_new ();
339           unused_thread_queue[priority][1] = g_async_queue_new ();
340         }
341     }
342
343   G_UNLOCK (init);
344
345   if (retval->pool.exclusive)
346     {
347       g_async_queue_lock (retval->queue);
348   
349       while (retval->num_threads < retval->max_threads)
350         {
351           GError *local_error = NULL;
352           g_thread_pool_start_thread (retval, &local_error);
353           if (local_error)
354             {
355               g_propagate_error (error, local_error);
356               break;
357             }
358         }
359
360       g_async_queue_unlock (retval->queue);
361     }
362
363   return (GThreadPool*) retval;
364 }
365
366 /**
367  * g_thread_pool_push:
368  * @pool: a #GThreadPool
369  * @data: a new task for @pool
370  * @error: return location for error
371  * 
372  * Inserts @data into the list of tasks to be executed by @pool. When
373  * the number of currently running threads is lower than the maximal
374  * allowed number of threads, a new thread is started (or reused) with
375  * the properties given to g_thread_pool_new (). Otherwise @data stays
376  * in the queue until a thread in this pool finishes its previous task
377  * and processes @data. 
378  *
379  * @error can be NULL to ignore errors, or non-NULL to report
380  * errors. An error can only occur, when a new thread couldn't be
381  * created. In that case @data is simply appended to the queue of work
382  * to do.  
383  **/
384 void 
385 g_thread_pool_push (GThreadPool     *pool,
386                     gpointer         data,
387                     GError         **error)
388 {
389   GRealThreadPool *real = (GRealThreadPool*) pool;
390
391   g_return_if_fail (real);
392
393   g_async_queue_lock (real->queue);
394   
395   if (!real->running)
396     {
397       g_async_queue_unlock (real->queue);
398       g_return_if_fail (real->running);
399     }
400
401   if (g_async_queue_length_unlocked (real->queue) >= 0)
402     /* No thread is waiting in the queue */
403     g_thread_pool_start_thread (real, error);
404
405   g_async_queue_push_unlocked (real->queue, data);
406   g_async_queue_unlock (real->queue);
407 }
408
409 /**
410  * g_thread_pool_set_max_threads:
411  * @pool: a #GThreadPool
412  * @max_threads: a new maximal number of threads for @pool
413  * @error: return location for error
414  * 
415  * Sets the maximal allowed number of threads for @pool. A value of -1
416  * means, that the maximal number of threads is unlimited.
417  *
418  * Setting @max_threads to 0 means stopping all work for @pool. It is
419  * effectively frozen until @max_threads is set to a non-zero value
420  * again.
421  * 
422  * A thread is never terminated while calling @thread_func, as
423  * supplied by g_thread_pool_new (). Instead the maximal number of
424  * threads only has effect for the allocation of new threads in
425  * g_thread_pool_push (). A new thread is allocated, whenever the
426  * number of currently running threads in @pool is smaller than the
427  * maximal number.
428  *
429  * @error can be NULL to ignore errors, or non-NULL to report
430  * errors. An error can only occur, when a new thread couldn't be
431  * created. 
432  **/
433 void
434 g_thread_pool_set_max_threads (GThreadPool     *pool,
435                                gint             max_threads,
436                                GError         **error)
437 {
438   GRealThreadPool *real = (GRealThreadPool*) pool;
439   gint to_start;
440
441   g_return_if_fail (real);
442   g_return_if_fail (real->running);
443   g_return_if_fail (!real->pool.exclusive || max_threads != -1);
444   g_return_if_fail (max_threads >= -1);
445
446   g_async_queue_lock (real->queue);
447
448   real->max_threads = max_threads;
449   
450   if (pool->exclusive)
451     to_start = real->max_threads - real->num_threads;
452   else
453     to_start = g_async_queue_length_unlocked (real->queue);
454   
455   for ( ; to_start > 0; to_start--)
456     {
457       GError *local_error = NULL;
458       g_thread_pool_start_thread (real, &local_error);
459       if (local_error)
460         {
461           g_propagate_error (error, local_error);
462           break;
463         }
464     }
465    
466   g_async_queue_unlock (real->queue);
467 }
468
469 /**
470  * g_thread_pool_get_max_threads:
471  * @pool: a #GThreadPool
472  *
473  * Returns the maximal number of threads for @pool.
474  *
475  * Return value: the maximal number of threads
476  **/
477 gint
478 g_thread_pool_get_max_threads (GThreadPool     *pool)
479 {
480   GRealThreadPool *real = (GRealThreadPool*) pool;
481   gint retval;
482
483   g_return_val_if_fail (real, 0);
484   g_return_val_if_fail (real->running, 0);
485
486   g_async_queue_lock (real->queue);
487
488   retval = real->max_threads;
489     
490   g_async_queue_unlock (real->queue);
491
492   return retval;
493 }
494
495 /**
496  * g_thread_pool_get_num_threads:
497  * @pool: a #GThreadPool
498  *
499  * Returns the number of threads currently running in @pool.
500  *
501  * Return value: the number of threads currently running
502  **/
503 guint
504 g_thread_pool_get_num_threads (GThreadPool     *pool)
505 {
506   GRealThreadPool *real = (GRealThreadPool*) pool;
507   guint retval;
508
509   g_return_val_if_fail (real, 0);
510   g_return_val_if_fail (real->running, 0);
511
512   g_async_queue_lock (real->queue);
513
514   retval = real->num_threads;
515     
516   g_async_queue_unlock (real->queue);
517
518   return retval;
519 }
520
521 /**
522  * g_thread_pool_unprocessed:
523  * @pool: a #GThreadPool
524  *
525  * Returns the number of tasks still unprocessed in @pool.
526  *
527  * Return value: the number of unprocessed tasks
528  **/
529 guint
530 g_thread_pool_unprocessed (GThreadPool     *pool)
531 {
532   GRealThreadPool *real = (GRealThreadPool*) pool;
533   gint unprocessed;
534
535   g_return_val_if_fail (real, 0);
536   g_return_val_if_fail (real->running, 0);
537
538   unprocessed = g_async_queue_length (real->queue);
539
540   return MAX (unprocessed, 0);
541 }
542
543 /**
544  * g_thread_pool_free:
545  * @pool: a #GThreadPool
546  * @immediate: should @pool shut down immediately?
547  * @wait: should the function wait for all tasks to be finished?
548  *
549  * Frees all resources allocated for @pool.
550  *
551  * If @immediate is #TRUE, no new task is processed for
552  * @pool. Otherwise @pool is not freed before the last task is
553  * processed. Note however, that no thread of this pool is
554  * interrupted, while processing a task. Instead at least all still
555  * running threads can finish their tasks before the @pool is freed.
556  *
557  * If @wait is #TRUE, the functions does not return before all tasks
558  * to be processed (dependent on @immediate, whether all or only the
559  * currently running) are ready. Otherwise the function returns immediately.
560  *
561  * After calling this function @pool must not be used anymore. 
562  **/
563 void
564 g_thread_pool_free (GThreadPool     *pool,
565                     gboolean         immediate,
566                     gboolean         wait)
567 {
568   GRealThreadPool *real = (GRealThreadPool*) pool;
569
570   g_return_if_fail (real);
571   g_return_if_fail (real->running);
572   /* It there's no thread allowed here, there is not much sense in
573    * not stopping this pool immediately, when it's not empty */
574   g_return_if_fail (immediate || real->max_threads != 0 || 
575                     g_async_queue_length (real->queue) == 0);
576
577   g_async_queue_lock (real->queue);
578
579   real->running = FALSE;
580   real->immediate = immediate;
581   real->waiting = wait;
582
583   if (wait)
584     {
585       g_mutex_lock (inform_mutex);
586       while (g_async_queue_length_unlocked (real->queue) != -real->num_threads)
587         {
588           g_async_queue_unlock (real->queue); 
589           g_cond_wait (inform_cond, inform_mutex); 
590           g_async_queue_lock (real->queue); 
591         }
592       g_mutex_unlock (inform_mutex); 
593     }
594
595   if (g_async_queue_length_unlocked (real->queue) == -real->num_threads)
596     {
597       /* No thread is currently doing something (and nothing is left
598        * to process in the queue) */
599       if (real->num_threads == 0) /* No threads left, we clean up */
600         {
601           g_async_queue_unlock (real->queue);
602           g_thread_pool_free_internal (real);
603           return;
604         }
605
606       g_thread_pool_wakeup_and_stop_all (real);
607     }
608   
609   real->waiting = FALSE; /* The last thread should cleanup the pool */
610   g_async_queue_unlock (real->queue);
611 }
612
613 static void
614 g_thread_pool_free_internal (GRealThreadPool* pool)
615 {
616   g_return_if_fail (pool);
617   g_return_if_fail (!pool->running);
618   g_return_if_fail (pool->num_threads == 0);
619
620   g_async_queue_unref (pool->queue);
621
622   g_free (pool);
623 }
624
625 static void
626 g_thread_pool_wakeup_and_stop_all (GRealThreadPool* pool)
627 {
628   guint i;
629   
630   g_return_if_fail (pool);
631   g_return_if_fail (!pool->running);
632   g_return_if_fail (pool->num_threads != 0);
633   g_return_if_fail (g_async_queue_length_unlocked (pool->queue) == 
634                     -pool->num_threads);
635
636   pool->immediate = TRUE; 
637   for (i = 0; i < pool->num_threads; i++)
638     g_async_queue_push_unlocked (pool->queue, GUINT_TO_POINTER (1));
639 }
640
641 /**
642  * g_thread_pool_set_max_unused_threads:
643  * @max_threads: maximal number of unused threads
644  *
645  * Sets the maximal number of unused threads to @max_threads. If
646  * @max_threads is -1, no limit is imposed on the number of unused
647  * threads.
648  **/
649 void
650 g_thread_pool_set_max_unused_threads (gint max_threads)
651 {
652   g_return_if_fail (max_threads >= -1);  
653
654   G_LOCK (unused_threads);
655   
656   max_unused_threads = max_threads;
657
658   if (max_unused_threads < unused_threads && max_unused_threads != -1)
659     {
660       guint close_down_num = unused_threads - max_unused_threads;
661
662       while (close_down_num > 0)
663         {
664           GThreadPriority priority;
665           guint bound;
666
667           guint old_close_down_num = close_down_num;
668           for (priority = G_THREAD_PRIORITY_LOW; 
669                priority < G_THREAD_PRIORITY_URGENT + 1 && close_down_num > 0; 
670                priority++)
671             {
672               for (bound = 0; bound < 2; bound++)
673                 {
674                   GAsyncQueue *queue = unused_thread_queue[priority][bound];
675                   g_async_queue_lock (queue);
676                   
677                   if (g_async_queue_length_unlocked (queue) < 0)
678                     {
679                       g_async_queue_push_unlocked (queue, 
680                                                    stop_this_thread_marker);
681                       close_down_num--;
682                     }
683                   
684                   g_async_queue_unlock (queue);
685                 }
686             }
687
688           /* Just to make sure, there are no counting problems */
689           g_assert (old_close_down_num != close_down_num);
690         }
691     }
692     
693   G_UNLOCK (unused_threads);
694 }
695
696 /**
697  * g_thread_pool_get_max_unused_threads:
698  * 
699  * Returns the maximal allowed number of unused threads.
700  *
701  * Return value: the maximal number of unused threads
702  **/
703 gint
704 g_thread_pool_get_max_unused_threads (void)
705 {
706   gint retval;
707   
708   G_LOCK (unused_threads);
709   retval = max_unused_threads;
710   G_UNLOCK (unused_threads);
711
712   return retval;
713 }
714
715 /**
716  * g_thread_pool_get_num_unused_threads:
717  * 
718  * Returns the number of currently unused threads.
719  *
720  * Return value: the number of currently unused threads
721  **/
722 guint g_thread_pool_get_num_unused_threads (void)
723 {
724   guint retval;
725   
726   G_LOCK (unused_threads);
727   retval = unused_threads;
728   G_UNLOCK (unused_threads);
729
730   return retval;
731 }
732
733 /**
734  * g_thread_pool_stop_unused_threads:
735  * 
736  * Stops all currently unused threads. This does not change the
737  * maximal number of unused threads. This function can be used to
738  * regularly stop all unused threads e.g. from g_timeout_add().
739  **/
740 void g_thread_pool_stop_unused_threads (void)
741
742   guint oldval = g_thread_pool_get_max_unused_threads ();
743   g_thread_pool_set_max_unused_threads (0);
744   g_thread_pool_set_max_unused_threads (oldval);
745 }