GAsyncQueue: internal cleanup
[platform/upstream/glib.git] / glib / gasyncqueue.c
1 /* GLIB - Library of useful routines for C programming
2  * Copyright (C) 1995-1997  Peter Mattis, Spencer Kimball and Josh MacDonald
3  *
4  * GAsyncQueue: asynchronous queue implementation, based on GQueue.
5  * Copyright (C) 2000 Sebastian Wilhelmi; University of Karlsruhe
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Lesser General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20  * Boston, MA 02111-1307, USA.
21  */
22
23 /*
24  * MT safe
25  */
26
27 #include "config.h"
28
29 #include "gasyncqueue.h"
30 #include "gasyncqueueprivate.h"
31
32 #include "gmem.h"
33 #include "gqueue.h"
34 #include "gtestutils.h"
35 #include "gthread.h"
36
37
38 /**
39  * SECTION:async_queues
40  * @title: Asynchronous Queues
41  * @short_description: asynchronous communication between threads
42  * @see_also: #GThreadPool
43  *
44  * Often you need to communicate between different threads. In general
45  * it's safer not to do this by shared memory, but by explicit message
46  * passing. These messages only make sense asynchronously for
47  * multi-threaded applications though, as a synchronous operation could
48  * as well be done in the same thread.
49  *
50  * Asynchronous queues are an exception from most other GLib data
51  * structures, as they can be used simultaneously from multiple threads
52  * without explicit locking and they bring their own builtin reference
53  * counting. This is because the nature of an asynchronous queue is that
54  * it will always be used by at least 2 concurrent threads.
55  *
56  * For using an asynchronous queue you first have to create one with
57  * g_async_queue_new(). #GAsyncQueue structs are reference counted,
58  * use g_async_queue_ref() and g_async_queue_unref() to manage your
59  * references.
60  *
61  * A thread which wants to send a message to that queue simply calls
62  * g_async_queue_push() to push the message to the queue.
63  *
64  * A thread which is expecting messages from an asynchronous queue
65  * simply calls g_async_queue_pop() for that queue. If no message is
66  * available in the queue at that point, the thread is now put to sleep
67  * until a message arrives. The message will be removed from the queue
68  * and returned. The functions g_async_queue_try_pop() and
69  * g_async_queue_timed_pop() can be used to only check for the presence
70  * of messages or to only wait a certain time for messages respectively.
71  *
72  * For almost every function there exist two variants, one that locks
73  * the queue and one that doesn't. That way you can hold the queue lock
74  * (acquire it with g_async_queue_lock() and release it with
75  * g_async_queue_unlock()) over multiple queue accessing instructions.
76  * This can be necessary to ensure the integrity of the queue, but should
77  * only be used when really necessary, as it can make your life harder
78  * if used unwisely. Normally you should only use the locking function
79  * variants (those without the _unlocked suffix).
80  *
81  * In many cases, it may be more convenient to use #GThreadPool when
82  * you need to distribute work to a set of worker threads instead of
83  * using #GAsyncQueue manually. #GThreadPool uses a GAsyncQueue
84  * internally.
85  */
86
87 /**
88  * GAsyncQueue:
89  *
90  * The GAsyncQueue struct is an opaque data structure which represents
91  * an asynchronous queue. It should only be accessed through the
92  * <function>g_async_queue_*</function> functions.
93  */
94 struct _GAsyncQueue
95 {
96   GMutex mutex;
97   GCond cond;
98   GQueue queue;
99   GDestroyNotify item_free_func;
100   guint waiting_threads;
101   gint ref_count;
102 };
103
104 typedef struct
105 {
106   GCompareDataFunc func;
107   gpointer         user_data;
108 } SortData;
109
110 /**
111  * g_async_queue_new:
112  *
113  * Creates a new asynchronous queue.
114  *
115  * Return value: a new #GAsyncQueue. Free with g_async_queue_unref()
116  */
117 GAsyncQueue *
118 g_async_queue_new (void)
119 {
120   return g_async_queue_new_full (NULL);
121 }
122
123 /**
124  * g_async_queue_new_full:
125  * @item_free_func: function to free queue elements
126  *
127  * Creates a new asynchronous queue and sets up a destroy notify
128  * function that is used to free any remaining queue items when
129  * the queue is destroyed after the final unref.
130  *
131  * Return value: a new #GAsyncQueue. Free with g_async_queue_unref()
132  *
133  * Since: 2.16
134  */
135 GAsyncQueue *
136 g_async_queue_new_full (GDestroyNotify item_free_func)
137 {
138   GAsyncQueue *queue;
139
140   queue = g_new (GAsyncQueue, 1);
141   g_mutex_init (&queue->mutex);
142   g_cond_init (&queue->cond);
143   g_queue_init (&queue->queue);
144   queue->waiting_threads = 0;
145   queue->ref_count = 1;
146   queue->item_free_func = NULL;
147
148   return queue;
149 }
150
151 /**
152  * g_async_queue_ref:
153  * @queue: a #GAsyncQueue
154  *
155  * Increases the reference count of the asynchronous @queue by 1.
156  * You do not need to hold the lock to call this function.
157  *
158  * Returns: the @queue that was passed in (since 2.6)
159  */
160 GAsyncQueue *
161 g_async_queue_ref (GAsyncQueue *queue)
162 {
163   g_return_val_if_fail (queue, NULL);
164
165   g_atomic_int_inc (&queue->ref_count);
166
167   return queue;
168 }
169
170 /**
171  * g_async_queue_ref_unlocked:
172  * @queue: a #GAsyncQueue
173  *
174  * Increases the reference count of the asynchronous @queue by 1.
175  *
176  * @Deprecated: Since 2.8, reference counting is done atomically
177  * so g_async_queue_ref() can be used regardless of the @queue's
178  * lock.
179  */
180 void
181 g_async_queue_ref_unlocked (GAsyncQueue *queue)
182 {
183   g_return_if_fail (queue);
184
185   g_atomic_int_inc (&queue->ref_count);
186 }
187
188 /**
189  * g_async_queue_unref_and_unlock:
190  * @queue: a #GAsyncQueue
191  *
192  * Decreases the reference count of the asynchronous @queue by 1
193  * and releases the lock. This function must be called while holding
194  * the @queue's lock. If the reference count went to 0, the @queue
195  * will be destroyed and the memory allocated will be freed.
196  *
197  * @Deprecated: Since 2.8, reference counting is done atomically
198  * so g_async_queue_unref() can be used regardless of the @queue's
199  * lock.
200  */
201 void
202 g_async_queue_unref_and_unlock (GAsyncQueue *queue)
203 {
204   g_return_if_fail (queue);
205
206   g_mutex_unlock (&queue->mutex);
207   g_async_queue_unref (queue);
208 }
209
210 /**
211  * g_async_queue_unref:
212  * @queue: a #GAsyncQueue.
213  *
214  * Decreases the reference count of the asynchronous @queue by 1.
215  *
216  * If the reference count went to 0, the @queue will be destroyed
217  * and the memory allocated will be freed. So you are not allowed
218  * to use the @queue afterwards, as it might have disappeared.
219  * You do not need to hold the lock to call this function.
220  */
221 void
222 g_async_queue_unref (GAsyncQueue *queue)
223 {
224   g_return_if_fail (queue);
225
226   if (g_atomic_int_dec_and_test (&queue->ref_count))
227     {
228       g_return_if_fail (queue->waiting_threads == 0);
229       g_mutex_clear (&queue->mutex);
230       g_cond_clear (&queue->cond);
231       if (queue->item_free_func)
232         g_queue_foreach (&queue->queue, (GFunc) queue->item_free_func, NULL);
233       g_queue_clear (&queue->queue);
234       g_free (queue);
235     }
236 }
237
238 /**
239  * g_async_queue_lock:
240  * @queue: a #GAsyncQueue
241  *
242  * Acquires the @queue's lock. If another thread is already
243  * holding the lock, this call will block until the lock
244  * becomes available.
245  *
246  * Call g_async_queue_unlock() to drop the lock again.
247  *
248  * While holding the lock, you can only call the
249  * <function>g_async_queue_*_unlocked()</function> functions
250  * on @queue. Otherwise, deadlock may occur.
251  */
252 void
253 g_async_queue_lock (GAsyncQueue *queue)
254 {
255   g_return_if_fail (queue);
256
257   g_mutex_lock (&queue->mutex);
258 }
259
260 /**
261  * g_async_queue_unlock:
262  * @queue: a #GAsyncQueue
263  *
264  * Releases the queue's lock.
265  *
266  * Calling this function when you have not acquired
267  * the with g_async_queue_lock() leads to undefined
268  * behaviour.
269  */
270 void
271 g_async_queue_unlock (GAsyncQueue *queue)
272 {
273   g_return_if_fail (queue);
274
275   g_mutex_unlock (&queue->mutex);
276 }
277
278 /**
279  * g_async_queue_push:
280  * @queue: a #GAsyncQueue
281  * @data: @data to push into the @queue
282  *
283  * Pushes the @data into the @queue. @data must not be %NULL.
284  */
285 void
286 g_async_queue_push (GAsyncQueue *queue,
287                     gpointer     data)
288 {
289   g_return_if_fail (queue);
290   g_return_if_fail (data);
291
292   g_mutex_lock (&queue->mutex);
293   g_async_queue_push_unlocked (queue, data);
294   g_mutex_unlock (&queue->mutex);
295 }
296
297 /**
298  * g_async_queue_push_unlocked:
299  * @queue: a #GAsyncQueue
300  * @data: @data to push into the @queue
301  *
302  * Pushes the @data into the @queue. @data must not be %NULL.
303  *
304  * This function must be called while holding the @queue's lock.
305  */
306 void
307 g_async_queue_push_unlocked (GAsyncQueue *queue,
308                              gpointer     data)
309 {
310   g_return_if_fail (queue);
311   g_return_if_fail (data);
312
313   g_queue_push_head (&queue->queue, data);
314   if (queue->waiting_threads > 0)
315     g_cond_signal (&queue->cond);
316 }
317
318 /**
319  * g_async_queue_push_sorted:
320  * @queue: a #GAsyncQueue
321  * @data: the @data to push into the @queue
322  * @func: the #GCompareDataFunc is used to sort @queue
323  * @user_data: user data passed to @func.
324  *
325  * Inserts @data into @queue using @func to determine the new
326  * position.
327  *
328  * This function requires that the @queue is sorted before pushing on
329  * new elements, see g_async_queue_sort().
330  *
331  * This function will lock @queue before it sorts the queue and unlock
332  * it when it is finished.
333  *
334  * For an example of @func see g_async_queue_sort().
335  *
336  * Since: 2.10
337  */
338 void
339 g_async_queue_push_sorted (GAsyncQueue      *queue,
340                            gpointer          data,
341                            GCompareDataFunc  func,
342                            gpointer          user_data)
343 {
344   g_return_if_fail (queue != NULL);
345
346   g_mutex_lock (&queue->mutex);
347   g_async_queue_push_sorted_unlocked (queue, data, func, user_data);
348   g_mutex_unlock (&queue->mutex);
349 }
350
351 static gint
352 g_async_queue_invert_compare (gpointer  v1,
353                               gpointer  v2,
354                               SortData *sd)
355 {
356   return -sd->func (v1, v2, sd->user_data);
357 }
358
359 /**
360  * g_async_queue_push_sorted_unlocked:
361  * @queue: a #GAsyncQueue
362  * @data: the @data to push into the @queue
363  * @func: the #GCompareDataFunc is used to sort @queue
364  * @user_data: user data passed to @func.
365  *
366  * Inserts @data into @queue using @func to determine the new
367  * position.
368  *
369  * The sort function @func is passed two elements of the @queue.
370  * It should return 0 if they are equal, a negative value if the
371  * first element should be higher in the @queue or a positive value
372  * if the first element should be lower in the @queue than the second
373  * element.
374  *
375  * This function requires that the @queue is sorted before pushing on
376  * new elements, see g_async_queue_sort().
377  *
378  * This function must be called while holding the @queue's lock.
379  *
380  * For an example of @func see g_async_queue_sort().
381  *
382  * Since: 2.10
383  */
384 void
385 g_async_queue_push_sorted_unlocked (GAsyncQueue      *queue,
386                                     gpointer          data,
387                                     GCompareDataFunc  func,
388                                     gpointer          user_data)
389 {
390   SortData sd;
391
392   g_return_if_fail (queue != NULL);
393
394   sd.func = func;
395   sd.user_data = user_data;
396
397   g_queue_insert_sorted (&queue->queue,
398                          data,
399                          (GCompareDataFunc)g_async_queue_invert_compare,
400                          &sd);
401   if (queue->waiting_threads > 0)
402     g_cond_signal (&queue->cond);
403 }
404
405 static gpointer
406 g_async_queue_pop_intern_unlocked (GAsyncQueue *queue,
407                                    gboolean     wait,
408                                    GTimeVal    *end_time)
409 {
410   gpointer retval;
411
412   if (!g_queue_peek_tail_link (&queue->queue))
413     {
414       if (!wait)
415         return NULL;
416
417       if (!end_time)
418         {
419           queue->waiting_threads++;
420           while (!g_queue_peek_tail_link (&queue->queue))
421             g_cond_wait (&queue->cond, &queue->mutex);
422           queue->waiting_threads--;
423         }
424       else
425         {
426           queue->waiting_threads++;
427           while (!g_queue_peek_tail_link (&queue->queue))
428             if (!g_cond_timed_wait (&queue->cond, &queue->mutex, end_time))
429               break;
430           queue->waiting_threads--;
431           if (!g_queue_peek_tail_link (&queue->queue))
432             return NULL;
433         }
434     }
435
436   retval = g_queue_pop_tail (&queue->queue);
437
438   g_assert (retval);
439
440   return retval;
441 }
442
443 /**
444  * g_async_queue_pop:
445  * @queue: a #GAsyncQueue
446  *
447  * Pops data from the @queue. If @queue is empty, this function
448  * blocks until data becomes available.
449  *
450  * Return value: data from the queue
451  */
452 gpointer
453 g_async_queue_pop (GAsyncQueue *queue)
454 {
455   gpointer retval;
456
457   g_return_val_if_fail (queue, NULL);
458
459   g_mutex_lock (&queue->mutex);
460   retval = g_async_queue_pop_intern_unlocked (queue, TRUE, NULL);
461   g_mutex_unlock (&queue->mutex);
462
463   return retval;
464 }
465
466 /**
467  * g_async_queue_pop_unlocked:
468  * @queue: a #GAsyncQueue
469  *
470  * Pops data from the @queue. If @queue is empty, this function
471  * blocks until data becomes available.
472  *
473  * This function must be called while holding the @queue's lock.
474  *
475  * Return value: data from the queue.
476  */
477 gpointer
478 g_async_queue_pop_unlocked (GAsyncQueue *queue)
479 {
480   g_return_val_if_fail (queue, NULL);
481
482   return g_async_queue_pop_intern_unlocked (queue, TRUE, NULL);
483 }
484
485 /**
486  * g_async_queue_try_pop:
487  * @queue: a #GAsyncQueue
488  *
489  * Tries to pop data from the @queue. If no data is available,
490  * %NULL is returned.
491  *
492  * Return value: data from the queue or %NULL, when no data is
493  *     available immediately.
494  */
495 gpointer
496 g_async_queue_try_pop (GAsyncQueue *queue)
497 {
498   gpointer retval;
499
500   g_return_val_if_fail (queue, NULL);
501
502   g_mutex_lock (&queue->mutex);
503   retval = g_async_queue_pop_intern_unlocked (queue, FALSE, NULL);
504   g_mutex_unlock (&queue->mutex);
505
506   return retval;
507 }
508
509 /**
510  * g_async_queue_try_pop_unlocked:
511  * @queue: a #GAsyncQueue
512  *
513  * Tries to pop data from the @queue. If no data is available,
514  * %NULL is returned.
515  *
516  * This function must be called while holding the @queue's lock.
517  *
518  * Return value: data from the queue or %NULL, when no data is
519  *     available immediately.
520  */
521 gpointer
522 g_async_queue_try_pop_unlocked (GAsyncQueue *queue)
523 {
524   g_return_val_if_fail (queue, NULL);
525
526   return g_async_queue_pop_intern_unlocked (queue, FALSE, NULL);
527 }
528
529 /**
530  * g_async_queue_timed_pop:
531  * @queue: a #GAsyncQueue
532  * @end_time: a #GTimeVal, determining the final time
533  *
534  * Pops data from the @queue. If the queue is empty, blocks until
535  * @end_time or until data becomes available.
536  *
537  * If no data is received before @end_time, %NULL is returned.
538  *
539  * To easily calculate @end_time, a combination of g_get_current_time()
540  * and g_time_val_add() can be used.
541  *
542  * Return value: data from the queue or %NULL, when no data is
543  *     received before @end_time.
544  */
545 gpointer
546 g_async_queue_timed_pop (GAsyncQueue *queue,
547                          GTimeVal    *end_time)
548 {
549   gpointer retval;
550
551   g_return_val_if_fail (queue, NULL);
552
553   g_mutex_lock (&queue->mutex);
554   retval = g_async_queue_pop_intern_unlocked (queue, TRUE, end_time);
555   g_mutex_unlock (&queue->mutex);
556
557   return retval;
558 }
559
560 /**
561  * g_async_queue_timed_pop_unlocked:
562  * @queue: a #GAsyncQueue
563  * @end_time: a #GTimeVal, determining the final time
564  *
565  * Pops data from the @queue. If the queue is empty, blocks until
566  * @end_time or until data becomes available.
567  *
568  * If no data is received before @end_time, %NULL is returned.
569  *
570  * To easily calculate @end_time, a combination of g_get_current_time()
571  * and g_time_val_add() can be used.
572  *
573  * This function must be called while holding the @queue's lock.
574  *
575  * Return value: data from the queue or %NULL, when no data is
576  *     received before @end_time.
577  */
578 gpointer
579 g_async_queue_timed_pop_unlocked (GAsyncQueue *queue,
580                                   GTimeVal    *end_time)
581 {
582   g_return_val_if_fail (queue, NULL);
583
584   return g_async_queue_pop_intern_unlocked (queue, TRUE, end_time);
585 }
586
587 /**
588  * g_async_queue_length:
589  * @queue: a #GAsyncQueue.
590  *
591  * Returns the length of the queue.
592  *
593  * Actually this function returns the number of data items in
594  * the queue minus the number of waiting threads, so a negative
595  * value means waiting threads, and a positive value means available
596  * entries in the @queue. A return value of 0 could mean n entries
597  * in the queue and n threads waiting. This can happen due to locking
598  * of the queue or due to scheduling.
599  *
600  * Return value: the length of the @queue
601  */
602 gint
603 g_async_queue_length (GAsyncQueue *queue)
604 {
605   gint retval;
606
607   g_return_val_if_fail (queue, 0);
608
609   g_mutex_lock (&queue->mutex);
610   retval = queue->queue.length - queue->waiting_threads;
611   g_mutex_unlock (&queue->mutex);
612
613   return retval;
614 }
615
616 /**
617  * g_async_queue_length_unlocked:
618  * @queue: a #GAsyncQueue
619  *
620  * Returns the length of the queue.
621  *
622  * Actually this function returns the number of data items in
623  * the queue minus the number of waiting threads, so a negative
624  * value means waiting threads, and a positive value means available
625  * entries in the @queue. A return value of 0 could mean n entries
626  * in the queue and n threads waiting. This can happen due to locking
627  * of the queue or due to scheduling.
628  *
629  * This function must be called while holding the @queue's lock.
630  *
631  * Return value: the length of the @queue.
632  */
633 gint
634 g_async_queue_length_unlocked (GAsyncQueue *queue)
635 {
636   g_return_val_if_fail (queue, 0);
637
638   return queue->queue.length - queue->waiting_threads;
639 }
640
641 /**
642  * g_async_queue_sort:
643  * @queue: a #GAsyncQueue
644  * @func: the #GCompareDataFunc is used to sort @queue
645  * @user_data: user data passed to @func
646  *
647  * Sorts @queue using @func.
648  *
649  * The sort function @func is passed two elements of the @queue.
650  * It should return 0 if they are equal, a negative value if the
651  * first element should be higher in the @queue or a positive value
652  * if the first element should be lower in the @queue than the second
653  * element.
654  *
655  * This function will lock @queue before it sorts the queue and unlock
656  * it when it is finished.
657  *
658  * If you were sorting a list of priority numbers to make sure the
659  * lowest priority would be at the top of the queue, you could use:
660  * |[
661  *  gint32 id1;
662  *  gint32 id2;
663  *
664  *  id1 = GPOINTER_TO_INT (element1);
665  *  id2 = GPOINTER_TO_INT (element2);
666  *
667  *  return (id1 > id2 ? +1 : id1 == id2 ? 0 : -1);
668  * ]|
669  *
670  * Since: 2.10
671  */
672 void
673 g_async_queue_sort (GAsyncQueue      *queue,
674                     GCompareDataFunc  func,
675                     gpointer          user_data)
676 {
677   g_return_if_fail (queue != NULL);
678   g_return_if_fail (func != NULL);
679
680   g_mutex_lock (&queue->mutex);
681   g_async_queue_sort_unlocked (queue, func, user_data);
682   g_mutex_unlock (&queue->mutex);
683 }
684
685 /**
686  * g_async_queue_sort_unlocked:
687  * @queue: a #GAsyncQueue
688  * @func: the #GCompareDataFunc is used to sort @queue
689  * @user_data: user data passed to @func
690  *
691  * Sorts @queue using @func.
692  *
693  * The sort function @func is passed two elements of the @queue.
694  * It should return 0 if they are equal, a negative value if the
695  * first element should be higher in the @queue or a positive value
696  * if the first element should be lower in the @queue than the second
697  * element.
698  *
699  * This function must be called while holding the @queue's lock.
700  *
701  * Since: 2.10
702  */
703 void
704 g_async_queue_sort_unlocked (GAsyncQueue      *queue,
705                              GCompareDataFunc  func,
706                              gpointer          user_data)
707 {
708   SortData sd;
709
710   g_return_if_fail (queue != NULL);
711   g_return_if_fail (func != NULL);
712
713   sd.func = func;
714   sd.user_data = user_data;
715
716   g_queue_sort (&queue->queue,
717                 (GCompareDataFunc)g_async_queue_invert_compare,
718                 &sd);
719 }
720
721 /*
722  * Private API
723  */
724
725 GMutex *
726 _g_async_queue_get_mutex (GAsyncQueue *queue)
727 {
728   g_return_val_if_fail (queue, NULL);
729
730   return &queue->mutex;
731 }