GAsyncQueue: Move private API to a private header
[platform/upstream/glib.git] / glib / gasyncqueue.c
1 /* GLIB - Library of useful routines for C programming
2  * Copyright (C) 1995-1997  Peter Mattis, Spencer Kimball and Josh MacDonald
3  *
4  * GAsyncQueue: asynchronous queue implementation, based on Gqueue.
5  * Copyright (C) 2000 Sebastian Wilhelmi; University of Karlsruhe
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Lesser General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20  * Boston, MA 02111-1307, USA.
21  */
22
23 /*
24  * MT safe
25  */
26
27 #include "config.h"
28
29 #include "gasyncqueue.h"
30 #include "gasyncqueueprivate.h"
31
32 #include "gmem.h"
33 #include "gqueue.h"
34 #include "gtestutils.h"
35 #include "gthread.h"
36
37
38 /**
39  * SECTION:async_queues
40  * @title: Asynchronous Queues
41  * @short_description: asynchronous communication between threads
42  *
43  * Often you need to communicate between different threads. In general
44  * it's safer not to do this by shared memory, but by explicit message
45  * passing. These messages only make sense asynchronously for
46  * multi-threaded applications though, as a synchronous operation could
47  * as well be done in the same thread.
48  *
49  * Asynchronous queues are an exception from most other GLib data
50  * structures, as they can be used simultaneously from multiple threads
51  * without explicit locking and they bring their own builtin reference
52  * counting. This is because the nature of an asynchronous queue is that
53  * it will always be used by at least 2 concurrent threads.
54  *
55  * For using an asynchronous queue you first have to create one with
56  * g_async_queue_new(). A newly-created queue will get the reference
57  * count 1. Whenever another thread is creating a new reference of (that
58  * is, pointer to) the queue, it has to increase the reference count
59  * (using g_async_queue_ref()). Also, before removing this reference,
60  * the reference count has to be decreased (using g_async_queue_unref()).
61  * After that the queue might no longer exist so you must not access
62  * it after that point.
63  *
64  * A thread, which wants to send a message to that queue simply calls
65  * g_async_queue_push() to push the message to the queue.
66  *
67  * A thread, which is expecting messages from an asynchronous queue
68  * simply calls g_async_queue_pop() for that queue. If no message is
69  * available in the queue at that point, the thread is now put to sleep
70  * until a message arrives. The message will be removed from the queue
71  * and returned. The functions g_async_queue_try_pop() and
72  * g_async_queue_timed_pop() can be used to only check for the presence
73  * of messages or to only wait a certain time for messages respectively.
74  *
75  * For almost every function there exist two variants, one that locks
76  * the queue and one that doesn't. That way you can hold the queue lock
77  * (acquire it with g_async_queue_lock() and release it with
78  * g_async_queue_unlock()) over multiple queue accessing instructions.
79  * This can be necessary to ensure the integrity of the queue, but should
80  * only be used when really necessary, as it can make your life harder
81  * if used unwisely. Normally you should only use the locking function
82  * variants (those without the suffix _unlocked)
83  */
84
85 /**
86  * GAsyncQueue:
87  *
88  * The GAsyncQueue struct is an opaque data structure, which represents
89  * an asynchronous queue. It should only be accessed through the
90  * <function>g_async_queue_*</function> functions.
91  */
92 struct _GAsyncQueue
93 {
94   GMutex mutex;
95   GCond *cond;
96   GQueue queue;
97   GDestroyNotify item_free_func;
98   guint waiting_threads;
99   gint ref_count;
100 };
101
102 typedef struct {
103   GCompareDataFunc func;
104   gpointer         user_data;
105 } SortData;
106
107 /**
108  * g_async_queue_new:
109  * 
110  * Creates a new asynchronous queue with the initial reference count of 1.
111  * 
112  * Return value: the new #GAsyncQueue.
113  **/
114 GAsyncQueue*
115 g_async_queue_new (void)
116 {
117   GAsyncQueue* retval = g_new (GAsyncQueue, 1);
118   g_mutex_init (&retval->mutex);
119   retval->cond = NULL;
120   g_queue_init (&retval->queue);
121   retval->waiting_threads = 0;
122   retval->ref_count = 1;
123   retval->item_free_func = NULL;
124   return retval;
125 }
126
127 /**
128  * g_async_queue_new_full:
129  * @item_free_func: function to free queue elements
130  * 
131  * Creates a new asynchronous queue with an initial reference count of 1 and
132  * sets up a destroy notify function that is used to free any remaining
133  * queue items when the queue is destroyed after the final unref.
134  *
135  * Return value: the new #GAsyncQueue.
136  *
137  * Since: 2.16
138  **/
139 GAsyncQueue*
140 g_async_queue_new_full (GDestroyNotify item_free_func)
141 {
142   GAsyncQueue *async_queue = g_async_queue_new ();
143   async_queue->item_free_func = item_free_func;
144   return async_queue;
145 }
146
147 /**
148  * g_async_queue_ref:
149  * @queue: a #GAsyncQueue.
150  *
151  * Increases the reference count of the asynchronous @queue by 1. You
152  * do not need to hold the lock to call this function.
153  *
154  * Returns: the @queue that was passed in (since 2.6)
155  **/
156 GAsyncQueue *
157 g_async_queue_ref (GAsyncQueue *queue)
158 {
159   g_return_val_if_fail (queue, NULL);
160   
161   g_atomic_int_inc (&queue->ref_count);
162
163   return queue;
164 }
165
166 /**
167  * g_async_queue_ref_unlocked:
168  * @queue: a #GAsyncQueue.
169  * 
170  * Increases the reference count of the asynchronous @queue by 1.
171  *
172  * @Deprecated: Since 2.8, reference counting is done atomically
173  * so g_async_queue_ref() can be used regardless of the @queue's
174  * lock.
175  **/
176 void 
177 g_async_queue_ref_unlocked (GAsyncQueue *queue)
178 {
179   g_return_if_fail (queue);
180   
181   g_atomic_int_inc (&queue->ref_count);
182 }
183
184 /**
185  * g_async_queue_unref_and_unlock:
186  * @queue: a #GAsyncQueue.
187  * 
188  * Decreases the reference count of the asynchronous @queue by 1 and
189  * releases the lock. This function must be called while holding the
190  * @queue's lock. If the reference count went to 0, the @queue will be
191  * destroyed and the memory allocated will be freed.
192  *
193  * @Deprecated: Since 2.8, reference counting is done atomically
194  * so g_async_queue_unref() can be used regardless of the @queue's
195  * lock.
196  **/
197 void 
198 g_async_queue_unref_and_unlock (GAsyncQueue *queue)
199 {
200   g_return_if_fail (queue);
201
202   g_mutex_unlock (&queue->mutex);
203   g_async_queue_unref (queue);
204 }
205
206 /**
207  * g_async_queue_unref:
208  * @queue: a #GAsyncQueue.
209  * 
210  * Decreases the reference count of the asynchronous @queue by 1. If
211  * the reference count went to 0, the @queue will be destroyed and the
212  * memory allocated will be freed. So you are not allowed to use the
213  * @queue afterwards, as it might have disappeared. You do not need to
214  * hold the lock to call this function.
215  **/
216 void 
217 g_async_queue_unref (GAsyncQueue *queue)
218 {
219   g_return_if_fail (queue);
220   
221   if (g_atomic_int_dec_and_test (&queue->ref_count))
222     {
223       g_return_if_fail (queue->waiting_threads == 0);
224       g_mutex_clear (&queue->mutex);
225       if (queue->cond)
226         g_cond_free (queue->cond);
227       if (queue->item_free_func)
228         g_queue_foreach (&queue->queue, (GFunc) queue->item_free_func, NULL);
229       g_queue_clear (&queue->queue);
230       g_free (queue);
231     }
232 }
233
234 /**
235  * g_async_queue_lock:
236  * @queue: a #GAsyncQueue.
237  * 
238  * Acquires the @queue's lock. After that you can only call the
239  * <function>g_async_queue_*_unlocked()</function> function variants on that
240  * @queue. Otherwise it will deadlock.
241  **/
242 void
243 g_async_queue_lock (GAsyncQueue *queue)
244 {
245   g_return_if_fail (queue);
246
247   g_mutex_lock (&queue->mutex);
248 }
249
250 /**
251  * g_async_queue_unlock:
252  * @queue: a #GAsyncQueue.
253  * 
254  * Releases the queue's lock.
255  **/
256 void 
257 g_async_queue_unlock (GAsyncQueue *queue)
258 {
259   g_return_if_fail (queue);
260
261   g_mutex_unlock (&queue->mutex);
262 }
263
264 /**
265  * g_async_queue_push:
266  * @queue: a #GAsyncQueue.
267  * @data: @data to push into the @queue.
268  *
269  * Pushes the @data into the @queue. @data must not be %NULL.
270  **/
271 void
272 g_async_queue_push (GAsyncQueue* queue, gpointer data)
273 {
274   g_return_if_fail (queue);
275   g_return_if_fail (data);
276
277   g_mutex_lock (&queue->mutex);
278   g_async_queue_push_unlocked (queue, data);
279   g_mutex_unlock (&queue->mutex);
280 }
281
282 /**
283  * g_async_queue_push_unlocked:
284  * @queue: a #GAsyncQueue.
285  * @data: @data to push into the @queue.
286  * 
287  * Pushes the @data into the @queue. @data must not be %NULL. This
288  * function must be called while holding the @queue's lock.
289  **/
290 void
291 g_async_queue_push_unlocked (GAsyncQueue* queue, gpointer data)
292 {
293   g_return_if_fail (queue);
294   g_return_if_fail (data);
295
296   g_queue_push_head (&queue->queue, data);
297   if (queue->waiting_threads > 0)
298     g_cond_signal (queue->cond);
299 }
300
301 /**
302  * g_async_queue_push_sorted:
303  * @queue: a #GAsyncQueue
304  * @data: the @data to push into the @queue
305  * @func: the #GCompareDataFunc is used to sort @queue. This function
306  *     is passed two elements of the @queue. The function should return
307  *     0 if they are equal, a negative value if the first element
308  *     should be higher in the @queue or a positive value if the first
309  *     element should be lower in the @queue than the second element.
310  * @user_data: user data passed to @func.
311  * 
312  * Inserts @data into @queue using @func to determine the new
313  * position. 
314  * 
315  * This function requires that the @queue is sorted before pushing on
316  * new elements.
317  * 
318  * This function will lock @queue before it sorts the queue and unlock
319  * it when it is finished.
320  * 
321  * For an example of @func see g_async_queue_sort(). 
322  *
323  * Since: 2.10
324  **/
325 void
326 g_async_queue_push_sorted (GAsyncQueue      *queue,
327                            gpointer          data,
328                            GCompareDataFunc  func,
329                            gpointer          user_data)
330 {
331   g_return_if_fail (queue != NULL);
332
333   g_mutex_lock (&queue->mutex);
334   g_async_queue_push_sorted_unlocked (queue, data, func, user_data);
335   g_mutex_unlock (&queue->mutex);
336 }
337
338 static gint 
339 g_async_queue_invert_compare (gpointer  v1, 
340                               gpointer  v2, 
341                               SortData *sd)
342 {
343   return -sd->func (v1, v2, sd->user_data);
344 }
345
346 /**
347  * g_async_queue_push_sorted_unlocked:
348  * @queue: a #GAsyncQueue
349  * @data: the @data to push into the @queue
350  * @func: the #GCompareDataFunc is used to sort @queue. This function
351  *     is passed two elements of the @queue. The function should return
352  *     0 if they are equal, a negative value if the first element
353  *     should be higher in the @queue or a positive value if the first
354  *     element should be lower in the @queue than the second element.
355  * @user_data: user data passed to @func.
356  * 
357  * Inserts @data into @queue using @func to determine the new
358  * position.
359  * 
360  * This function requires that the @queue is sorted before pushing on
361  * new elements.
362  * 
363  * This function is called while holding the @queue's lock.
364  * 
365  * For an example of @func see g_async_queue_sort(). 
366  *
367  * Since: 2.10
368  **/
369 void
370 g_async_queue_push_sorted_unlocked (GAsyncQueue      *queue,
371                                     gpointer          data,
372                                     GCompareDataFunc  func,
373                                     gpointer          user_data)
374 {
375   SortData sd;
376   
377   g_return_if_fail (queue != NULL);
378
379   sd.func = func;
380   sd.user_data = user_data;
381
382   g_queue_insert_sorted (&queue->queue,
383                          data, 
384                          (GCompareDataFunc)g_async_queue_invert_compare, 
385                          &sd);
386   if (queue->waiting_threads > 0)
387     g_cond_signal (queue->cond);
388 }
389
390 static gpointer
391 g_async_queue_pop_intern_unlocked (GAsyncQueue *queue, 
392                                    gboolean     try, 
393                                    GTimeVal    *end_time)
394 {
395   gpointer retval;
396
397   if (!g_queue_peek_tail_link (&queue->queue))
398     {
399       if (try)
400         return NULL;
401       
402       if (!queue->cond)
403         queue->cond = g_cond_new ();
404
405       if (!end_time)
406         {
407           queue->waiting_threads++;
408           while (!g_queue_peek_tail_link (&queue->queue))
409             g_cond_wait (queue->cond, &queue->mutex);
410           queue->waiting_threads--;
411         }
412       else
413         {
414           queue->waiting_threads++;
415           while (!g_queue_peek_tail_link (&queue->queue))
416             if (!g_cond_timed_wait (queue->cond, &queue->mutex, end_time))
417               break;
418           queue->waiting_threads--;
419           if (!g_queue_peek_tail_link (&queue->queue))
420             return NULL;
421         }
422     }
423
424   retval = g_queue_pop_tail (&queue->queue);
425
426   g_assert (retval);
427
428   return retval;
429 }
430
431 /**
432  * g_async_queue_pop:
433  * @queue: a #GAsyncQueue.
434  * 
435  * Pops data from the @queue. This function blocks until data become
436  * available.
437  *
438  * Return value: data from the queue.
439  **/
440 gpointer
441 g_async_queue_pop (GAsyncQueue* queue)
442 {
443   gpointer retval;
444
445   g_return_val_if_fail (queue, NULL);
446
447   g_mutex_lock (&queue->mutex);
448   retval = g_async_queue_pop_intern_unlocked (queue, FALSE, NULL);
449   g_mutex_unlock (&queue->mutex);
450
451   return retval;
452 }
453
454 /**
455  * g_async_queue_pop_unlocked:
456  * @queue: a #GAsyncQueue.
457  * 
458  * Pops data from the @queue. This function blocks until data become
459  * available. This function must be called while holding the @queue's
460  * lock.
461  *
462  * Return value: data from the queue.
463  **/
464 gpointer
465 g_async_queue_pop_unlocked (GAsyncQueue* queue)
466 {
467   g_return_val_if_fail (queue, NULL);
468
469   return g_async_queue_pop_intern_unlocked (queue, FALSE, NULL);
470 }
471
472 /**
473  * g_async_queue_try_pop:
474  * @queue: a #GAsyncQueue.
475  * 
476  * Tries to pop data from the @queue. If no data is available, %NULL is
477  * returned.
478  *
479  * Return value: data from the queue or %NULL, when no data is
480  * available immediately.
481  **/
482 gpointer
483 g_async_queue_try_pop (GAsyncQueue* queue)
484 {
485   gpointer retval;
486
487   g_return_val_if_fail (queue, NULL);
488
489   g_mutex_lock (&queue->mutex);
490   retval = g_async_queue_pop_intern_unlocked (queue, TRUE, NULL);
491   g_mutex_unlock (&queue->mutex);
492
493   return retval;
494 }
495
496 /**
497  * g_async_queue_try_pop_unlocked:
498  * @queue: a #GAsyncQueue.
499  * 
500  * Tries to pop data from the @queue. If no data is available, %NULL is
501  * returned. This function must be called while holding the @queue's
502  * lock.
503  *
504  * Return value: data from the queue or %NULL, when no data is
505  * available immediately.
506  **/
507 gpointer
508 g_async_queue_try_pop_unlocked (GAsyncQueue* queue)
509 {
510   g_return_val_if_fail (queue, NULL);
511
512   return g_async_queue_pop_intern_unlocked (queue, TRUE, NULL);
513 }
514
515 /**
516  * g_async_queue_timed_pop:
517  * @queue: a #GAsyncQueue.
518  * @end_time: a #GTimeVal, determining the final time.
519  *
520  * Pops data from the @queue. If no data is received before @end_time,
521  * %NULL is returned.
522  *
523  * To easily calculate @end_time a combination of g_get_current_time()
524  * and g_time_val_add() can be used.
525  *
526  * Return value: data from the queue or %NULL, when no data is
527  * received before @end_time.
528  **/
529 gpointer
530 g_async_queue_timed_pop (GAsyncQueue* queue, GTimeVal *end_time)
531 {
532   gpointer retval;
533
534   g_return_val_if_fail (queue, NULL);
535
536   g_mutex_lock (&queue->mutex);
537   retval = g_async_queue_pop_intern_unlocked (queue, FALSE, end_time);
538   g_mutex_unlock (&queue->mutex);
539
540   return retval;  
541 }
542
543 /**
544  * g_async_queue_timed_pop_unlocked:
545  * @queue: a #GAsyncQueue.
546  * @end_time: a #GTimeVal, determining the final time.
547  *
548  * Pops data from the @queue. If no data is received before @end_time,
549  * %NULL is returned. This function must be called while holding the
550  * @queue's lock.
551  *
552  * To easily calculate @end_time a combination of g_get_current_time()
553  * and g_time_val_add() can be used.
554  *
555  * Return value: data from the queue or %NULL, when no data is
556  * received before @end_time.
557  **/
558 gpointer
559 g_async_queue_timed_pop_unlocked (GAsyncQueue* queue, GTimeVal *end_time)
560 {
561   g_return_val_if_fail (queue, NULL);
562
563   return g_async_queue_pop_intern_unlocked (queue, FALSE, end_time);
564 }
565
566 /**
567  * g_async_queue_length:
568  * @queue: a #GAsyncQueue.
569  * 
570  * Returns the length of the queue, negative values mean waiting
571  * threads, positive values mean available entries in the
572  * @queue. Actually this function returns the number of data items in
573  * the queue minus the number of waiting threads. Thus a return value
574  * of 0 could mean 'n' entries in the queue and 'n' thread waiting.
575  * That can happen due to locking of the queue or due to
576  * scheduling.  
577  *
578  * Return value: the length of the @queue.
579  **/
580 gint
581 g_async_queue_length (GAsyncQueue* queue)
582 {
583   gint retval;
584
585   g_return_val_if_fail (queue, 0);
586
587   g_mutex_lock (&queue->mutex);
588   retval = queue->queue.length - queue->waiting_threads;
589   g_mutex_unlock (&queue->mutex);
590
591   return retval;
592 }
593
594 /**
595  * g_async_queue_length_unlocked:
596  * @queue: a #GAsyncQueue.
597  * 
598  * Returns the length of the queue, negative values mean waiting
599  * threads, positive values mean available entries in the
600  * @queue. Actually this function returns the number of data items in
601  * the queue minus the number of waiting threads. Thus a return value
602  * of 0 could mean 'n' entries in the queue and 'n' thread waiting.
603  * That can happen due to locking of the queue or due to
604  * scheduling. This function must be called while holding the @queue's
605  * lock.
606  *
607  * Return value: the length of the @queue.
608  **/
609 gint
610 g_async_queue_length_unlocked (GAsyncQueue* queue)
611 {
612   g_return_val_if_fail (queue, 0);
613
614   return queue->queue.length - queue->waiting_threads;
615 }
616
617 /**
618  * g_async_queue_sort:
619  * @queue: a #GAsyncQueue
620  * @func: the #GCompareDataFunc is used to sort @queue. This
621  *     function is passed two elements of the @queue. The function
622  *     should return 0 if they are equal, a negative value if the
623  *     first element should be higher in the @queue or a positive
624  *     value if the first element should be lower in the @queue than
625  *     the second element. 
626  * @user_data: user data passed to @func
627  *
628  * Sorts @queue using @func. 
629  *
630  * This function will lock @queue before it sorts the queue and unlock
631  * it when it is finished.
632  *
633  * If you were sorting a list of priority numbers to make sure the
634  * lowest priority would be at the top of the queue, you could use:
635  * |[
636  *  gint32 id1;
637  *  gint32 id2;
638  *   
639  *  id1 = GPOINTER_TO_INT (element1);
640  *  id2 = GPOINTER_TO_INT (element2);
641  *   
642  *  return (id1 > id2 ? +1 : id1 == id2 ? 0 : -1);
643  * ]|
644  *
645  * Since: 2.10
646  **/
647 void
648 g_async_queue_sort (GAsyncQueue      *queue,
649                     GCompareDataFunc  func,
650                     gpointer          user_data)
651 {
652   g_return_if_fail (queue != NULL);
653   g_return_if_fail (func != NULL);
654
655   g_mutex_lock (&queue->mutex);
656   g_async_queue_sort_unlocked (queue, func, user_data);
657   g_mutex_unlock (&queue->mutex);
658 }
659
660 /**
661  * g_async_queue_sort_unlocked:
662  * @queue: a #GAsyncQueue
663  * @func: the #GCompareDataFunc is used to sort @queue. This
664  *     function is passed two elements of the @queue. The function
665  *     should return 0 if they are equal, a negative value if the
666  *     first element should be higher in the @queue or a positive
667  *     value if the first element should be lower in the @queue than
668  *     the second element. 
669  * @user_data: user data passed to @func
670  *
671  * Sorts @queue using @func. 
672  *
673  * This function is called while holding the @queue's lock.
674  * 
675  * Since: 2.10
676  **/
677 void
678 g_async_queue_sort_unlocked (GAsyncQueue      *queue,
679                              GCompareDataFunc  func,
680                              gpointer          user_data)
681 {
682   SortData sd;
683
684   g_return_if_fail (queue != NULL);
685   g_return_if_fail (func != NULL);
686
687   sd.func = func;
688   sd.user_data = user_data;
689
690   g_queue_sort (&queue->queue,
691                 (GCompareDataFunc)g_async_queue_invert_compare, 
692                 &sd);
693 }
694
695 /*
696  * Private API
697  */
698
699 GMutex*
700 _g_async_queue_get_mutex (GAsyncQueue* queue)
701 {
702   g_return_val_if_fail (queue, NULL);
703
704   return &queue->mutex;
705 }