Renamed g_thread_create to g_thread_create_full and added macro
[platform/upstream/glib.git] / glib / gthreadpool.c
1 /* GLIB - Library of useful routines for C programming
2  * Copyright (C) 1995-1997  Peter Mattis, Spencer Kimball and Josh MacDonald
3  *
4  * GAsyncQueue: thread pool implementation.
5  * Copyright (C) 2000 Sebastian Wilhelmi; University of Karlsruhe
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Lesser General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20  * Boston, MA 02111-1307, USA.
21  */
22
23 /*
24  * MT safe
25  */
26
27 #include "glib.h"
28
29 typedef struct _GRealThreadPool GRealThreadPool;
30
31 struct _GRealThreadPool
32 {
33   GThreadPool pool;
34   GAsyncQueue* queue;
35   gint max_threads;
36   gint num_threads;
37   gboolean running;
38   gboolean immediate;
39   gboolean waiting;
40 };
41
42 /* The following is just an address to mark the stop order for a
43  * thread, it could be any address (as long, as it isn't a valid
44  * GThreadPool address) */
45 static const gpointer stop_this_thread_marker = (gpointer) &g_thread_pool_new;
46
47 /* Here all unused threads are waiting  */
48 static GAsyncQueue *unused_thread_queue;
49 static gint unused_threads = 0;
50 static gint max_unused_threads = 0;
51 G_LOCK_DEFINE_STATIC (unused_threads);
52
53 static GMutex *inform_mutex = NULL;
54 static GCond *inform_cond = NULL;
55
56 static void     g_thread_pool_free_internal (GRealThreadPool* pool);
57 static gpointer g_thread_pool_thread_proxy (gpointer data);
58 static void     g_thread_pool_start_thread (GRealThreadPool* pool, 
59                                             GError **error);
60 static void     g_thread_pool_wakeup_and_stop_all (GRealThreadPool* pool);
61
62 #define g_thread_should_run(pool, len) \
63   ((pool)->running || (!(pool)->immediate && (len) > 0))
64
65 static gpointer 
66 g_thread_pool_thread_proxy (gpointer data)
67 {
68   GRealThreadPool *pool = data;
69   gboolean watcher = FALSE;
70
71   g_async_queue_lock (pool->queue);
72   while (TRUE)
73     {
74       gpointer task; 
75       gboolean goto_global_pool = !pool->pool.exclusive;
76       gint len = g_async_queue_length_unlocked (pool->queue);
77       
78       if (g_thread_should_run (pool, len))
79         {
80           if (watcher)
81             {
82               /* This thread is actually not needed here, but it waits
83                * for some time anyway. If during that time a new
84                * request arrives, this saves process
85                * swicthes. Otherwise the thread will go to the global
86                * pool afterwards */
87               GTimeVal end_time;
88               g_get_current_time (&end_time);
89               end_time.tv_usec += G_USEC_PER_SEC / 2; /* Halv a second */
90               if (end_time.tv_usec >= G_USEC_PER_SEC)
91                 {
92                   end_time.tv_usec -= G_USEC_PER_SEC;
93                   end_time.tv_sec += 1;
94                 }
95          
96               task = g_async_queue_timed_pop_unlocked (pool->queue, &end_time);
97             }
98           else
99             {
100               task = g_async_queue_pop_unlocked (pool->queue);
101             }
102
103           if (task)
104             {
105               watcher = FALSE;
106               if (pool->num_threads > pool->max_threads && 
107                   pool->max_threads != -1)
108                 /* We are in fact a superfluous threads, so we go to
109                  * the global pool and just hand the data further to
110                  * the next one waiting in the queue */
111                 {
112                   g_async_queue_push_unlocked (pool->queue, task);
113                   goto_global_pool = TRUE;
114                 }
115               else if (pool->running || !pool->immediate)
116                 {
117                   g_async_queue_unlock (pool->queue);
118                   pool->pool.func (task, pool->pool.user_data);
119                   g_async_queue_lock (pool->queue);
120                 }
121             }
122           len = g_async_queue_length_unlocked (pool->queue);
123         }
124
125       if (!g_thread_should_run (pool, len))
126         {
127           g_cond_broadcast (inform_cond);
128           goto_global_pool = TRUE;
129         }
130       else if (len > 0)
131         {
132           /* At this pool there are no threads waiting, but tasks are. */
133           goto_global_pool = FALSE; 
134         }
135       else if (len == 0 && !watcher && !pool->pool.exclusive)
136         {
137           /* Here neither threads nor tasks are queued and we didn't
138            * just return from a timed wait. We now wait for a limited
139            * time at this pool for new tasks to avoid costly context
140            * switches. */
141           goto_global_pool = FALSE;
142           watcher = TRUE;
143         }
144
145       
146       if (goto_global_pool)
147         {
148           pool->num_threads--; 
149
150           if (!pool->running && !pool->waiting)
151             {
152               if (pool->num_threads == 0)
153                 {
154                   g_async_queue_unlock (pool->queue);
155                   g_thread_pool_free_internal (pool);
156                 }               
157               else if (len == - pool->num_threads)
158                 {
159                   g_thread_pool_wakeup_and_stop_all (pool);
160                   g_async_queue_unlock (pool->queue);
161                 }
162             }
163           else
164             g_async_queue_unlock (pool->queue);
165           
166           g_async_queue_lock (unused_thread_queue);
167
168           G_LOCK (unused_threads);
169           if ((unused_threads >= max_unused_threads && 
170                max_unused_threads != -1))
171             {
172               G_UNLOCK (unused_threads);
173               g_async_queue_unlock (unused_thread_queue);
174               /* Stop this thread */
175               return NULL;      
176             }
177           unused_threads++;
178           G_UNLOCK (unused_threads);
179
180           pool = g_async_queue_pop_unlocked (unused_thread_queue);
181
182           G_LOCK (unused_threads);
183           unused_threads--;
184           G_UNLOCK (unused_threads);
185
186           g_async_queue_unlock (unused_thread_queue);
187           
188           if (pool == stop_this_thread_marker)
189             /* Stop this thread */
190             return NULL;
191           
192           g_async_queue_lock (pool->queue);
193
194           /* pool->num_threads++ is not done here, but in
195            * g_thread_pool_start_thread to make the new started thread
196            * known to the pool, before itself can do it. */
197         }
198     }
199   return NULL;
200 }
201
202 static void
203 g_thread_pool_start_thread (GRealThreadPool  *pool, 
204                             GError          **error)
205 {
206   gboolean success = FALSE;
207   
208   if (pool->num_threads >= pool->max_threads && pool->max_threads != -1)
209     /* Enough threads are already running */
210     return;
211
212   g_async_queue_lock (unused_thread_queue);
213
214   if (g_async_queue_length_unlocked (unused_thread_queue) < 0)
215     {
216       g_async_queue_push_unlocked (unused_thread_queue, pool);
217       success = TRUE;
218     }
219
220   g_async_queue_unlock (unused_thread_queue);
221
222   if (!success)
223     {
224       GError *local_error = NULL;
225       /* No thread was found, we have to start a new one */
226       g_thread_create (g_thread_pool_thread_proxy, pool, FALSE, &local_error);
227       
228       if (local_error)
229         {
230           g_propagate_error (error, local_error);
231           return;
232         }
233     }
234
235   /* See comment in g_thread_pool_thread_proxy as to why this is done
236    * here and not there */
237   pool->num_threads++;
238 }
239
240 /**
241  * g_thread_pool_new: 
242  * @func: a function to execute in the threads of the new thread pool
243  * @user_data: user data that is handed over to @func every time it 
244  *   is called
245  * @max_threads: the maximal number of threads to execute concurrently in 
246  *   the new thread pool, -1 means no limit
247  * @exclusive: should this thread pool be exclusive?
248  * @error: return location for error
249  *
250  * This function creates a new thread pool.
251  *
252  * Whenever you call g_thread_pool_push(), either a new thread is
253  * created or an unused one is reused. At most @max_threads threads
254  * are running concurrently for this thread pool. @max_threads = -1
255  * allows unlimited threads to be created for this thread pool. The
256  * newly created or reused thread now executes the function @func with
257  * the two arguments. The first one is the parameter to
258  * g_thread_pool_push() and the second one is @user_data.
259  *
260  * The parameter @exclusive determines, whether the thread pool owns
261  * all threads exclusive or whether the threads are shared
262  * globally. If @exclusive is @TRUE, @max_threads threads are started
263  * immediately and they will run exclusively for this thread pool until
264  * it is destroyed by g_thread_pool_free(). If @exclusive is @FALSE,
265  * threads are created, when needed and shared between all
266  * non-exclusive thread pools. This implies that @max_threads may not
267  * be -1 for exclusive thread pools.
268  *
269  * @error can be NULL to ignore errors, or non-NULL to report
270  * errors. An error can only occur, when @exclusive is set to @TRUE and
271  * not all @max_threads threads could be created.
272  *
273  * Return value: the new #GThreadPool
274  **/
275 GThreadPool* 
276 g_thread_pool_new (GFunc            func,
277                    gpointer         user_data,
278                    gint             max_threads,
279                    gboolean         exclusive,
280                    GError         **error)
281 {
282   GRealThreadPool *retval;
283   G_LOCK_DEFINE_STATIC (init);
284
285   g_return_val_if_fail (func, NULL);
286   g_return_val_if_fail (!exclusive || max_threads != -1, NULL);
287   g_return_val_if_fail (max_threads >= -1, NULL);
288   g_return_val_if_fail (g_thread_supported (), NULL);
289
290   retval = g_new (GRealThreadPool, 1);
291
292   retval->pool.func = func;
293   retval->pool.user_data = user_data;
294   retval->pool.exclusive = exclusive;
295   retval->queue = g_async_queue_new ();
296   retval->max_threads = max_threads;
297   retval->num_threads = 0;
298   retval->running = TRUE;
299
300   G_LOCK (init);
301   
302   if (!inform_mutex)
303     {
304       inform_mutex = g_mutex_new ();
305       inform_cond = g_cond_new ();
306       unused_thread_queue = g_async_queue_new ();
307     }
308
309   G_UNLOCK (init);
310
311   if (retval->pool.exclusive)
312     {
313       g_async_queue_lock (retval->queue);
314   
315       while (retval->num_threads < retval->max_threads)
316         {
317           GError *local_error = NULL;
318           g_thread_pool_start_thread (retval, &local_error);
319           if (local_error)
320             {
321               g_propagate_error (error, local_error);
322               break;
323             }
324         }
325
326       g_async_queue_unlock (retval->queue);
327     }
328
329   return (GThreadPool*) retval;
330 }
331
332 /**
333  * g_thread_pool_push:
334  * @pool: a #GThreadPool
335  * @data: a new task for @pool
336  * @error: return location for error
337  * 
338  * Inserts @data into the list of tasks to be executed by @pool. When
339  * the number of currently running threads is lower than the maximal
340  * allowed number of threads, a new thread is started (or reused) with
341  * the properties given to g_thread_pool_new (). Otherwise @data stays
342  * in the queue until a thread in this pool finishes its previous task
343  * and processes @data. 
344  *
345  * @error can be NULL to ignore errors, or non-NULL to report
346  * errors. An error can only occur, when a new thread couldn't be
347  * created. In that case @data is simply appended to the queue of work
348  * to do.  
349  **/
350 void 
351 g_thread_pool_push (GThreadPool     *pool,
352                     gpointer         data,
353                     GError         **error)
354 {
355   GRealThreadPool *real = (GRealThreadPool*) pool;
356
357   g_return_if_fail (real);
358
359   g_async_queue_lock (real->queue);
360   
361   if (!real->running)
362     {
363       g_async_queue_unlock (real->queue);
364       g_return_if_fail (real->running);
365     }
366
367   if (g_async_queue_length_unlocked (real->queue) >= 0)
368     /* No thread is waiting in the queue */
369     g_thread_pool_start_thread (real, error);
370
371   g_async_queue_push_unlocked (real->queue, data);
372   g_async_queue_unlock (real->queue);
373 }
374
375 /**
376  * g_thread_pool_set_max_threads:
377  * @pool: a #GThreadPool
378  * @max_threads: a new maximal number of threads for @pool
379  * @error: return location for error
380  * 
381  * Sets the maximal allowed number of threads for @pool. A value of -1
382  * means, that the maximal number of threads is unlimited.
383  *
384  * Setting @max_threads to 0 means stopping all work for @pool. It is
385  * effectively frozen until @max_threads is set to a non-zero value
386  * again.
387  * 
388  * A thread is never terminated while calling @func, as supplied by
389  * g_thread_pool_new (). Instead the maximal number of threads only
390  * has effect for the allocation of new threads in g_thread_pool_push
391  * (). A new thread is allocated, whenever the number of currently
392  * running threads in @pool is smaller than the maximal number.
393  *
394  * @error can be NULL to ignore errors, or non-NULL to report
395  * errors. An error can only occur, when a new thread couldn't be
396  * created. 
397  **/
398 void
399 g_thread_pool_set_max_threads (GThreadPool     *pool,
400                                gint             max_threads,
401                                GError         **error)
402 {
403   GRealThreadPool *real = (GRealThreadPool*) pool;
404   gint to_start;
405
406   g_return_if_fail (real);
407   g_return_if_fail (real->running);
408   g_return_if_fail (!real->pool.exclusive || max_threads != -1);
409   g_return_if_fail (max_threads >= -1);
410
411   g_async_queue_lock (real->queue);
412
413   real->max_threads = max_threads;
414   
415   if (pool->exclusive)
416     to_start = real->max_threads - real->num_threads;
417   else
418     to_start = g_async_queue_length_unlocked (real->queue);
419   
420   for ( ; to_start > 0; to_start--)
421     {
422       GError *local_error = NULL;
423       g_thread_pool_start_thread (real, &local_error);
424       if (local_error)
425         {
426           g_propagate_error (error, local_error);
427           break;
428         }
429     }
430    
431   g_async_queue_unlock (real->queue);
432 }
433
434 /**
435  * g_thread_pool_get_max_threads:
436  * @pool: a #GThreadPool
437  *
438  * Returns the maximal number of threads for @pool.
439  *
440  * Return value: the maximal number of threads
441  **/
442 gint
443 g_thread_pool_get_max_threads (GThreadPool     *pool)
444 {
445   GRealThreadPool *real = (GRealThreadPool*) pool;
446   gint retval;
447
448   g_return_val_if_fail (real, 0);
449   g_return_val_if_fail (real->running, 0);
450
451   g_async_queue_lock (real->queue);
452
453   retval = real->max_threads;
454     
455   g_async_queue_unlock (real->queue);
456
457   return retval;
458 }
459
460 /**
461  * g_thread_pool_get_num_threads:
462  * @pool: a #GThreadPool
463  *
464  * Returns the number of threads currently running in @pool.
465  *
466  * Return value: the number of threads currently running
467  **/
468 guint
469 g_thread_pool_get_num_threads (GThreadPool     *pool)
470 {
471   GRealThreadPool *real = (GRealThreadPool*) pool;
472   guint retval;
473
474   g_return_val_if_fail (real, 0);
475   g_return_val_if_fail (real->running, 0);
476
477   g_async_queue_lock (real->queue);
478
479   retval = real->num_threads;
480     
481   g_async_queue_unlock (real->queue);
482
483   return retval;
484 }
485
486 /**
487  * g_thread_pool_unprocessed:
488  * @pool: a #GThreadPool
489  *
490  * Returns the number of tasks still unprocessed in @pool.
491  *
492  * Return value: the number of unprocessed tasks
493  **/
494 guint
495 g_thread_pool_unprocessed (GThreadPool     *pool)
496 {
497   GRealThreadPool *real = (GRealThreadPool*) pool;
498   gint unprocessed;
499
500   g_return_val_if_fail (real, 0);
501   g_return_val_if_fail (real->running, 0);
502
503   unprocessed = g_async_queue_length (real->queue);
504
505   return MAX (unprocessed, 0);
506 }
507
508 /**
509  * g_thread_pool_free:
510  * @pool: a #GThreadPool
511  * @immediate: should @pool shut down immediately?
512  * @wait: should the function wait for all tasks to be finished?
513  *
514  * Frees all resources allocated for @pool.
515  *
516  * If @immediate is #TRUE, no new task is processed for
517  * @pool. Otherwise @pool is not freed before the last task is
518  * processed. Note however, that no thread of this pool is
519  * interrupted, while processing a task. Instead at least all still
520  * running threads can finish their tasks before the @pool is freed.
521  *
522  * If @wait is #TRUE, the functions does not return before all tasks
523  * to be processed (dependent on @immediate, whether all or only the
524  * currently running) are ready. Otherwise the function returns immediately.
525  *
526  * After calling this function @pool must not be used anymore. 
527  **/
528 void
529 g_thread_pool_free (GThreadPool     *pool,
530                     gboolean         immediate,
531                     gboolean         wait)
532 {
533   GRealThreadPool *real = (GRealThreadPool*) pool;
534
535   g_return_if_fail (real);
536   g_return_if_fail (real->running);
537   /* It there's no thread allowed here, there is not much sense in
538    * not stopping this pool immediately, when it's not empty */
539   g_return_if_fail (immediate || real->max_threads != 0 || 
540                     g_async_queue_length (real->queue) == 0);
541
542   g_async_queue_lock (real->queue);
543
544   real->running = FALSE;
545   real->immediate = immediate;
546   real->waiting = wait;
547
548   if (wait)
549     {
550       g_mutex_lock (inform_mutex);
551       while (g_async_queue_length_unlocked (real->queue) != -real->num_threads)
552         {
553           g_async_queue_unlock (real->queue); 
554           g_cond_wait (inform_cond, inform_mutex); 
555           g_async_queue_lock (real->queue); 
556         }
557       g_mutex_unlock (inform_mutex); 
558     }
559
560   if (g_async_queue_length_unlocked (real->queue) == -real->num_threads)
561     {
562       /* No thread is currently doing something (and nothing is left
563        * to process in the queue) */
564       if (real->num_threads == 0) /* No threads left, we clean up */
565         {
566           g_async_queue_unlock (real->queue);
567           g_thread_pool_free_internal (real);
568           return;
569         }
570
571       g_thread_pool_wakeup_and_stop_all (real);
572     }
573   
574   real->waiting = FALSE; /* The last thread should cleanup the pool */
575   g_async_queue_unlock (real->queue);
576 }
577
578 static void
579 g_thread_pool_free_internal (GRealThreadPool* pool)
580 {
581   g_return_if_fail (pool);
582   g_return_if_fail (!pool->running);
583   g_return_if_fail (pool->num_threads == 0);
584
585   g_async_queue_unref (pool->queue);
586
587   g_free (pool);
588 }
589
590 static void
591 g_thread_pool_wakeup_and_stop_all (GRealThreadPool* pool)
592 {
593   guint i;
594   
595   g_return_if_fail (pool);
596   g_return_if_fail (!pool->running);
597   g_return_if_fail (pool->num_threads != 0);
598   g_return_if_fail (g_async_queue_length_unlocked (pool->queue) == 
599                     -pool->num_threads);
600
601   pool->immediate = TRUE; 
602   for (i = 0; i < pool->num_threads; i++)
603     g_async_queue_push_unlocked (pool->queue, GUINT_TO_POINTER (1));
604 }
605
606 /**
607  * g_thread_pool_set_max_unused_threads:
608  * @max_threads: maximal number of unused threads
609  *
610  * Sets the maximal number of unused threads to @max_threads. If
611  * @max_threads is -1, no limit is imposed on the number of unused
612  * threads.
613  **/
614 void
615 g_thread_pool_set_max_unused_threads (gint max_threads)
616 {
617   g_return_if_fail (max_threads >= -1);  
618
619   G_LOCK (unused_threads);
620   
621   max_unused_threads = max_threads;
622
623   if (max_unused_threads < unused_threads && max_unused_threads != -1)
624     {
625       guint i;
626
627       g_async_queue_lock (unused_thread_queue);
628       for (i = unused_threads - max_unused_threads; i > 0; i--)
629         g_async_queue_push_unlocked (unused_thread_queue, 
630                                      stop_this_thread_marker);
631       g_async_queue_unlock (unused_thread_queue);
632     }
633     
634   G_UNLOCK (unused_threads);
635 }
636
637 /**
638  * g_thread_pool_get_max_unused_threads:
639  * 
640  * Returns the maximal allowed number of unused threads.
641  *
642  * Return value: the maximal number of unused threads
643  **/
644 gint
645 g_thread_pool_get_max_unused_threads (void)
646 {
647   gint retval;
648   
649   G_LOCK (unused_threads);
650   retval = max_unused_threads;
651   G_UNLOCK (unused_threads);
652
653   return retval;
654 }
655
656 /**
657  * g_thread_pool_get_num_unused_threads:
658  * 
659  * Returns the number of currently unused threads.
660  *
661  * Return value: the number of currently unused threads
662  **/
663 guint g_thread_pool_get_num_unused_threads (void)
664 {
665   guint retval;
666   
667   G_LOCK (unused_threads);
668   retval = unused_threads;
669   G_UNLOCK (unused_threads);
670
671   return retval;
672 }
673
674 /**
675  * g_thread_pool_stop_unused_threads:
676  * 
677  * Stops all currently unused threads. This does not change the
678  * maximal number of unused threads. This function can be used to
679  * regularly stop all unused threads e.g. from g_timeout_add().
680  **/
681 void g_thread_pool_stop_unused_threads (void)
682
683   guint oldval = g_thread_pool_get_max_unused_threads ();
684   g_thread_pool_set_max_unused_threads (0);
685   g_thread_pool_set_max_unused_threads (oldval);
686 }