Cygwin support contributed by Stefan Ondrejicka <ondrej@idata.sk>.
[platform/upstream/glib.git] / gthreadpool.c
1 /* GLIB - Library of useful routines for C programming
2  * Copyright (C) 1995-1997  Peter Mattis, Spencer Kimball and Josh MacDonald
3  *
4  * GAsyncQueue: thread pool implementation.
5  * Copyright (C) 2000 Sebastian Wilhelmi; University of Karlsruhe
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Lesser General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20  * Boston, MA 02111-1307, USA.
21  */
22
23 /*
24  * MT safe
25  */
26
27 #include "glib.h"
28
29 typedef struct _GRealThreadPool GRealThreadPool;
30
31 struct _GRealThreadPool
32 {
33   GThreadPool pool;
34   GAsyncQueue* queue;
35   gint max_threads;
36   gint num_threads;
37   gboolean running;
38   gboolean immediate;
39   gboolean waiting;
40 };
41
42 /* The following is just an address to mark the stop order for a
43  * thread, it could be any address (as long, as it isn;t a valid
44  * GThreadPool address) */
45 static const gpointer stop_this_thread_marker = (gpointer) &g_thread_pool_new;
46
47 /* Here all unused threads are waiting, depending on their priority */
48 static GAsyncQueue *unused_thread_queue[G_THREAD_PRIORITY_URGENT + 1];
49 static gint unused_threads = 0;
50 static gint max_unused_threads = 0;
51 G_LOCK_DEFINE_STATIC (unused_threads);
52
53 static GMutex *inform_mutex = NULL;
54 static GCond *inform_cond = NULL;
55
56 static void g_thread_pool_free_internal (GRealThreadPool* pool);
57 static void g_thread_pool_thread_proxy (gpointer data);
58 static void g_thread_pool_start_thread (GRealThreadPool* pool, GError **error);
59 static void g_thread_pool_wakeup_and_stop_all (GRealThreadPool* pool);
60
/* TRUE if a thread of @pool should go on taking tasks: either the pool
 * is still running, or it was stopped non-immediately and @len says
 * that tasks are left in its queue. */
#define g_thread_should_run(pool, len)                          \
  ((pool)->running ||                                           \
   (!(pool)->immediate && (len) > 0))
