added g_list_nth_prev() which walks ->prev instead of ->next.
[platform/upstream/glib.git] / gthreadpool.c
1 /* GLIB - Library of useful routines for C programming
2  * Copyright (C) 1995-1997  Peter Mattis, Spencer Kimball and Josh MacDonald
3  *
4  * GAsyncQueue: thread pool implementation.
5  * Copyright (C) 2000 Sebastian Wilhelmi; University of Karlsruhe
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Lesser General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20  * Boston, MA 02111-1307, USA.
21  */
22
23 /*
24  * MT safe
25  */
26
27 #include "glib.h"
28
29 typedef struct _GRealThreadPool GRealThreadPool;
30
31 struct _GRealThreadPool
32 {
33   GThreadPool pool;
34   GAsyncQueue* queue;
35   gint max_threads;
36   gint num_threads;
37   gboolean running;
38   gboolean immediate;
39   gboolean waiting;
40 };
41
42 /* The following is just an address to mark the stop order for a
43  * thread, it could be any address (as long, as it isn't a valid
44  * GThreadPool address) */
45 static const gpointer stop_this_thread_marker = (gpointer) &g_thread_pool_new;
46
47 /* Here all unused threads are waiting, depending on their priority */
48 static GAsyncQueue *unused_thread_queue[G_THREAD_PRIORITY_URGENT + 1][2];
49 static gint unused_threads = 0;
50 static gint max_unused_threads = 0;
51 G_LOCK_DEFINE_STATIC (unused_threads);
52
53 static GMutex *inform_mutex = NULL;
54 static GCond *inform_cond = NULL;
55
56 static void g_thread_pool_free_internal (GRealThreadPool* pool);
57 static void g_thread_pool_thread_proxy (gpointer data);
58 static void g_thread_pool_start_thread (GRealThreadPool* pool, GError **error);
59 static void g_thread_pool_wakeup_and_stop_all (GRealThreadPool* pool);
60
61 #define g_thread_should_run(pool, len) \
62   ((pool)->running || (!(pool)->immediate && (len) > 0))
63
64 static void 
65 g_thread_pool_thread_proxy (gpointer data)
66 {
67   GRealThreadPool *pool = data;
68
69   g_async_queue_lock (pool->queue);
70   while (TRUE)
71     {
72       gpointer task; 
73       gboolean goto_global_pool = 
74         !pool->pool.exclusive && pool->pool.stack_size == 0;
75       gint len = g_async_queue_length_unlocked (pool->queue);
76       
77       if (g_thread_should_run (pool, len))
78         {
79           task = g_async_queue_pop_unlocked (pool->queue);
80
81           if (pool->num_threads > pool->max_threads && pool->max_threads != -1)
82             /* We are in fact a superfluous threads, so we go to the
83              * global pool and just hand the data further to the next one
84              * waiting in the queue */
85             {
86               g_async_queue_push_unlocked (pool->queue, task);
87               goto_global_pool = TRUE;
88             }
89           else if (pool->running || !pool->immediate)
90             {
91               g_async_queue_unlock (pool->queue);
92               pool->pool.thread_func (task, pool->pool.user_data);
93               g_async_queue_lock (pool->queue);
94             }
95
96           len = g_async_queue_length_unlocked (pool->queue);
97         }
98
99       if (!g_thread_should_run (pool, len))
100         {
101           g_cond_broadcast (inform_cond);
102           goto_global_pool = TRUE;
103         }
104       else if (len >= 0)
105         /* At this pool there is no thread waiting */
106         goto_global_pool = FALSE; 
107       
108       if (goto_global_pool)
109         {
110           GAsyncQueue *unused_queue = 
111             unused_thread_queue[pool->pool.priority][pool->pool.bound ? 1 : 0];
112           pool->num_threads--; 
113
114           if (!pool->running && !pool->waiting)
115             {
116               if (pool->num_threads == 0)
117                 {
118                   g_async_queue_unlock (pool->queue);
119                   g_thread_pool_free_internal (pool);
120                 }               
121               else if (len == - pool->num_threads)
122                 {
123                   g_thread_pool_wakeup_and_stop_all (pool);
124                   g_async_queue_unlock (pool->queue);
125                 }
126             }
127           else
128             g_async_queue_unlock (pool->queue);
129           
130           g_async_queue_lock (unused_queue);
131
132           G_LOCK (unused_threads);
133           if ((unused_threads >= max_unused_threads && 
134                max_unused_threads != -1) || pool->pool.stack_size != 0)
135             {
136               G_UNLOCK (unused_threads);
137               g_async_queue_unlock (unused_queue);
138               /* Stop this thread */
139               return;      
140             }
141           unused_threads++;
142           G_UNLOCK (unused_threads);
143
144           pool = g_async_queue_pop_unlocked (unused_queue);
145
146           G_LOCK (unused_threads);
147           unused_threads--;
148           G_UNLOCK (unused_threads);
149
150           g_async_queue_unlock (unused_queue);
151           
152           if (pool == stop_this_thread_marker)
153             /* Stop this thread */
154             return;
155           
156           g_async_queue_lock (pool->queue);
157
158           /* pool->num_threads++ is not done here, but in
159            * g_thread_pool_start_thread to make the new started thread
160            * known to the pool, before itself can do it. */
161         }
162     }
163 }
164
165 static void
166 g_thread_pool_start_thread (GRealThreadPool  *pool, 
167                             GError          **error)
168 {
169   gboolean success = FALSE;
170   GThreadPriority priority = pool->pool.priority;
171   guint bound = pool->pool.bound ? 1 : 0;
172   GAsyncQueue *queue = unused_thread_queue[priority][bound];
173   
174   if (pool->num_threads >= pool->max_threads && pool->max_threads != -1)
175     /* Enough threads are already running */
176     return;
177
178   g_async_queue_lock (queue);
179
180   if (g_async_queue_length_unlocked (queue) < 0)
181     {
182       /* First we try a thread with the right priority */
183       g_async_queue_push_unlocked (queue, pool);
184       success = TRUE;
185     }
186
187   g_async_queue_unlock (queue);
188
189   /* We will not search for threads with other priorities, because changing
190    * priority is quite unportable */
191   
192   if (!success)
193     {
194       GError *local_error = NULL;
195       /* No thread was found, we have to start a new one */
196       g_thread_create (g_thread_pool_thread_proxy, pool, 
197                        pool->pool.stack_size, FALSE, 
198                        bound, priority, &local_error);
199       
200       if (local_error)
201         {
202           g_propagate_error (error, local_error);
203           return;
204         }
205     }
206
207   /* See comment in g_thread_pool_thread_proxy as to why this is done
208    * here and not there */
209   pool->num_threads++;
210 }
211
212 /**
213  * g_thread_pool_new: 
214  * @thread_func: a function to execute in the threads of the new thread pool
215  * @max_threads: the maximal number of threads to execute concurrently in 
216  *   the new thread pool, -1 means no limit
217  * @stack_size: the stack size for the threads of the new thread pool,
218  *   0 means using the standard
219  * @bound: should the threads of the new thread pool be bound?
220  * @priority: a priority for the threads of the new thread pool
221  * @exclusive: should this thread pool be exclusive?
222  * @user_data: user data that is handed over to @thread_func every time it 
223  *   is called
224  * @error: return location for error
225  *
226  * This function creates a new thread pool. All threads created within
227  * this thread pool will have the priority @priority and the stack
228  * size @stack_size and will be bound if and only if @bound is
229  * true. 
230  *
231  * Whenever you call g_thread_pool_push(), either a new thread is
232  * created or an unused one is reused. At most @max_threads threads
233  * are running concurrently for this thread pool. @max_threads = -1
234  * allows unlimited threads to be created for this thread pool. The
235  * newly created or reused thread now executes the function
236  * @thread_func with the two arguments. The first one is the parameter
237  * to g_thread_pool_push() and the second one is @user_data.
238  *
239  * The parameter @exclusive determines, whether the thread pool owns
240  * all threads exclusive or whether the threads are shared
241  * globally. If @exclusive is @TRUE, @max_threads threads are started
242  * immediately and they will run exclusively for this thread pool until
243  * it is destroyed by g_thread_pool_free(). If @exclusive is @FALSE,
244  * threads are created, when needed and shared between all
245  * non-exclusive thread pools. This implies that @max_threads may not
246  * be -1 for exclusive thread pools.
247  *
248  * Note, that only threads from a thread pool with a @stack_size of 0
249  * (which means using the standard stack size) will be globally
250  * reused. Threads from a thread pool with a non-zero stack size will
251  * stay only in this thread pool until it is freed and can thus not be
252  * controlled by the g_thread_pool_set_unused_threads() function.
253  *
254  * @error can be NULL to ignore errors, or non-NULL to report
255  * errors. An error can only occur, when @exclusive is set to @TRUE and
256  * not all @max_threads threads could be created.
257  *
258  * Return value: the new #GThreadPool
259  **/
260 GThreadPool* 
261 g_thread_pool_new (GFunc            thread_func,
262                    gint             max_threads,
263                    gulong           stack_size,
264                    gboolean         bound,
265                    GThreadPriority  priority,
266                    gboolean         exclusive,
267                    gpointer         user_data,
268                    GError         **error)
269 {
270   GRealThreadPool *retval;
271   G_LOCK_DEFINE_STATIC (init);
272
273   g_return_val_if_fail (thread_func, NULL);
274   g_return_val_if_fail (!exclusive || max_threads != -1, NULL);
275   g_return_val_if_fail (max_threads >= -1, NULL);
276   g_return_val_if_fail (g_thread_supported (), NULL);
277
278   retval = g_new (GRealThreadPool, 1);
279
280   retval->pool.thread_func = thread_func;
281   retval->pool.stack_size = stack_size;
282   retval->pool.bound = bound;
283   retval->pool.priority = priority;
284   retval->pool.exclusive = exclusive;
285   retval->pool.user_data = user_data;
286   retval->queue = g_async_queue_new ();
287   retval->max_threads = max_threads;
288   retval->num_threads = 0;
289   retval->running = TRUE;
290
291   G_LOCK (init);
292   
293   if (!inform_mutex)
294     {
295       inform_mutex = g_mutex_new ();
296       inform_cond = g_cond_new ();
297       for (priority = G_THREAD_PRIORITY_LOW; 
298            priority < G_THREAD_PRIORITY_URGENT + 1; priority++)
299         {
300           unused_thread_queue[priority][0] = g_async_queue_new ();
301           unused_thread_queue[priority][1] = g_async_queue_new ();
302         }
303     }
304
305   G_UNLOCK (init);
306
307   if (retval->pool.exclusive)
308     {
309       g_async_queue_lock (retval->queue);
310   
311       while (retval->num_threads < retval->max_threads)
312         {
313           GError *local_error = NULL;
314           g_thread_pool_start_thread (retval, &local_error);
315           if (local_error)
316             {
317               g_propagate_error (error, local_error);
318               break;
319             }
320         }
321
322       g_async_queue_unlock (retval->queue);
323     }
324
325   return (GThreadPool*) retval;
326 }
327
328 /**
329  * g_thread_pool_push:
330  * @pool: a #GThreadPool
331  * @data: a new task for @pool
332  * @error: return location for error
333  * 
334  * Inserts @data into the list of tasks to be executed by @pool. When
335  * the number of currently running threads is lower than the maximal
336  * allowed number of threads, a new thread is started (or reused) with
337  * the properties given to g_thread_pool_new (). Otherwise @data stays
338  * in the queue until a thread in this pool finishes its previous task
339  * and processes @data. 
340  *
341  * @error can be NULL to ignore errors, or non-NULL to report
342  * errors. An error can only occur, when a new thread couldn't be
343  * created. In that case @data is simply appended to the queue of work
344  * to do.  
345  **/
346 void 
347 g_thread_pool_push (GThreadPool     *pool,
348                     gpointer         data,
349                     GError         **error)
350 {
351   GRealThreadPool *real = (GRealThreadPool*) pool;
352
353   g_return_if_fail (real);
354
355   g_async_queue_lock (real->queue);
356   
357   if (!real->running)
358     {
359       g_async_queue_unlock (real->queue);
360       g_return_if_fail (real->running);
361     }
362
363   if (g_async_queue_length_unlocked (real->queue) >= 0)
364     /* No thread is waiting in the queue */
365     g_thread_pool_start_thread (real, error);
366
367   g_async_queue_push_unlocked (real->queue, data);
368   g_async_queue_unlock (real->queue);
369 }
370
371 /**
372  * g_thread_pool_set_max_threads:
373  * @pool: a #GThreadPool
374  * @max_threads: a new maximal number of threads for @pool
375  * @error: return location for error
376  * 
377  * Sets the maximal allowed number of threads for @pool. A value of -1
378  * means, that the maximal number of threads is unlimited.
379  *
380  * Setting @max_threads to 0 means stopping all work for @pool. It is
381  * effectively frozen until @max_threads is set to a non-zero value
382  * again.
383  * 
384  * A thread is never terminated while calling @thread_func, as
385  * supplied by g_thread_pool_new (). Instead the maximal number of
386  * threads only has effect for the allocation of new threads in
387  * g_thread_pool_push (). A new thread is allocated, whenever the
388  * number of currently running threads in @pool is smaller than the
389  * maximal number.
390  *
391  * @error can be NULL to ignore errors, or non-NULL to report
392  * errors. An error can only occur, when a new thread couldn't be
393  * created. 
394  **/
395 void
396 g_thread_pool_set_max_threads (GThreadPool     *pool,
397                                gint             max_threads,
398                                GError         **error)
399 {
400   GRealThreadPool *real = (GRealThreadPool*) pool;
401   gint to_start;
402
403   g_return_if_fail (real);
404   g_return_if_fail (real->running);
405   g_return_if_fail (!real->pool.exclusive || max_threads != -1);
406   g_return_if_fail (max_threads >= -1);
407
408   g_async_queue_lock (real->queue);
409
410   real->max_threads = max_threads;
411   
412   if (pool->exclusive)
413     to_start = real->max_threads - real->num_threads;
414   else
415     to_start = g_async_queue_length_unlocked (real->queue);
416   
417   for ( ; to_start > 0; to_start--)
418     {
419       GError *local_error = NULL;
420       g_thread_pool_start_thread (real, &local_error);
421       if (local_error)
422         {
423           g_propagate_error (error, local_error);
424           break;
425         }
426     }
427    
428   g_async_queue_unlock (real->queue);
429 }
430
431 /**
432  * g_thread_pool_get_max_threads:
433  * @pool: a #GThreadPool
434  *
435  * Returns the maximal number of threads for @pool.
436  *
437  * Return value: the maximal number of threads
438  **/
439 gint
440 g_thread_pool_get_max_threads (GThreadPool     *pool)
441 {
442   GRealThreadPool *real = (GRealThreadPool*) pool;
443   gint retval;
444
445   g_return_val_if_fail (real, 0);
446   g_return_val_if_fail (real->running, 0);
447
448   g_async_queue_lock (real->queue);
449
450   retval = real->max_threads;
451     
452   g_async_queue_unlock (real->queue);
453
454   return retval;
455 }
456
457 /**
458  * g_thread_pool_get_num_threads:
459  * @pool: a #GThreadPool
460  *
461  * Returns the number of threads currently running in @pool.
462  *
463  * Return value: the number of threads currently running
464  **/
465 guint
466 g_thread_pool_get_num_threads (GThreadPool     *pool)
467 {
468   GRealThreadPool *real = (GRealThreadPool*) pool;
469   guint retval;
470
471   g_return_val_if_fail (real, 0);
472   g_return_val_if_fail (real->running, 0);
473
474   g_async_queue_lock (real->queue);
475
476   retval = real->num_threads;
477     
478   g_async_queue_unlock (real->queue);
479
480   return retval;
481 }
482
483 /**
484  * g_thread_pool_unprocessed:
485  * @pool: a #GThreadPool
486  *
487  * Returns the number of tasks still unprocessed in @pool.
488  *
489  * Return value: the number of unprocessed tasks
490  **/
491 guint
492 g_thread_pool_unprocessed (GThreadPool     *pool)
493 {
494   GRealThreadPool *real = (GRealThreadPool*) pool;
495   gint unprocessed;
496
497   g_return_val_if_fail (real, 0);
498   g_return_val_if_fail (real->running, 0);
499
500   unprocessed = g_async_queue_length (real->queue);
501
502   return MAX (unprocessed, 0);
503 }
504
505 /**
506  * g_thread_pool_free:
507  * @pool: a #GThreadPool
508  * @immediate: should @pool shut down immediately?
509  * @wait: should the function wait for all tasks to be finished?
510  *
511  * Frees all resources allocated for @pool.
512  *
513  * If @immediate is #TRUE, no new task is processed for
514  * @pool. Otherwise @pool is not freed before the last task is
515  * processed. Note however, that no thread of this pool is
516  * interrupted, while processing a task. Instead at least all still
517  * running threads can finish their tasks before the @pool is freed.
518  *
519  * If @wait is #TRUE, the functions does not return before all tasks
520  * to be processed (dependent on @immediate, whether all or only the
521  * currently running) are ready. Otherwise the function returns immediately.
522  *
523  * After calling this function @pool must not be used anymore. 
524  **/
525 void
526 g_thread_pool_free (GThreadPool     *pool,
527                     gboolean         immediate,
528                     gboolean         wait)
529 {
530   GRealThreadPool *real = (GRealThreadPool*) pool;
531
532   g_return_if_fail (real);
533   g_return_if_fail (real->running);
534   /* It there's no thread allowed here, there is not much sense in
535    * not stopping this pool immediately, when it's not empty */
536   g_return_if_fail (immediate || real->max_threads != 0 || 
537                     g_async_queue_length (real->queue) == 0);
538
539   g_async_queue_lock (real->queue);
540
541   real->running = FALSE;
542   real->immediate = immediate;
543   real->waiting = wait;
544
545   if (wait)
546     {
547       g_mutex_lock (inform_mutex);
548       while (g_async_queue_length_unlocked (real->queue) != -real->num_threads)
549         {
550           g_async_queue_unlock (real->queue); 
551           g_cond_wait (inform_cond, inform_mutex); 
552           g_async_queue_lock (real->queue); 
553         }
554       g_mutex_unlock (inform_mutex); 
555     }
556
557   if (g_async_queue_length_unlocked (real->queue) == -real->num_threads)
558     {
559       /* No thread is currently doing something (and nothing is left
560        * to process in the queue) */
561       if (real->num_threads == 0) /* No threads left, we clean up */
562         {
563           g_async_queue_unlock (real->queue);
564           g_thread_pool_free_internal (real);
565           return;
566         }
567
568       g_thread_pool_wakeup_and_stop_all (real);
569     }
570   
571   real->waiting = FALSE; /* The last thread should cleanup the pool */
572   g_async_queue_unlock (real->queue);
573 }
574
575 static void
576 g_thread_pool_free_internal (GRealThreadPool* pool)
577 {
578   g_return_if_fail (pool);
579   g_return_if_fail (!pool->running);
580   g_return_if_fail (pool->num_threads == 0);
581
582   g_async_queue_unref (pool->queue);
583
584   g_free (pool);
585 }
586
587 static void
588 g_thread_pool_wakeup_and_stop_all (GRealThreadPool* pool)
589 {
590   guint i;
591   
592   g_return_if_fail (pool);
593   g_return_if_fail (!pool->running);
594   g_return_if_fail (pool->num_threads != 0);
595   g_return_if_fail (g_async_queue_length_unlocked (pool->queue) == 
596                     -pool->num_threads);
597
598   pool->immediate = TRUE; 
599   for (i = 0; i < pool->num_threads; i++)
600     g_async_queue_push_unlocked (pool->queue, GUINT_TO_POINTER (1));
601 }
602
603 /**
604  * g_thread_pool_set_max_unused_threads:
605  * @max_threads: maximal number of unused threads
606  *
607  * Sets the maximal number of unused threads to @max_threads. If
608  * @max_threads is -1, no limit is imposed on the number of unused
609  * threads.
610  **/
611 void
612 g_thread_pool_set_max_unused_threads (gint max_threads)
613 {
614   g_return_if_fail (max_threads >= -1);  
615
616   G_LOCK (unused_threads);
617   
618   max_unused_threads = max_threads;
619
620   if (max_unused_threads < unused_threads && max_unused_threads != -1)
621     {
622       guint close_down_num = unused_threads - max_unused_threads;
623
624       while (close_down_num > 0)
625         {
626           GThreadPriority priority;
627           guint bound;
628
629           guint old_close_down_num = close_down_num;
630           for (priority = G_THREAD_PRIORITY_LOW; 
631                priority < G_THREAD_PRIORITY_URGENT + 1 && close_down_num > 0; 
632                priority++)
633             {
634               for (bound = 0; bound < 2; bound++)
635                 {
636                   GAsyncQueue *queue = unused_thread_queue[priority][bound];
637                   g_async_queue_lock (queue);
638                   
639                   if (g_async_queue_length_unlocked (queue) < 0)
640                     {
641                       g_async_queue_push_unlocked (queue, 
642                                                    stop_this_thread_marker);
643                       close_down_num--;
644                     }
645                   
646                   g_async_queue_unlock (queue);
647                 }
648             }
649
650           /* Just to make sure, there are no counting problems */
651           g_assert (old_close_down_num != close_down_num);
652         }
653     }
654     
655   G_UNLOCK (unused_threads);
656 }
657
658 /**
659  * g_thread_pool_get_max_unused_threads:
660  * 
661  * Returns the maximal allowed number of unused threads.
662  *
663  * Return value: the maximal number of unused threads
664  **/
665 gint
666 g_thread_pool_get_max_unused_threads (void)
667 {
668   gint retval;
669   
670   G_LOCK (unused_threads);
671   retval = max_unused_threads;
672   G_UNLOCK (unused_threads);
673
674   return retval;
675 }
676
677 /**
678  * g_thread_pool_get_num_unused_threads:
679  * 
680  * Returns the number of currently unused threads.
681  *
682  * Return value: the number of currently unused threads
683  **/
684 guint g_thread_pool_get_num_unused_threads (void)
685 {
686   guint retval;
687   
688   G_LOCK (unused_threads);
689   retval = unused_threads;
690   G_UNLOCK (unused_threads);
691
692   return retval;
693 }
694
695 /**
696  * g_thread_pool_stop_unused_threads:
697  * 
698  * Stops all currently unused threads. This does not change the
699  * maximal number of unused threads. This function can be used to
700  * regularly stop all unused threads e.g. from g_timeout_add().
701  **/
702 void g_thread_pool_stop_unused_threads (void)
703
704   guint oldval = g_thread_pool_get_max_unused_threads ();
705   g_thread_pool_set_max_unused_threads (0);
706   g_thread_pool_set_max_unused_threads (oldval);
707 }