Signal waiting threads, problem noticed by Christian Kellner.
[platform/upstream/glib.git] / glib / gasyncqueue.c
1 /* GLIB - Library of useful routines for C programming
2  * Copyright (C) 1995-1997  Peter Mattis, Spencer Kimball and Josh MacDonald
3  *
4  * GAsyncQueue: asynchronous queue implementation, based on Gqueue.
5  * Copyright (C) 2000 Sebastian Wilhelmi; University of Karlsruhe
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Lesser General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20  * Boston, MA 02111-1307, USA.
21  */
22
23 /*
24  * MT safe
25  */
26
27 #include "config.h"
28
29 #include "glib.h"
30 #include "galias.h"
31
32
33 struct _GAsyncQueue
34 {
35   GMutex *mutex;
36   GCond *cond;
37   GQueue *queue;
38   guint waiting_threads;
39   gint32 ref_count;
40 };
41
42 typedef struct {
43   GCompareDataFunc func;
44   gpointer         user_data;
45 } SortData;
46
47 /**
48  * g_async_queue_new:
49  * 
50  * Creates a new asynchronous queue with the initial reference count of 1.
51  * 
52  * Return value: the new #GAsyncQueue.
53  **/
54 GAsyncQueue*
55 g_async_queue_new (void)
56 {
57   GAsyncQueue* retval = g_new (GAsyncQueue, 1);
58   retval->mutex = g_mutex_new ();
59   retval->cond = NULL;
60   retval->queue = g_queue_new ();
61   retval->waiting_threads = 0;
62   retval->ref_count = 1;
63   return retval;
64 }
65
66 /**
67  * g_async_queue_ref:
68  * @queue: a #GAsyncQueue.
69  *
70  * Increases the reference count of the asynchronous @queue by 1. You
71  * do not need to hold the lock to call this function.
72  *
73  * Returns: the @queue that was passed in (since 2.6)
74  **/
75 GAsyncQueue *
76 g_async_queue_ref (GAsyncQueue *queue)
77 {
78   g_return_val_if_fail (queue, NULL);
79   g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, NULL);
80   
81   g_atomic_int_inc (&queue->ref_count);
82
83   return queue;
84 }
85
86 /**
87  * g_async_queue_ref_unlocked:
88  * @queue: a #GAsyncQueue.
89  * 
90  * Increases the reference count of the asynchronous @queue by 1.
91  *
92  * @Deprecated: Since 2.8, reference counting is done atomically
93  * so g_async_queue_ref() can be used regardless of the @queue's
94  * lock.
95  **/
96 void 
97 g_async_queue_ref_unlocked (GAsyncQueue *queue)
98 {
99   g_return_if_fail (queue);
100   g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
101   
102   g_atomic_int_inc (&queue->ref_count);
103 }
104
105 /**
106  * g_async_queue_unref_and_unlock:
107  * @queue: a #GAsyncQueue.
108  * 
109  * Decreases the reference count of the asynchronous @queue by 1 and
110  * releases the lock. This function must be called while holding the
111  * @queue's lock. If the reference count went to 0, the @queue will be
112  * destroyed and the memory allocated will be freed.
113  *
114  * @Deprecated: Since 2.8, reference counting is done atomically
115  * so g_async_queue_unref() can be used regardless of the @queue's
116  * lock.
117  **/
118 void 
119 g_async_queue_unref_and_unlock (GAsyncQueue *queue)
120 {
121   g_return_if_fail (queue);
122   g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
123
124   g_mutex_unlock (queue->mutex);
125   g_async_queue_unref (queue);
126 }
127
128 /**
129  * g_async_queue_unref:
130  * @queue: a #GAsyncQueue.
131  * 
132  * Decreases the reference count of the asynchronous @queue by 1. If
133  * the reference count went to 0, the @queue will be destroyed and the
134  * memory allocated will be freed. So you are not allowed to use the
135  * @queue afterwards, as it might have disappeared. You do not need to
136  * hold the lock to call this function.
137  **/
138 void 
139 g_async_queue_unref (GAsyncQueue *queue)
140 {
141   g_return_if_fail (queue);
142   g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
143   
144   if (g_atomic_int_dec_and_test (&queue->ref_count))
145     {
146       g_return_if_fail (queue->waiting_threads == 0);
147       g_mutex_free (queue->mutex);
148       if (queue->cond)
149         g_cond_free (queue->cond);
150       g_queue_free (queue->queue);
151       g_free (queue);
152     }
153 }
154
155 /**
156  * g_async_queue_lock:
157  * @queue: a #GAsyncQueue.
158  * 
159  * Acquires the @queue's lock. After that you can only call the
160  * <function>g_async_queue_*_unlocked()</function> function variants on that
161  * @queue. Otherwise it will deadlock.
162  **/
163 void
164 g_async_queue_lock (GAsyncQueue *queue)
165 {
166   g_return_if_fail (queue);
167   g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
168
169   g_mutex_lock (queue->mutex);
170 }
171
172 /**
173  * g_async_queue_unlock:
174  * @queue: a #GAsyncQueue.
175  * 
176  * Releases the queue's lock.
177  **/
178 void 
179 g_async_queue_unlock (GAsyncQueue *queue)
180 {
181   g_return_if_fail (queue);
182   g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
183
184   g_mutex_unlock (queue->mutex);
185 }
186
187 /**
188  * g_async_queue_push:
189  * @queue: a #GAsyncQueue.
190  * @data: @data to push into the @queue.
191  *
192  * Pushes the @data into the @queue. @data must not be %NULL.
193  **/
194 void
195 g_async_queue_push (GAsyncQueue* queue, gpointer data)
196 {
197   g_return_if_fail (queue);
198   g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
199   g_return_if_fail (data);
200
201   g_mutex_lock (queue->mutex);
202   g_async_queue_push_unlocked (queue, data);
203   g_mutex_unlock (queue->mutex);
204 }
205
206 /**
207  * g_async_queue_push_unlocked:
208  * @queue: a #GAsyncQueue.
209  * @data: @data to push into the @queue.
210  * 
211  * Pushes the @data into the @queue. @data must not be %NULL. This
212  * function must be called while holding the @queue's lock.
213  **/
214 void
215 g_async_queue_push_unlocked (GAsyncQueue* queue, gpointer data)
216 {
217   g_return_if_fail (queue);
218   g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
219   g_return_if_fail (data);
220
221   g_queue_push_head (queue->queue, data);
222   if (queue->waiting_threads > 0)
223     g_cond_signal (queue->cond);
224 }
225
226 /**
227  * g_async_queue_push_sorted:
228  * @queue: a #GAsyncQueue
229  * @data: the @data to push into the @queue
230  * @func: the #GCompareDataFunc is used to sort @queue. This function
231  *     is passed two elements of the @queue. The function should return
232  *     0 if they are equal, a negative value if the first element
233  *     should be higher in the @queue or a positive value if the first
234  *     element should be lower in the @queue than the second element.
235  * @user_data: user data passed to @func.
236  * 
237  * Inserts @data into @queue using @func to determine the new
238  * position. 
239  * 
240  * This function requires that the @queue is sorted before pushing on
241  * new elements.
242  * 
243  * This function will lock @queue before it sorts the queue and unlock
244  * it when it is finished.
245  * 
246  * For an example of @func see g_async_queue_sort(). 
247  *
248  * Since: 2.10
249  **/
250 void
251 g_async_queue_push_sorted (GAsyncQueue      *queue,
252                            gpointer          data,
253                            GCompareDataFunc  func,
254                            gpointer          user_data)
255 {
256   g_return_if_fail (queue != NULL);
257
258   g_mutex_lock (queue->mutex);
259   g_async_queue_push_sorted_unlocked (queue, data, func, user_data);
260   g_mutex_unlock (queue->mutex);
261 }
262
263 static gint 
264 g_async_queue_invert_compare (gpointer  v1, 
265                               gpointer  v2, 
266                               SortData *sd)
267 {
268   return -sd->func (v1, v2, sd->user_data);
269 }
270
271 /**
272  * g_async_queue_push_sorted_unlocked:
273  * @queue: a #GAsyncQueue
274  * @data: the @data to push into the @queue
275  * @func: the #GCompareDataFunc is used to sort @queue. This function
276  *     is passed two elements of the @queue. The function should return
277  *     0 if they are equal, a negative value if the first element
278  *     should be higher in the @queue or a positive value if the first
279  *     element should be lower in the @queue than the second element.
280  * @user_data: user data passed to @func.
281  * 
282  * Inserts @data into @queue using @func to determine the new
283  * position.
284  * 
285  * This function requires that the @queue is sorted before pushing on
286  * new elements.
287  * 
288  * This function is called while holding the @queue's lock.
289  * 
290  * For an example of @func see g_async_queue_sort(). 
291  *
292  * Since: 2.10
293  **/
294 void
295 g_async_queue_push_sorted_unlocked (GAsyncQueue      *queue,
296                                     gpointer          data,
297                                     GCompareDataFunc  func,
298                                     gpointer          user_data)
299 {
300   SortData sd;
301   
302   g_return_if_fail (queue != NULL);
303
304   sd.func = func;
305   sd.user_data = user_data;
306
307   g_queue_insert_sorted (queue->queue, 
308                          data, 
309                          (GCompareDataFunc)g_async_queue_invert_compare, 
310                          &sd);
311   if (queue->waiting_threads > 0)
312     g_cond_signal (queue->cond);
313 }
314
315 static gpointer
316 g_async_queue_pop_intern_unlocked (GAsyncQueue* queue, gboolean try, 
317                                    GTimeVal *end_time)
318 {
319   gpointer retval;
320
321   if (!g_queue_peek_tail (queue->queue))
322     {
323       if (try)
324         return NULL;
325       
326       if (!queue->cond)
327         queue->cond = g_cond_new ();
328
329       if (!end_time)
330         {
331           queue->waiting_threads++;
332           while (!g_queue_peek_tail (queue->queue))
333             g_cond_wait(queue->cond, queue->mutex);
334           queue->waiting_threads--;
335         }
336       else
337         {
338           queue->waiting_threads++;
339           while (!g_queue_peek_tail (queue->queue))
340             if (!g_cond_timed_wait (queue->cond, queue->mutex, end_time))
341               break;
342           queue->waiting_threads--;
343           if (!g_queue_peek_tail (queue->queue))
344             return NULL;
345         }
346     }
347
348   retval = g_queue_pop_tail (queue->queue);
349
350   g_assert (retval);
351
352   return retval;
353 }
354
355 /**
356  * g_async_queue_pop:
357  * @queue: a #GAsyncQueue.
358  * 
359  * Pops data from the @queue. This function blocks until data become
360  * available.
361  *
362  * Return value: data from the queue.
363  **/
364 gpointer
365 g_async_queue_pop (GAsyncQueue* queue)
366 {
367   gpointer retval;
368
369   g_return_val_if_fail (queue, NULL);
370   g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, NULL);
371
372   g_mutex_lock (queue->mutex);
373   retval = g_async_queue_pop_intern_unlocked (queue, FALSE, NULL);
374   g_mutex_unlock (queue->mutex);
375
376   return retval;
377 }
378
379 /**
380  * g_async_queue_pop_unlocked:
381  * @queue: a #GAsyncQueue.
382  * 
383  * Pops data from the @queue. This function blocks until data become
384  * available. This function must be called while holding the @queue's
385  * lock.
386  *
387  * Return value: data from the queue.
388  **/
389 gpointer
390 g_async_queue_pop_unlocked (GAsyncQueue* queue)
391 {
392   g_return_val_if_fail (queue, NULL);
393   g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, NULL);
394
395   return g_async_queue_pop_intern_unlocked (queue, FALSE, NULL);
396 }
397
398 /**
399  * g_async_queue_try_pop:
400  * @queue: a #GAsyncQueue.
401  * 
402  * Tries to pop data from the @queue. If no data is available, %NULL is
403  * returned.
404  *
405  * Return value: data from the queue or %NULL, when no data is
406  * available immediately.
407  **/
408 gpointer
409 g_async_queue_try_pop (GAsyncQueue* queue)
410 {
411   gpointer retval;
412
413   g_return_val_if_fail (queue, NULL);
414   g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, NULL);
415
416   g_mutex_lock (queue->mutex);
417   retval = g_async_queue_pop_intern_unlocked (queue, TRUE, NULL);
418   g_mutex_unlock (queue->mutex);
419
420   return retval;
421 }
422
423 /**
424  * g_async_queue_try_pop_unlocked:
425  * @queue: a #GAsyncQueue.
426  * 
427  * Tries to pop data from the @queue. If no data is available, %NULL is
428  * returned. This function must be called while holding the @queue's
429  * lock.
430  *
431  * Return value: data from the queue or %NULL, when no data is
432  * available immediately.
433  **/
434 gpointer
435 g_async_queue_try_pop_unlocked (GAsyncQueue* queue)
436 {
437   g_return_val_if_fail (queue, NULL);
438   g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, NULL);
439
440   return g_async_queue_pop_intern_unlocked (queue, TRUE, NULL);
441 }
442
443 /**
444  * g_async_queue_timed_pop:
445  * @queue: a #GAsyncQueue.
446  * @end_time: a #GTimeVal, determining the final time.
447  *
448  * Pops data from the @queue. If no data is received before @end_time,
449  * %NULL is returned.
450  *
451  * To easily calculate @end_time a combination of g_get_current_time()
452  * and g_time_val_add() can be used.
453  *
454  * Return value: data from the queue or %NULL, when no data is
455  * received before @end_time.
456  **/
457 gpointer
458 g_async_queue_timed_pop (GAsyncQueue* queue, GTimeVal *end_time)
459 {
460   gpointer retval;
461
462   g_return_val_if_fail (queue, NULL);
463   g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, NULL);
464
465   g_mutex_lock (queue->mutex);
466   retval = g_async_queue_pop_intern_unlocked (queue, FALSE, end_time);
467   g_mutex_unlock (queue->mutex);
468
469   return retval;  
470 }
471
472 /**
473  * g_async_queue_timed_pop_unlocked:
474  * @queue: a #GAsyncQueue.
475  * @end_time: a #GTimeVal, determining the final time.
476  *
477  * Pops data from the @queue. If no data is received before @end_time,
478  * %NULL is returned. This function must be called while holding the
479  * @queue's lock.
480  *
481  * To easily calculate @end_time a combination of g_get_current_time()
482  * and g_time_val_add() can be used.
483  *
484  * Return value: data from the queue or %NULL, when no data is
485  * received before @end_time.
486  **/
487 gpointer
488 g_async_queue_timed_pop_unlocked (GAsyncQueue* queue, GTimeVal *end_time)
489 {
490   g_return_val_if_fail (queue, NULL);
491   g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, NULL);
492
493   return g_async_queue_pop_intern_unlocked (queue, FALSE, end_time);
494 }
495
496 /**
497  * g_async_queue_length:
498  * @queue: a #GAsyncQueue.
499  * 
500  * Returns the length of the queue, negative values mean waiting
501  * threads, positive values mean available entries in the
502  * @queue. Actually this function returns the number of data items in
503  * the queue minus the number of waiting threads. Thus a return value
504  * of 0 could mean 'n' entries in the queue and 'n' thread waiting.
505  * That can happen due to locking of the queue or due to
506  * scheduling.  
507  *
508  * Return value: the length of the @queue.
509  **/
510 gint
511 g_async_queue_length (GAsyncQueue* queue)
512 {
513   gint retval;
514
515   g_return_val_if_fail (queue, 0);
516   g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, 0);
517
518   g_mutex_lock (queue->mutex);
519   retval = queue->queue->length - queue->waiting_threads;
520   g_mutex_unlock (queue->mutex);
521
522   return retval;
523 }
524
525 /**
526  * g_async_queue_length_unlocked:
527  * @queue: a #GAsyncQueue.
528  * 
529  * Returns the length of the queue, negative values mean waiting
530  * threads, positive values mean available entries in the
531  * @queue. Actually this function returns the number of data items in
532  * the queue minus the number of waiting threads. Thus a return value
533  * of 0 could mean 'n' entries in the queue and 'n' thread waiting.
534  * That can happen due to locking of the queue or due to
535  * scheduling. This function must be called while holding the @queue's
536  * lock.
537  *
538  * Return value: the length of the @queue.
539  **/
540 gint
541 g_async_queue_length_unlocked (GAsyncQueue* queue)
542 {
543   g_return_val_if_fail (queue, 0);
544   g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, 0);
545
546   return queue->queue->length - queue->waiting_threads;
547 }
548
549 /**
550  * g_async_queue_sort:
551  * @queue: a #GAsyncQueue
552  * @func: the #GCompareDataFunc is used to sort @queue. This
553  *     function is passed two elements of the @queue. The function
554  *     should return 0 if they are equal, a negative value if the
555  *     first element should be higher in the @queue or a positive
556  *     value if the first element should be lower in the @queue than
557  *     the second element. 
558  * @user_data: user data passed to @func
559  *
560  * Sorts @queue using @func. 
561  *
562  * This function will lock @queue before it sorts the queue and unlock
563  * it when it is finished.
564  *
565  * If you were sorting a list of priority numbers to make sure the
566  * lowest priority would be at the top of the queue, you could use:
567  * <informalexample><programlisting> 
568  *  gint32 id1;
569  *  gint32 id2;
570  *   
571  *  id1 = GPOINTER_TO_INT (element1);
572  *  id2 = GPOINTER_TO_INT (element2);
573  *   
574  *  return (id1 > id2 ? +1 : id1 == id2 ? 0 : -1);
575  * </programlisting></informalexample>
576  *
577  * Since: 2.10
578  **/
579 void
580 g_async_queue_sort (GAsyncQueue      *queue,
581                     GCompareDataFunc  func,
582                     gpointer          user_data)
583 {
584   g_return_if_fail (queue != NULL);
585   g_return_if_fail (func != NULL);
586
587   g_mutex_lock (queue->mutex);
588   g_async_queue_sort_unlocked (queue, func, user_data);
589   g_mutex_unlock (queue->mutex);
590 }
591
592 /**
593  * g_async_queue_sort_unlocked:
594  * @queue: a #GAsyncQueue
595  * @func: the #GCompareDataFunc is used to sort @queue. This
596  *     function is passed two elements of the @queue. The function
597  *     should return 0 if they are equal, a negative value if the
598  *     first element should be higher in the @queue or a positive
599  *     value if the first element should be lower in the @queue than
600  *     the second element. 
601  * @user_data: user data passed to @func
602  *
603  * Sorts @queue using @func. 
604  *
605  * This function is called while holding the @queue's lock.
606  * 
607  * Since: 2.10
608  **/
609 void
610 g_async_queue_sort_unlocked (GAsyncQueue      *queue,
611                              GCompareDataFunc  func,
612                              gpointer          user_data)
613 {
614   SortData sd;
615
616   g_return_if_fail (queue != NULL);
617   g_return_if_fail (func != NULL);
618
619   sd.func = func;
620   sd.user_data = user_data;
621
622   g_queue_sort (queue->queue, 
623                 (GCompareDataFunc)g_async_queue_invert_compare, 
624                 &sd);
625 }
626
627 #define __G_ASYNCQUEUE_C__
628 #include "galiasdef.c"