Clean Glib header #include issues: gasyncqueue
[platform/upstream/glib.git] / glib / gasyncqueue.c
1 /* GLIB - Library of useful routines for C programming
2  * Copyright (C) 1995-1997  Peter Mattis, Spencer Kimball and Josh MacDonald
3  *
4  * GAsyncQueue: asynchronous queue implementation, based on Gqueue.
5  * Copyright (C) 2000 Sebastian Wilhelmi; University of Karlsruhe
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Lesser General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20  * Boston, MA 02111-1307, USA.
21  */
22
23 /*
24  * MT safe
25  */
26
27 #include "config.h"
28
29 #include "gasyncqueue.h"
30
31 #include "gmem.h"
32 #include "gqueue.h"
33 #include "gtestutils.h"
34 #include "gthread.h"
35
36 #include "galias.h"
37
38
39 struct _GAsyncQueue
40 {
41   GMutex *mutex;
42   GCond *cond;
43   GQueue *queue;
44   GDestroyNotify item_free_func;
45   guint waiting_threads;
46   gint32 ref_count;
47 };
48
49 typedef struct {
50   GCompareDataFunc func;
51   gpointer         user_data;
52 } SortData;
53
54 /**
55  * g_async_queue_new:
56  * 
57  * Creates a new asynchronous queue with the initial reference count of 1.
58  * 
59  * Return value: the new #GAsyncQueue.
60  **/
61 GAsyncQueue*
62 g_async_queue_new (void)
63 {
64   GAsyncQueue* retval = g_new (GAsyncQueue, 1);
65   retval->mutex = g_mutex_new ();
66   retval->cond = NULL;
67   retval->queue = g_queue_new ();
68   retval->waiting_threads = 0;
69   retval->ref_count = 1;
70   retval->item_free_func = NULL;
71   return retval;
72 }
73
74 /**
75  * g_async_queue_new_full:
76  * @item_free_func: function to free queue elements
77  * 
78  * Creates a new asynchronous queue with an initial reference count of 1 and
79  * sets up a destroy notify function that is used to free any remaining
80  * queue items when the queue is destroyed after the final unref.
81  *
82  * Return value: the new #GAsyncQueue.
83  *
84  * Since: 2.16
85  **/
86 GAsyncQueue*
87 g_async_queue_new_full (GDestroyNotify item_free_func)
88 {
89   GAsyncQueue *async_queue = g_async_queue_new ();
90   async_queue->item_free_func = item_free_func;
91   return async_queue;
92 }
93
94 /**
95  * g_async_queue_ref:
96  * @queue: a #GAsyncQueue.
97  *
98  * Increases the reference count of the asynchronous @queue by 1. You
99  * do not need to hold the lock to call this function.
100  *
101  * Returns: the @queue that was passed in (since 2.6)
102  **/
103 GAsyncQueue *
104 g_async_queue_ref (GAsyncQueue *queue)
105 {
106   g_return_val_if_fail (queue, NULL);
107   g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, NULL);
108   
109   g_atomic_int_inc (&queue->ref_count);
110
111   return queue;
112 }
113
114 /**
115  * g_async_queue_ref_unlocked:
116  * @queue: a #GAsyncQueue.
117  * 
118  * Increases the reference count of the asynchronous @queue by 1.
119  *
120  * @Deprecated: Since 2.8, reference counting is done atomically
121  * so g_async_queue_ref() can be used regardless of the @queue's
122  * lock.
123  **/
124 void 
125 g_async_queue_ref_unlocked (GAsyncQueue *queue)
126 {
127   g_return_if_fail (queue);
128   g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
129   
130   g_atomic_int_inc (&queue->ref_count);
131 }
132
133 /**
134  * g_async_queue_unref_and_unlock:
135  * @queue: a #GAsyncQueue.
136  * 
137  * Decreases the reference count of the asynchronous @queue by 1 and
138  * releases the lock. This function must be called while holding the
139  * @queue's lock. If the reference count went to 0, the @queue will be
140  * destroyed and the memory allocated will be freed.
141  *
142  * @Deprecated: Since 2.8, reference counting is done atomically
143  * so g_async_queue_unref() can be used regardless of the @queue's
144  * lock.
145  **/
146 void 
147 g_async_queue_unref_and_unlock (GAsyncQueue *queue)
148 {
149   g_return_if_fail (queue);
150   g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
151
152   g_mutex_unlock (queue->mutex);
153   g_async_queue_unref (queue);
154 }
155
156 /**
157  * g_async_queue_unref:
158  * @queue: a #GAsyncQueue.
159  * 
160  * Decreases the reference count of the asynchronous @queue by 1. If
161  * the reference count went to 0, the @queue will be destroyed and the
162  * memory allocated will be freed. So you are not allowed to use the
163  * @queue afterwards, as it might have disappeared. You do not need to
164  * hold the lock to call this function.
165  **/
166 void 
167 g_async_queue_unref (GAsyncQueue *queue)
168 {
169   g_return_if_fail (queue);
170   g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
171   
172   if (g_atomic_int_dec_and_test (&queue->ref_count))
173     {
174       g_return_if_fail (queue->waiting_threads == 0);
175       g_mutex_free (queue->mutex);
176       if (queue->cond)
177         g_cond_free (queue->cond);
178       if (queue->item_free_func)
179         g_queue_foreach (queue->queue, (GFunc) queue->item_free_func, NULL);
180       g_queue_free (queue->queue);
181       g_free (queue);
182     }
183 }
184
185 /**
186  * g_async_queue_lock:
187  * @queue: a #GAsyncQueue.
188  * 
189  * Acquires the @queue's lock. After that you can only call the
190  * <function>g_async_queue_*_unlocked()</function> function variants on that
191  * @queue. Otherwise it will deadlock.
192  **/
193 void
194 g_async_queue_lock (GAsyncQueue *queue)
195 {
196   g_return_if_fail (queue);
197   g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
198
199   g_mutex_lock (queue->mutex);
200 }
201
202 /**
203  * g_async_queue_unlock:
204  * @queue: a #GAsyncQueue.
205  * 
206  * Releases the queue's lock.
207  **/
208 void 
209 g_async_queue_unlock (GAsyncQueue *queue)
210 {
211   g_return_if_fail (queue);
212   g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
213
214   g_mutex_unlock (queue->mutex);
215 }
216
217 /**
218  * g_async_queue_push:
219  * @queue: a #GAsyncQueue.
220  * @data: @data to push into the @queue.
221  *
222  * Pushes the @data into the @queue. @data must not be %NULL.
223  **/
224 void
225 g_async_queue_push (GAsyncQueue* queue, gpointer data)
226 {
227   g_return_if_fail (queue);
228   g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
229   g_return_if_fail (data);
230
231   g_mutex_lock (queue->mutex);
232   g_async_queue_push_unlocked (queue, data);
233   g_mutex_unlock (queue->mutex);
234 }
235
236 /**
237  * g_async_queue_push_unlocked:
238  * @queue: a #GAsyncQueue.
239  * @data: @data to push into the @queue.
240  * 
241  * Pushes the @data into the @queue. @data must not be %NULL. This
242  * function must be called while holding the @queue's lock.
243  **/
244 void
245 g_async_queue_push_unlocked (GAsyncQueue* queue, gpointer data)
246 {
247   g_return_if_fail (queue);
248   g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
249   g_return_if_fail (data);
250
251   g_queue_push_head (queue->queue, data);
252   if (queue->waiting_threads > 0)
253     g_cond_signal (queue->cond);
254 }
255
256 /**
257  * g_async_queue_push_sorted:
258  * @queue: a #GAsyncQueue
259  * @data: the @data to push into the @queue
260  * @func: the #GCompareDataFunc is used to sort @queue. This function
261  *     is passed two elements of the @queue. The function should return
262  *     0 if they are equal, a negative value if the first element
263  *     should be higher in the @queue or a positive value if the first
264  *     element should be lower in the @queue than the second element.
265  * @user_data: user data passed to @func.
266  * 
267  * Inserts @data into @queue using @func to determine the new
268  * position. 
269  * 
270  * This function requires that the @queue is sorted before pushing on
271  * new elements.
272  * 
273  * This function will lock @queue before it sorts the queue and unlock
274  * it when it is finished.
275  * 
276  * For an example of @func see g_async_queue_sort(). 
277  *
278  * Since: 2.10
279  **/
280 void
281 g_async_queue_push_sorted (GAsyncQueue      *queue,
282                            gpointer          data,
283                            GCompareDataFunc  func,
284                            gpointer          user_data)
285 {
286   g_return_if_fail (queue != NULL);
287
288   g_mutex_lock (queue->mutex);
289   g_async_queue_push_sorted_unlocked (queue, data, func, user_data);
290   g_mutex_unlock (queue->mutex);
291 }
292
293 static gint 
294 g_async_queue_invert_compare (gpointer  v1, 
295                               gpointer  v2, 
296                               SortData *sd)
297 {
298   return -sd->func (v1, v2, sd->user_data);
299 }
300
301 /**
302  * g_async_queue_push_sorted_unlocked:
303  * @queue: a #GAsyncQueue
304  * @data: the @data to push into the @queue
305  * @func: the #GCompareDataFunc is used to sort @queue. This function
306  *     is passed two elements of the @queue. The function should return
307  *     0 if they are equal, a negative value if the first element
308  *     should be higher in the @queue or a positive value if the first
309  *     element should be lower in the @queue than the second element.
310  * @user_data: user data passed to @func.
311  * 
312  * Inserts @data into @queue using @func to determine the new
313  * position.
314  * 
315  * This function requires that the @queue is sorted before pushing on
316  * new elements.
317  * 
318  * This function is called while holding the @queue's lock.
319  * 
320  * For an example of @func see g_async_queue_sort(). 
321  *
322  * Since: 2.10
323  **/
324 void
325 g_async_queue_push_sorted_unlocked (GAsyncQueue      *queue,
326                                     gpointer          data,
327                                     GCompareDataFunc  func,
328                                     gpointer          user_data)
329 {
330   SortData sd;
331   
332   g_return_if_fail (queue != NULL);
333
334   sd.func = func;
335   sd.user_data = user_data;
336
337   g_queue_insert_sorted (queue->queue, 
338                          data, 
339                          (GCompareDataFunc)g_async_queue_invert_compare, 
340                          &sd);
341   if (queue->waiting_threads > 0)
342     g_cond_signal (queue->cond);
343 }
344
345 static gpointer
346 g_async_queue_pop_intern_unlocked (GAsyncQueue *queue, 
347                                    gboolean     try, 
348                                    GTimeVal    *end_time)
349 {
350   gpointer retval;
351
352   if (!g_queue_peek_tail_link (queue->queue))
353     {
354       if (try)
355         return NULL;
356       
357       if (!queue->cond)
358         queue->cond = g_cond_new ();
359
360       if (!end_time)
361         {
362           queue->waiting_threads++;
363           while (!g_queue_peek_tail_link (queue->queue))
364             g_cond_wait (queue->cond, queue->mutex);
365           queue->waiting_threads--;
366         }
367       else
368         {
369           queue->waiting_threads++;
370           while (!g_queue_peek_tail_link (queue->queue))
371             if (!g_cond_timed_wait (queue->cond, queue->mutex, end_time))
372               break;
373           queue->waiting_threads--;
374           if (!g_queue_peek_tail_link (queue->queue))
375             return NULL;
376         }
377     }
378
379   retval = g_queue_pop_tail (queue->queue);
380
381   g_assert (retval);
382
383   return retval;
384 }
385
386 /**
387  * g_async_queue_pop:
388  * @queue: a #GAsyncQueue.
389  * 
390  * Pops data from the @queue. This function blocks until data become
391  * available.
392  *
393  * Return value: data from the queue.
394  **/
395 gpointer
396 g_async_queue_pop (GAsyncQueue* queue)
397 {
398   gpointer retval;
399
400   g_return_val_if_fail (queue, NULL);
401   g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, NULL);
402
403   g_mutex_lock (queue->mutex);
404   retval = g_async_queue_pop_intern_unlocked (queue, FALSE, NULL);
405   g_mutex_unlock (queue->mutex);
406
407   return retval;
408 }
409
410 /**
411  * g_async_queue_pop_unlocked:
412  * @queue: a #GAsyncQueue.
413  * 
414  * Pops data from the @queue. This function blocks until data become
415  * available. This function must be called while holding the @queue's
416  * lock.
417  *
418  * Return value: data from the queue.
419  **/
420 gpointer
421 g_async_queue_pop_unlocked (GAsyncQueue* queue)
422 {
423   g_return_val_if_fail (queue, NULL);
424   g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, NULL);
425
426   return g_async_queue_pop_intern_unlocked (queue, FALSE, NULL);
427 }
428
429 /**
430  * g_async_queue_try_pop:
431  * @queue: a #GAsyncQueue.
432  * 
433  * Tries to pop data from the @queue. If no data is available, %NULL is
434  * returned.
435  *
436  * Return value: data from the queue or %NULL, when no data is
437  * available immediately.
438  **/
439 gpointer
440 g_async_queue_try_pop (GAsyncQueue* queue)
441 {
442   gpointer retval;
443
444   g_return_val_if_fail (queue, NULL);
445   g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, NULL);
446
447   g_mutex_lock (queue->mutex);
448   retval = g_async_queue_pop_intern_unlocked (queue, TRUE, NULL);
449   g_mutex_unlock (queue->mutex);
450
451   return retval;
452 }
453
454 /**
455  * g_async_queue_try_pop_unlocked:
456  * @queue: a #GAsyncQueue.
457  * 
458  * Tries to pop data from the @queue. If no data is available, %NULL is
459  * returned. This function must be called while holding the @queue's
460  * lock.
461  *
462  * Return value: data from the queue or %NULL, when no data is
463  * available immediately.
464  **/
465 gpointer
466 g_async_queue_try_pop_unlocked (GAsyncQueue* queue)
467 {
468   g_return_val_if_fail (queue, NULL);
469   g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, NULL);
470
471   return g_async_queue_pop_intern_unlocked (queue, TRUE, NULL);
472 }
473
474 /**
475  * g_async_queue_timed_pop:
476  * @queue: a #GAsyncQueue.
477  * @end_time: a #GTimeVal, determining the final time.
478  *
479  * Pops data from the @queue. If no data is received before @end_time,
480  * %NULL is returned.
481  *
482  * To easily calculate @end_time a combination of g_get_current_time()
483  * and g_time_val_add() can be used.
484  *
485  * Return value: data from the queue or %NULL, when no data is
486  * received before @end_time.
487  **/
488 gpointer
489 g_async_queue_timed_pop (GAsyncQueue* queue, GTimeVal *end_time)
490 {
491   gpointer retval;
492
493   g_return_val_if_fail (queue, NULL);
494   g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, NULL);
495
496   g_mutex_lock (queue->mutex);
497   retval = g_async_queue_pop_intern_unlocked (queue, FALSE, end_time);
498   g_mutex_unlock (queue->mutex);
499
500   return retval;  
501 }
502
503 /**
504  * g_async_queue_timed_pop_unlocked:
505  * @queue: a #GAsyncQueue.
506  * @end_time: a #GTimeVal, determining the final time.
507  *
508  * Pops data from the @queue. If no data is received before @end_time,
509  * %NULL is returned. This function must be called while holding the
510  * @queue's lock.
511  *
512  * To easily calculate @end_time a combination of g_get_current_time()
513  * and g_time_val_add() can be used.
514  *
515  * Return value: data from the queue or %NULL, when no data is
516  * received before @end_time.
517  **/
518 gpointer
519 g_async_queue_timed_pop_unlocked (GAsyncQueue* queue, GTimeVal *end_time)
520 {
521   g_return_val_if_fail (queue, NULL);
522   g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, NULL);
523
524   return g_async_queue_pop_intern_unlocked (queue, FALSE, end_time);
525 }
526
527 /**
528  * g_async_queue_length:
529  * @queue: a #GAsyncQueue.
530  * 
531  * Returns the length of the queue, negative values mean waiting
532  * threads, positive values mean available entries in the
533  * @queue. Actually this function returns the number of data items in
534  * the queue minus the number of waiting threads. Thus a return value
535  * of 0 could mean 'n' entries in the queue and 'n' thread waiting.
536  * That can happen due to locking of the queue or due to
537  * scheduling.  
538  *
539  * Return value: the length of the @queue.
540  **/
541 gint
542 g_async_queue_length (GAsyncQueue* queue)
543 {
544   gint retval;
545
546   g_return_val_if_fail (queue, 0);
547   g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, 0);
548
549   g_mutex_lock (queue->mutex);
550   retval = queue->queue->length - queue->waiting_threads;
551   g_mutex_unlock (queue->mutex);
552
553   return retval;
554 }
555
556 /**
557  * g_async_queue_length_unlocked:
558  * @queue: a #GAsyncQueue.
559  * 
560  * Returns the length of the queue, negative values mean waiting
561  * threads, positive values mean available entries in the
562  * @queue. Actually this function returns the number of data items in
563  * the queue minus the number of waiting threads. Thus a return value
564  * of 0 could mean 'n' entries in the queue and 'n' thread waiting.
565  * That can happen due to locking of the queue or due to
566  * scheduling. This function must be called while holding the @queue's
567  * lock.
568  *
569  * Return value: the length of the @queue.
570  **/
571 gint
572 g_async_queue_length_unlocked (GAsyncQueue* queue)
573 {
574   g_return_val_if_fail (queue, 0);
575   g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, 0);
576
577   return queue->queue->length - queue->waiting_threads;
578 }
579
580 /**
581  * g_async_queue_sort:
582  * @queue: a #GAsyncQueue
583  * @func: the #GCompareDataFunc is used to sort @queue. This
584  *     function is passed two elements of the @queue. The function
585  *     should return 0 if they are equal, a negative value if the
586  *     first element should be higher in the @queue or a positive
587  *     value if the first element should be lower in the @queue than
588  *     the second element. 
589  * @user_data: user data passed to @func
590  *
591  * Sorts @queue using @func. 
592  *
593  * This function will lock @queue before it sorts the queue and unlock
594  * it when it is finished.
595  *
596  * If you were sorting a list of priority numbers to make sure the
597  * lowest priority would be at the top of the queue, you could use:
598  * |[
599  *  gint32 id1;
600  *  gint32 id2;
601  *   
602  *  id1 = GPOINTER_TO_INT (element1);
603  *  id2 = GPOINTER_TO_INT (element2);
604  *   
605  *  return (id1 > id2 ? +1 : id1 == id2 ? 0 : -1);
606  * ]|
607  *
608  * Since: 2.10
609  **/
610 void
611 g_async_queue_sort (GAsyncQueue      *queue,
612                     GCompareDataFunc  func,
613                     gpointer          user_data)
614 {
615   g_return_if_fail (queue != NULL);
616   g_return_if_fail (func != NULL);
617
618   g_mutex_lock (queue->mutex);
619   g_async_queue_sort_unlocked (queue, func, user_data);
620   g_mutex_unlock (queue->mutex);
621 }
622
623 /**
624  * g_async_queue_sort_unlocked:
625  * @queue: a #GAsyncQueue
626  * @func: the #GCompareDataFunc is used to sort @queue. This
627  *     function is passed two elements of the @queue. The function
628  *     should return 0 if they are equal, a negative value if the
629  *     first element should be higher in the @queue or a positive
630  *     value if the first element should be lower in the @queue than
631  *     the second element. 
632  * @user_data: user data passed to @func
633  *
634  * Sorts @queue using @func. 
635  *
636  * This function is called while holding the @queue's lock.
637  * 
638  * Since: 2.10
639  **/
640 void
641 g_async_queue_sort_unlocked (GAsyncQueue      *queue,
642                              GCompareDataFunc  func,
643                              gpointer          user_data)
644 {
645   SortData sd;
646
647   g_return_if_fail (queue != NULL);
648   g_return_if_fail (func != NULL);
649
650   sd.func = func;
651   sd.user_data = user_data;
652
653   g_queue_sort (queue->queue, 
654                 (GCompareDataFunc)g_async_queue_invert_compare, 
655                 &sd);
656 }
657
658 /*
659  * Private API
660  */
661
662 GMutex*
663 _g_async_queue_get_mutex (GAsyncQueue* queue)
664 {
665   g_return_val_if_fail (queue, NULL);
666   g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, NULL);
667
668   return queue->mutex;
669 }
670
671 #define __G_ASYNCQUEUE_C__
672 #include "galiasdef.c"