GAsyncQueue: simplify an internal function
[platform/upstream/glib.git] / glib / gasyncqueue.c
1 /* GLIB - Library of useful routines for C programming
2  * Copyright (C) 1995-1997  Peter Mattis, Spencer Kimball and Josh MacDonald
3  *
4  * GAsyncQueue: asynchronous queue implementation, based on GQueue.
5  * Copyright (C) 2000 Sebastian Wilhelmi; University of Karlsruhe
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Lesser General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20  * Boston, MA 02111-1307, USA.
21  */
22
23 /*
24  * MT safe
25  */
26
27 #include "config.h"
28
29 #include "gasyncqueue.h"
30 #include "gasyncqueueprivate.h"
31
32 #include "gmem.h"
33 #include "gqueue.h"
34 #include "gtestutils.h"
35 #include "gthread.h"
36
37
38 /**
39  * SECTION:async_queues
40  * @title: Asynchronous Queues
41  * @short_description: asynchronous communication between threads
42  * @see_also: #GThreadPool
43  *
44  * Often you need to communicate between different threads. In general
45  * it's safer not to do this by shared memory, but by explicit message
46  * passing. These messages only make sense asynchronously for
47  * multi-threaded applications though, as a synchronous operation could
48  * as well be done in the same thread.
49  *
50  * Asynchronous queues are an exception from most other GLib data
51  * structures, as they can be used simultaneously from multiple threads
52  * without explicit locking and they bring their own builtin reference
53  * counting. This is because the nature of an asynchronous queue is that
54  * it will always be used by at least 2 concurrent threads.
55  *
56  * For using an asynchronous queue you first have to create one with
57  * g_async_queue_new(). #GAsyncQueue structs are reference counted,
58  * use g_async_queue_ref() and g_async_queue_unref() to manage your
59  * references.
60  *
61  * A thread which wants to send a message to that queue simply calls
62  * g_async_queue_push() to push the message to the queue.
63  *
64  * A thread which is expecting messages from an asynchronous queue
65  * simply calls g_async_queue_pop() for that queue. If no message is
66  * available in the queue at that point, the thread is now put to sleep
67  * until a message arrives. The message will be removed from the queue
68  * and returned. The functions g_async_queue_try_pop() and
69  * g_async_queue_timed_pop() can be used to only check for the presence
70  * of messages or to only wait a certain time for messages respectively.
71  *
72  * For almost every function there exist two variants, one that locks
73  * the queue and one that doesn't. That way you can hold the queue lock
74  * (acquire it with g_async_queue_lock() and release it with
75  * g_async_queue_unlock()) over multiple queue accessing instructions.
76  * This can be necessary to ensure the integrity of the queue, but should
77  * only be used when really necessary, as it can make your life harder
78  * if used unwisely. Normally you should only use the locking function
79  * variants (those without the _unlocked suffix).
80  *
81  * In many cases, it may be more convenient to use #GThreadPool when
82  * you need to distribute work to a set of worker threads instead of
83  * using #GAsyncQueue manually. #GThreadPool uses a GAsyncQueue
84  * internally.
85  */
86
87 /**
88  * GAsyncQueue:
89  *
90  * The GAsyncQueue struct is an opaque data structure which represents
91  * an asynchronous queue. It should only be accessed through the
92  * <function>g_async_queue_*</function> functions.
93  */
94 struct _GAsyncQueue
95 {
96   GMutex mutex;
97   GCond cond;
98   GQueue queue;
99   GDestroyNotify item_free_func;
100   guint waiting_threads;
101   gint ref_count;
102 };
103
104 typedef struct
105 {
106   GCompareDataFunc func;
107   gpointer         user_data;
108 } SortData;
109
110 /**
111  * g_async_queue_new:
112  *
113  * Creates a new asynchronous queue.
114  *
115  * Return value: a new #GAsyncQueue. Free with g_async_queue_unref()
116  */
117 GAsyncQueue *
118 g_async_queue_new (void)
119 {
120   return g_async_queue_new_full (NULL);
121 }
122
123 /**
124  * g_async_queue_new_full:
125  * @item_free_func: function to free queue elements
126  *
127  * Creates a new asynchronous queue and sets up a destroy notify
128  * function that is used to free any remaining queue items when
129  * the queue is destroyed after the final unref.
130  *
131  * Return value: a new #GAsyncQueue. Free with g_async_queue_unref()
132  *
133  * Since: 2.16
134  */
135 GAsyncQueue *
136 g_async_queue_new_full (GDestroyNotify item_free_func)
137 {
138   GAsyncQueue *queue;
139
140   queue = g_new (GAsyncQueue, 1);
141   g_mutex_init (&queue->mutex);
142   g_cond_init (&queue->cond);
143   g_queue_init (&queue->queue);
144   queue->waiting_threads = 0;
145   queue->ref_count = 1;
146   queue->item_free_func = NULL;
147
148   return queue;
149 }
150
151 /**
152  * g_async_queue_ref:
153  * @queue: a #GAsyncQueue
154  *
155  * Increases the reference count of the asynchronous @queue by 1.
156  * You do not need to hold the lock to call this function.
157  *
158  * Returns: the @queue that was passed in (since 2.6)
159  */
160 GAsyncQueue *
161 g_async_queue_ref (GAsyncQueue *queue)
162 {
163   g_return_val_if_fail (queue, NULL);
164
165   g_atomic_int_inc (&queue->ref_count);
166
167   return queue;
168 }
169
170 /**
171  * g_async_queue_ref_unlocked:
172  * @queue: a #GAsyncQueue
173  *
174  * Increases the reference count of the asynchronous @queue by 1.
175  *
176  * @Deprecated: Since 2.8, reference counting is done atomically
177  * so g_async_queue_ref() can be used regardless of the @queue's
178  * lock.
179  */
180 void
181 g_async_queue_ref_unlocked (GAsyncQueue *queue)
182 {
183   g_return_if_fail (queue);
184
185   g_atomic_int_inc (&queue->ref_count);
186 }
187
188 /**
189  * g_async_queue_unref_and_unlock:
190  * @queue: a #GAsyncQueue
191  *
192  * Decreases the reference count of the asynchronous @queue by 1
193  * and releases the lock. This function must be called while holding
194  * the @queue's lock. If the reference count went to 0, the @queue
195  * will be destroyed and the memory allocated will be freed.
196  *
197  * @Deprecated: Since 2.8, reference counting is done atomically
198  * so g_async_queue_unref() can be used regardless of the @queue's
199  * lock.
200  */
201 void
202 g_async_queue_unref_and_unlock (GAsyncQueue *queue)
203 {
204   g_return_if_fail (queue);
205
206   g_mutex_unlock (&queue->mutex);
207   g_async_queue_unref (queue);
208 }
209
210 /**
211  * g_async_queue_unref:
212  * @queue: a #GAsyncQueue.
213  *
214  * Decreases the reference count of the asynchronous @queue by 1.
215  *
216  * If the reference count went to 0, the @queue will be destroyed
217  * and the memory allocated will be freed. So you are not allowed
218  * to use the @queue afterwards, as it might have disappeared.
219  * You do not need to hold the lock to call this function.
220  */
221 void
222 g_async_queue_unref (GAsyncQueue *queue)
223 {
224   g_return_if_fail (queue);
225
226   if (g_atomic_int_dec_and_test (&queue->ref_count))
227     {
228       g_return_if_fail (queue->waiting_threads == 0);
229       g_mutex_clear (&queue->mutex);
230       g_cond_clear (&queue->cond);
231       if (queue->item_free_func)
232         g_queue_foreach (&queue->queue, (GFunc) queue->item_free_func, NULL);
233       g_queue_clear (&queue->queue);
234       g_free (queue);
235     }
236 }
237
238 /**
239  * g_async_queue_lock:
240  * @queue: a #GAsyncQueue
241  *
242  * Acquires the @queue's lock. If another thread is already
243  * holding the lock, this call will block until the lock
244  * becomes available.
245  *
246  * Call g_async_queue_unlock() to drop the lock again.
247  *
248  * While holding the lock, you can only call the
249  * <function>g_async_queue_*_unlocked()</function> functions
250  * on @queue. Otherwise, deadlock may occur.
251  */
252 void
253 g_async_queue_lock (GAsyncQueue *queue)
254 {
255   g_return_if_fail (queue);
256
257   g_mutex_lock (&queue->mutex);
258 }
259
260 /**
261  * g_async_queue_unlock:
262  * @queue: a #GAsyncQueue
263  *
264  * Releases the queue's lock.
265  *
266  * Calling this function when you have not acquired
267  * the with g_async_queue_lock() leads to undefined
268  * behaviour.
269  */
270 void
271 g_async_queue_unlock (GAsyncQueue *queue)
272 {
273   g_return_if_fail (queue);
274
275   g_mutex_unlock (&queue->mutex);
276 }
277
278 /**
279  * g_async_queue_push:
280  * @queue: a #GAsyncQueue
281  * @data: @data to push into the @queue
282  *
283  * Pushes the @data into the @queue. @data must not be %NULL.
284  */
285 void
286 g_async_queue_push (GAsyncQueue *queue,
287                     gpointer     data)
288 {
289   g_return_if_fail (queue);
290   g_return_if_fail (data);
291
292   g_mutex_lock (&queue->mutex);
293   g_async_queue_push_unlocked (queue, data);
294   g_mutex_unlock (&queue->mutex);
295 }
296
297 /**
298  * g_async_queue_push_unlocked:
299  * @queue: a #GAsyncQueue
300  * @data: @data to push into the @queue
301  *
302  * Pushes the @data into the @queue. @data must not be %NULL.
303  *
304  * This function must be called while holding the @queue's lock.
305  */
306 void
307 g_async_queue_push_unlocked (GAsyncQueue *queue,
308                              gpointer     data)
309 {
310   g_return_if_fail (queue);
311   g_return_if_fail (data);
312
313   g_queue_push_head (&queue->queue, data);
314   if (queue->waiting_threads > 0)
315     g_cond_signal (&queue->cond);
316 }
317
318 /**
319  * g_async_queue_push_sorted:
320  * @queue: a #GAsyncQueue
321  * @data: the @data to push into the @queue
322  * @func: the #GCompareDataFunc is used to sort @queue
323  * @user_data: user data passed to @func.
324  *
325  * Inserts @data into @queue using @func to determine the new
326  * position.
327  *
328  * This function requires that the @queue is sorted before pushing on
329  * new elements, see g_async_queue_sort().
330  *
331  * This function will lock @queue before it sorts the queue and unlock
332  * it when it is finished.
333  *
334  * For an example of @func see g_async_queue_sort().
335  *
336  * Since: 2.10
337  */
338 void
339 g_async_queue_push_sorted (GAsyncQueue      *queue,
340                            gpointer          data,
341                            GCompareDataFunc  func,
342                            gpointer          user_data)
343 {
344   g_return_if_fail (queue != NULL);
345
346   g_mutex_lock (&queue->mutex);
347   g_async_queue_push_sorted_unlocked (queue, data, func, user_data);
348   g_mutex_unlock (&queue->mutex);
349 }
350
351 static gint
352 g_async_queue_invert_compare (gpointer  v1,
353                               gpointer  v2,
354                               SortData *sd)
355 {
356   return -sd->func (v1, v2, sd->user_data);
357 }
358
359 /**
360  * g_async_queue_push_sorted_unlocked:
361  * @queue: a #GAsyncQueue
362  * @data: the @data to push into the @queue
363  * @func: the #GCompareDataFunc is used to sort @queue
364  * @user_data: user data passed to @func.
365  *
366  * Inserts @data into @queue using @func to determine the new
367  * position.
368  *
369  * The sort function @func is passed two elements of the @queue.
370  * It should return 0 if they are equal, a negative value if the
371  * first element should be higher in the @queue or a positive value
372  * if the first element should be lower in the @queue than the second
373  * element.
374  *
375  * This function requires that the @queue is sorted before pushing on
376  * new elements, see g_async_queue_sort().
377  *
378  * This function must be called while holding the @queue's lock.
379  *
380  * For an example of @func see g_async_queue_sort().
381  *
382  * Since: 2.10
383  */
384 void
385 g_async_queue_push_sorted_unlocked (GAsyncQueue      *queue,
386                                     gpointer          data,
387                                     GCompareDataFunc  func,
388                                     gpointer          user_data)
389 {
390   SortData sd;
391
392   g_return_if_fail (queue != NULL);
393
394   sd.func = func;
395   sd.user_data = user_data;
396
397   g_queue_insert_sorted (&queue->queue,
398                          data,
399                          (GCompareDataFunc)g_async_queue_invert_compare,
400                          &sd);
401   if (queue->waiting_threads > 0)
402     g_cond_signal (&queue->cond);
403 }
404
405 static gpointer
406 g_async_queue_pop_intern_unlocked (GAsyncQueue *queue,
407                                    gboolean     wait,
408                                    GTimeVal    *end_time)
409 {
410   gpointer retval;
411
412   if (!g_queue_peek_tail_link (&queue->queue) && wait)
413     {
414       queue->waiting_threads++;
415       while (!g_queue_peek_tail_link (&queue->queue))
416         {
417           if (!g_cond_timed_wait (&queue->cond, &queue->mutex, end_time))
418             break;
419         }
420       queue->waiting_threads--;
421     }
422
423   retval = g_queue_pop_tail (&queue->queue);
424
425   g_assert (retval || !wait || end_time);
426
427   return retval;
428 }
429
430 /**
431  * g_async_queue_pop:
432  * @queue: a #GAsyncQueue
433  *
434  * Pops data from the @queue. If @queue is empty, this function
435  * blocks until data becomes available.
436  *
437  * Return value: data from the queue
438  */
439 gpointer
440 g_async_queue_pop (GAsyncQueue *queue)
441 {
442   gpointer retval;
443
444   g_return_val_if_fail (queue, NULL);
445
446   g_mutex_lock (&queue->mutex);
447   retval = g_async_queue_pop_intern_unlocked (queue, TRUE, NULL);
448   g_mutex_unlock (&queue->mutex);
449
450   return retval;
451 }
452
453 /**
454  * g_async_queue_pop_unlocked:
455  * @queue: a #GAsyncQueue
456  *
457  * Pops data from the @queue. If @queue is empty, this function
458  * blocks until data becomes available.
459  *
460  * This function must be called while holding the @queue's lock.
461  *
462  * Return value: data from the queue.
463  */
464 gpointer
465 g_async_queue_pop_unlocked (GAsyncQueue *queue)
466 {
467   g_return_val_if_fail (queue, NULL);
468
469   return g_async_queue_pop_intern_unlocked (queue, TRUE, NULL);
470 }
471
472 /**
473  * g_async_queue_try_pop:
474  * @queue: a #GAsyncQueue
475  *
476  * Tries to pop data from the @queue. If no data is available,
477  * %NULL is returned.
478  *
479  * Return value: data from the queue or %NULL, when no data is
480  *     available immediately.
481  */
482 gpointer
483 g_async_queue_try_pop (GAsyncQueue *queue)
484 {
485   gpointer retval;
486
487   g_return_val_if_fail (queue, NULL);
488
489   g_mutex_lock (&queue->mutex);
490   retval = g_async_queue_pop_intern_unlocked (queue, FALSE, NULL);
491   g_mutex_unlock (&queue->mutex);
492
493   return retval;
494 }
495
496 /**
497  * g_async_queue_try_pop_unlocked:
498  * @queue: a #GAsyncQueue
499  *
500  * Tries to pop data from the @queue. If no data is available,
501  * %NULL is returned.
502  *
503  * This function must be called while holding the @queue's lock.
504  *
505  * Return value: data from the queue or %NULL, when no data is
506  *     available immediately.
507  */
508 gpointer
509 g_async_queue_try_pop_unlocked (GAsyncQueue *queue)
510 {
511   g_return_val_if_fail (queue, NULL);
512
513   return g_async_queue_pop_intern_unlocked (queue, FALSE, NULL);
514 }
515
516 /**
517  * g_async_queue_timed_pop:
518  * @queue: a #GAsyncQueue
519  * @end_time: a #GTimeVal, determining the final time
520  *
521  * Pops data from the @queue. If the queue is empty, blocks until
522  * @end_time or until data becomes available.
523  *
524  * If no data is received before @end_time, %NULL is returned.
525  *
526  * To easily calculate @end_time, a combination of g_get_current_time()
527  * and g_time_val_add() can be used.
528  *
529  * Return value: data from the queue or %NULL, when no data is
530  *     received before @end_time.
531  */
532 gpointer
533 g_async_queue_timed_pop (GAsyncQueue *queue,
534                          GTimeVal    *end_time)
535 {
536   gpointer retval;
537
538   g_return_val_if_fail (queue, NULL);
539
540   g_mutex_lock (&queue->mutex);
541   retval = g_async_queue_pop_intern_unlocked (queue, TRUE, end_time);
542   g_mutex_unlock (&queue->mutex);
543
544   return retval;
545 }
546
547 /**
548  * g_async_queue_timed_pop_unlocked:
549  * @queue: a #GAsyncQueue
550  * @end_time: a #GTimeVal, determining the final time
551  *
552  * Pops data from the @queue. If the queue is empty, blocks until
553  * @end_time or until data becomes available.
554  *
555  * If no data is received before @end_time, %NULL is returned.
556  *
557  * To easily calculate @end_time, a combination of g_get_current_time()
558  * and g_time_val_add() can be used.
559  *
560  * This function must be called while holding the @queue's lock.
561  *
562  * Return value: data from the queue or %NULL, when no data is
563  *     received before @end_time.
564  */
565 gpointer
566 g_async_queue_timed_pop_unlocked (GAsyncQueue *queue,
567                                   GTimeVal    *end_time)
568 {
569   g_return_val_if_fail (queue, NULL);
570
571   return g_async_queue_pop_intern_unlocked (queue, TRUE, end_time);
572 }
573
574 /**
575  * g_async_queue_length:
576  * @queue: a #GAsyncQueue.
577  *
578  * Returns the length of the queue.
579  *
580  * Actually this function returns the number of data items in
581  * the queue minus the number of waiting threads, so a negative
582  * value means waiting threads, and a positive value means available
583  * entries in the @queue. A return value of 0 could mean n entries
584  * in the queue and n threads waiting. This can happen due to locking
585  * of the queue or due to scheduling.
586  *
587  * Return value: the length of the @queue
588  */
589 gint
590 g_async_queue_length (GAsyncQueue *queue)
591 {
592   gint retval;
593
594   g_return_val_if_fail (queue, 0);
595
596   g_mutex_lock (&queue->mutex);
597   retval = queue->queue.length - queue->waiting_threads;
598   g_mutex_unlock (&queue->mutex);
599
600   return retval;
601 }
602
603 /**
604  * g_async_queue_length_unlocked:
605  * @queue: a #GAsyncQueue
606  *
607  * Returns the length of the queue.
608  *
609  * Actually this function returns the number of data items in
610  * the queue minus the number of waiting threads, so a negative
611  * value means waiting threads, and a positive value means available
612  * entries in the @queue. A return value of 0 could mean n entries
613  * in the queue and n threads waiting. This can happen due to locking
614  * of the queue or due to scheduling.
615  *
616  * This function must be called while holding the @queue's lock.
617  *
618  * Return value: the length of the @queue.
619  */
620 gint
621 g_async_queue_length_unlocked (GAsyncQueue *queue)
622 {
623   g_return_val_if_fail (queue, 0);
624
625   return queue->queue.length - queue->waiting_threads;
626 }
627
628 /**
629  * g_async_queue_sort:
630  * @queue: a #GAsyncQueue
631  * @func: the #GCompareDataFunc is used to sort @queue
632  * @user_data: user data passed to @func
633  *
634  * Sorts @queue using @func.
635  *
636  * The sort function @func is passed two elements of the @queue.
637  * It should return 0 if they are equal, a negative value if the
638  * first element should be higher in the @queue or a positive value
639  * if the first element should be lower in the @queue than the second
640  * element.
641  *
642  * This function will lock @queue before it sorts the queue and unlock
643  * it when it is finished.
644  *
645  * If you were sorting a list of priority numbers to make sure the
646  * lowest priority would be at the top of the queue, you could use:
647  * |[
648  *  gint32 id1;
649  *  gint32 id2;
650  *
651  *  id1 = GPOINTER_TO_INT (element1);
652  *  id2 = GPOINTER_TO_INT (element2);
653  *
654  *  return (id1 > id2 ? +1 : id1 == id2 ? 0 : -1);
655  * ]|
656  *
657  * Since: 2.10
658  */
659 void
660 g_async_queue_sort (GAsyncQueue      *queue,
661                     GCompareDataFunc  func,
662                     gpointer          user_data)
663 {
664   g_return_if_fail (queue != NULL);
665   g_return_if_fail (func != NULL);
666
667   g_mutex_lock (&queue->mutex);
668   g_async_queue_sort_unlocked (queue, func, user_data);
669   g_mutex_unlock (&queue->mutex);
670 }
671
672 /**
673  * g_async_queue_sort_unlocked:
674  * @queue: a #GAsyncQueue
675  * @func: the #GCompareDataFunc is used to sort @queue
676  * @user_data: user data passed to @func
677  *
678  * Sorts @queue using @func.
679  *
680  * The sort function @func is passed two elements of the @queue.
681  * It should return 0 if they are equal, a negative value if the
682  * first element should be higher in the @queue or a positive value
683  * if the first element should be lower in the @queue than the second
684  * element.
685  *
686  * This function must be called while holding the @queue's lock.
687  *
688  * Since: 2.10
689  */
690 void
691 g_async_queue_sort_unlocked (GAsyncQueue      *queue,
692                              GCompareDataFunc  func,
693                              gpointer          user_data)
694 {
695   SortData sd;
696
697   g_return_if_fail (queue != NULL);
698   g_return_if_fail (func != NULL);
699
700   sd.func = func;
701   sd.user_data = user_data;
702
703   g_queue_sort (&queue->queue,
704                 (GCompareDataFunc)g_async_queue_invert_compare,
705                 &sd);
706 }
707
708 /*
709  * Private API
710  */
711
712 GMutex *
713 _g_async_queue_get_mutex (GAsyncQueue *queue)
714 {
715   g_return_val_if_fail (queue, NULL);
716
717   return &queue->mutex;
718 }