63
64 static void 
65 g_thread_pool_thread_proxy (gpointer data)
66 {
67   GRealThreadPool *pool = data;
68
69   g_async_queue_lock (pool->queue);
70   while (TRUE)
71     {
72       gpointer task; 
73       gboolean goto_global_pool = !pool->pool.exclusive;
74       gint len = g_async_queue_length_unlocked (pool->queue);
75       
76       if (g_thread_should_run (pool, len))
77         {
78           task = g_async_queue_pop_unlocked (pool->queue);
79
80           if (pool->num_threads > pool->max_threads && pool->max_threads != -1)
81             /* We are in fact a superfluous threads, so we go to the
82              * global pool and just hand the data further to the next one
83              * waiting in the queue */
84             {
85               g_async_queue_push_unlocked (pool->queue, task);
86               goto_global_pool = TRUE;
87             }
88           else if (pool->running || !pool->immediate)
89             {
90               g_async_queue_unlock (pool->queue);
91               pool->pool.thread_func (task, pool->pool.user_data);
92               g_async_queue_lock (pool->queue);
93             }
94
95           len = g_async_queue_length_unlocked (pool->queue);
96         }
97
98       if (!g_thread_should_run (pool, len))
99         g_cond_broadcast (inform_cond);
100       
101       if (!pool->running && (pool->immediate || len <= 0))
102         goto_global_pool = TRUE;
103       else if (len >= 0)
104         /* At this pool there is no thread waiting */
105         goto_global_pool = FALSE; 
106       
107       if (goto_global_pool)
108         {
109           GThreadPriority priority = pool->pool.priority;
110           pool->num_threads--; 
111
112           if (!pool->running && !pool->waiting)
113             {
114               if (pool->num_threads == 0)
115                 {
116                   g_async_queue_unlock (pool->queue);
117                   g_thread_pool_free_internal (pool);
118                 }               
119               else if (len == - pool->num_threads)
120                 g_thread_pool_wakeup_and_stop_all (pool);
121             }
122           else
123             g_async_queue_unlock (pool->queue);
124           
125           g_async_queue_lock (unused_thread_queue[priority]);
126
127           G_LOCK (unused_threads);
128           if (unused_threads >= max_unused_threads && max_unused_threads != -1)
129             {
130               G_UNLOCK (unused_threads);
131               g_async_queue_unlock (unused_thread_queue[priority]);
132               /* Stop this thread */
133               return;      
134             }
135           unused_threads++;
136           G_UNLOCK (unused_threads);
137
138           pool = 
139             g_async_queue_pop_unlocked (unused_thread_queue[priority]);
140
141           G_LOCK (unused_threads);
142           unused_threads--;
143           G_UNLOCK (unused_threads);
144
145           g_async_queue_unlock (unused_thread_queue[priority]);
146           
147           if (pool == stop_this_thread_marker)
148             /* Stop this thread */
149             return;
150           
151           g_async_queue_lock (pool->queue);
152
153           /* pool->num_threads++ is not done here, but in
154            * g_thread_pool_start_thread to make the new started thread
155            * known to the pool, before itself can do it. */
156         }
157     }
158 }
159
160 static void
161 g_thread_pool_start_thread (GRealThreadPool  *pool, 
162                             GError          **error)
163 {
164   gboolean success = FALSE;
165   GThreadPriority priority = pool->pool.priority;
166   GAsyncQueue *queue = unused_thread_queue[priority];
167   
168   if (pool->num_threads >= pool->max_threads && pool->max_threads != -1)
169     /* Enough threads are already running */
170     return;
171
172   g_async_queue_lock (queue);
173
174   if (g_async_queue_length_unlocked (queue) < 0)
175     {
176       /* First we try a thread with the right priority */
177       g_async_queue_push_unlocked (queue, pool);
178       success = TRUE;
179     }
180
181   g_async_queue_unlock (queue);
182
183   /* We will not search for threads with other priorities, because changing
184    * priority is quite unportable */
185   
186   if (!success)
187     {
188       GError *local_error = NULL;
189       /* No thread was found, we have to start one new */
190       g_thread_create (g_thread_pool_thread_proxy, pool, 
191                        pool->pool.stack_size, FALSE, 
192                        pool->pool.bound, priority, &local_error);
193       
194       if (local_error)
195         {
196           g_propagate_error (error, local_error);
197           return;
198         }
199     }
200
201   /* See comment in g_thread_pool_thread_proxy as to why this is done
202    * here and not there */
203   pool->num_threads++;
204 }
205
206 GThreadPool* 
207 g_thread_pool_new (GFunc            thread_func,
208                    gint             max_threads,
209                    gulong           stack_size,
210                    gboolean         bound,
211                    GThreadPriority  priority,
212                    gboolean         exclusive,
213                    gpointer         user_data,
214                    GError         **error)
215 {
216   GRealThreadPool *retval;
217
218   g_return_val_if_fail (thread_func, NULL);
219   g_return_val_if_fail (!exclusive || max_threads != -1, NULL);
220   g_return_val_if_fail (max_threads >= -1, NULL);
221   g_return_val_if_fail (g_thread_supported (), NULL);
222
223   retval = g_new (GRealThreadPool, 1);
224
225   retval->pool.thread_func = thread_func;
226   retval->pool.stack_size = stack_size;
227   retval->pool.bound = bound;
228   retval->pool.priority = priority;
229   retval->pool.exclusive = exclusive;
230   retval->pool.user_data = user_data;
231   retval->queue = g_async_queue_new ();
232   retval->max_threads = max_threads;
233   retval->num_threads = 0;
234   retval->running = TRUE;
235
236   if (!inform_mutex)
237     {
238       inform_mutex = g_mutex_new ();
239       inform_cond = g_cond_new ();
240       for (priority = G_THREAD_PRIORITY_LOW; 
241            priority < G_THREAD_PRIORITY_URGENT + 1; priority++)
242         unused_thread_queue[priority] = g_async_queue_new ();
243     }
244
245   if (retval->pool.exclusive)
246     {
247       g_async_queue_lock (retval->queue);
248   
249       while (retval->num_threads < retval->max_threads)
250         {
251           GError *local_error = NULL;
252           g_thread_pool_start_thread (retval, &local_error);
253           if (local_error)
254             {
255               g_propagate_error (error, local_error);
256               break;
257             }
258         }
259
260       g_async_queue_unlock (retval->queue);
261     }
262
263   return (GThreadPool*) retval;
264 }
265
266 void 
267 g_thread_pool_push (GThreadPool     *pool,
268                     gpointer         data,
269                     GError         **error)
270 {
271   GRealThreadPool *real = (GRealThreadPool*) pool;
272
273   g_return_if_fail (real);
274
275   g_async_queue_lock (real->queue);
276   
277   if (!real->running)
278     {
279       g_async_queue_unlock (real->queue);
280       g_return_if_fail (real->running);
281     }
282
283   if (!pool->exclusive && g_async_queue_length_unlocked (real->queue) >= 0)
284     /* No thread is waiting in the queue */
285     g_thread_pool_start_thread (real, error);
286
287   g_async_queue_push_unlocked (real->queue, data);
288   g_async_queue_unlock (real->queue);
289 }
290
291 void
292 g_thread_pool_set_max_threads (GThreadPool     *pool,
293                                gint             max_threads,
294                                GError         **error)
295 {
296   GRealThreadPool *real = (GRealThreadPool*) pool;
297   gint to_start;
298
299   g_return_if_fail (real);
300   g_return_if_fail (real->running);
301   g_return_if_fail (!real->pool.exclusive || max_threads != -1);
302   g_return_if_fail (max_threads >= -1);
303
304   g_async_queue_lock (real->queue);
305
306   real->max_threads = max_threads;
307   
308   if (pool->exclusive)
309     to_start = real->max_threads - real->num_threads;
310   else
311     to_start = g_async_queue_length_unlocked (real->queue);
312   
313   for ( ; to_start > 0; to_start--)
314     {
315       GError *local_error = NULL;
316       g_thread_pool_start_thread (real, &local_error);
317       if (local_error)
318         {
319           g_propagate_error (error, local_error);
320           break;
321         }
322     }
323    
324   g_async_queue_unlock (real->queue);
325 }
326
327 gint
328 g_thread_pool_get_max_threads (GThreadPool     *pool)
329 {
330   GRealThreadPool *real = (GRealThreadPool*) pool;
331   gint retval;
332
333   g_return_val_if_fail (real, 0);
334   g_return_val_if_fail (real->running, 0);
335
336   g_async_queue_lock (real->queue);
337
338   retval = real->max_threads;
339     
340   g_async_queue_unlock (real->queue);
341
342   return retval;
343 }
344
345 guint
346 g_thread_pool_get_num_threads (GThreadPool     *pool)
347 {
348   GRealThreadPool *real = (GRealThreadPool*) pool;
349   guint retval;
350
351   g_return_val_if_fail (real, 0);
352   g_return_val_if_fail (real->running, 0);
353
354   g_async_queue_lock (real->queue);
355
356   retval = real->num_threads;
357     
358   g_async_queue_unlock (real->queue);
359
360   return retval;
361 }
362
363 guint
364 g_thread_pool_unprocessed (GThreadPool     *pool)
365 {
366   GRealThreadPool *real = (GRealThreadPool*) pool;
367   gint unprocessed;
368
369   g_return_val_if_fail (real, 0);
370   g_return_val_if_fail (real->running, 0);
371
372   unprocessed = g_async_queue_length (real->queue);
373
374   return MAX (unprocessed, 0);
375 }
376
377 void
378 g_thread_pool_free (GThreadPool     *pool,
379                     gboolean         immediate,
380                     gboolean         wait)
381 {
382   GRealThreadPool *real = (GRealThreadPool*) pool;
383
384   g_return_if_fail (real);
385   g_return_if_fail (real->running);
386   /* It there's no thread allowed here, there is not much sense in
387    * not stopping this pool immediatly, when it's not empty */
388   g_return_if_fail (immediate || real->max_threads != 0 || 
389                     g_async_queue_length (real->queue) == 0);
390
391   g_async_queue_lock (real->queue);
392
393   real->running = FALSE;
394   real->immediate = immediate;
395   real->waiting = wait;
396
397   if (wait)
398     {
399       g_mutex_lock (inform_mutex);
400       while (g_async_queue_length_unlocked (real->queue) != -real->num_threads)
401         {
402           g_async_queue_unlock (real->queue); 
403           g_cond_wait (inform_cond, inform_mutex); 
404           g_async_queue_lock (real->queue); 
405         }
406       g_mutex_unlock (inform_mutex); 
407     }
408
409   if (g_async_queue_length_unlocked (real->queue) == -real->num_threads)
410     {
411       /* No thread is currently doing something (and nothing is left
412        * to process in the queue) */
413       if (real->num_threads == 0) /* No threads left, we clean up */
414         {
415           g_async_queue_unlock (real->queue);
416           g_thread_pool_free_internal (real);
417           return;
418         }
419
420       g_thread_pool_wakeup_and_stop_all (real);
421     }
422   
423   real->waiting = FALSE; /* The last thread should cleanup the pool */
424   g_async_queue_unlock (real->queue);
425 }
426
427 static void
428 g_thread_pool_free_internal (GRealThreadPool* pool)
429 {
430   g_return_if_fail (pool);
431   g_return_if_fail (!pool->running);
432   g_return_if_fail (pool->num_threads == 0);
433
434   g_async_queue_unref (pool->queue);
435
436   g_free (pool);
437 }
438
439 static void
440 g_thread_pool_wakeup_and_stop_all (GRealThreadPool* pool)
441 {
442   guint i;
443   
444   g_return_if_fail (pool);
445   g_return_if_fail (!pool->running);
446   g_return_if_fail (pool->num_threads != 0);
447   g_return_if_fail (g_async_queue_length_unlocked (pool->queue) == 
448                     -pool->num_threads);
449
450   pool->immediate = TRUE; 
451   for (i = 0; i < pool->num_threads; i++)
452     g_async_queue_push_unlocked (pool->queue, GUINT_TO_POINTER (1));
453 }
454
455 void
456 g_thread_pool_set_max_unused_threads (gint max_threads)
457 {
458   g_return_if_fail (max_threads >= -1);  
459
460   G_LOCK (unused_threads);
461   
462   max_unused_threads = max_threads;
463
464   if (max_unused_threads < unused_threads && max_unused_threads != -1)
465     {
466       guint close_down_num = unused_threads - max_unused_threads;
467       GThreadPriority priority;
468
469       while (close_down_num > 0)
470         {
471           guint old_close_down_num = close_down_num;
472           for (priority = G_THREAD_PRIORITY_LOW; 
473                priority < G_THREAD_PRIORITY_URGENT + 1 && close_down_num > 0; 
474                priority++)
475             {
476               GAsyncQueue *queue = unused_thread_queue[priority];
477               g_async_queue_lock (queue);
478               
479               if (g_async_queue_length_unlocked (queue) < 0)
480                 {
481                   g_async_queue_push_unlocked (queue, 
482                                                stop_this_thread_marker);
483                   close_down_num--;
484                 }
485               
486               g_async_queue_unlock (queue);
487             }
488
489           /* Just to make sure, there are no counting problems */
490           g_assert (old_close_down_num != close_down_num);
491         }
492     }
493     
494   G_UNLOCK (unused_threads);
495 }
496
497 gint
498 g_thread_pool_get_max_unused_threads (void)
499 {
500   gint retval;
501   
502   G_LOCK (unused_threads);
503   retval = max_unused_threads;
504   G_UNLOCK (unused_threads);
505
506   return retval;
507 }
508
509 guint g_thread_pool_get_num_unused_threads (void)
510 {
511   guint retval;
512   
513   G_LOCK (unused_threads);
514   retval = unused_threads;
515   G_UNLOCK (unused_threads);
516
517   return retval;
518 }
519
520 void g_thread_pool_stop_unused_threads (void)
521
522   guint oldval = g_thread_pool_get_max_unused_threads ();
523   g_thread_pool_set_max_unused_threads (0);
524   g_thread_pool_set_max_unused_threads (oldval);
525 }