1 /* GLIB - Library of useful routines for C programming
2 * Copyright (C) 1995-1997 Peter Mattis, Spencer Kimball and Josh MacDonald
4 * GAsyncQueue: asynchronous queue implementation, based on GQueue.
5 * Copyright (C) 2000 Sebastian Wilhelmi; University of Karlsruhe
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2 of the License, or (at your option) any later version.
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library; if not, write to the
19 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20 * Boston, MA 02111-1307, USA.
29 #include "gasyncqueue.h"
33 #include "gtestutils.h"
44 GDestroyNotify item_free_func;
45 guint waiting_threads;
50 GCompareDataFunc func;
57 * Creates a new asynchronous queue with the initial reference count of 1.
59 * Return value: the new #GAsyncQueue.
62 g_async_queue_new (void)
64 GAsyncQueue* retval = g_new (GAsyncQueue, 1);
65 retval->mutex = g_mutex_new ();
67 retval->queue = g_queue_new ();
68 retval->waiting_threads = 0;
69 retval->ref_count = 1;
70 retval->item_free_func = NULL;
75 * g_async_queue_new_full:
76 * @item_free_func: function to free queue elements
78 * Creates a new asynchronous queue with an initial reference count of 1 and
79 * sets up a destroy notify function that is used to free any remaining
80 * queue items when the queue is destroyed after the final unref.
82 * Return value: the new #GAsyncQueue.
87 g_async_queue_new_full (GDestroyNotify item_free_func)
89 GAsyncQueue *async_queue = g_async_queue_new ();
90 async_queue->item_free_func = item_free_func;
96 * @queue: a #GAsyncQueue.
98 * Increases the reference count of the asynchronous @queue by 1. You
99 * do not need to hold the lock to call this function.
101 * Returns: the @queue that was passed in (since 2.6)
104 g_async_queue_ref (GAsyncQueue *queue)
106 g_return_val_if_fail (queue, NULL);
107 g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, NULL);
109 g_atomic_int_inc (&queue->ref_count);
115 * g_async_queue_ref_unlocked:
116 * @queue: a #GAsyncQueue.
118 * Increases the reference count of the asynchronous @queue by 1.
120 * @Deprecated: Since 2.8, reference counting is done atomically
121 * so g_async_queue_ref() can be used regardless of the @queue's
125 g_async_queue_ref_unlocked (GAsyncQueue *queue)
127 g_return_if_fail (queue);
128 g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
130 g_atomic_int_inc (&queue->ref_count);
134 * g_async_queue_unref_and_unlock:
135 * @queue: a #GAsyncQueue.
137 * Decreases the reference count of the asynchronous @queue by 1 and
138 * releases the lock. This function must be called while holding the
139 * @queue's lock. If the reference count went to 0, the @queue will be
140 * destroyed and the memory allocated will be freed.
142 * @Deprecated: Since 2.8, reference counting is done atomically
143 * so g_async_queue_unref() can be used regardless of the @queue's
147 g_async_queue_unref_and_unlock (GAsyncQueue *queue)
149 g_return_if_fail (queue);
150 g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
152 g_mutex_unlock (queue->mutex);
153 g_async_queue_unref (queue);
157 * g_async_queue_unref:
158 * @queue: a #GAsyncQueue.
160 * Decreases the reference count of the asynchronous @queue by 1. If
161 * the reference count went to 0, the @queue will be destroyed and the
162 * memory allocated will be freed. So you are not allowed to use the
163 * @queue afterwards, as it might have disappeared. You do not need to
164 * hold the lock to call this function.
167 g_async_queue_unref (GAsyncQueue *queue)
169 g_return_if_fail (queue);
170 g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
172 if (g_atomic_int_dec_and_test (&queue->ref_count))
174 g_return_if_fail (queue->waiting_threads == 0);
175 g_mutex_free (queue->mutex);
177 g_cond_free (queue->cond);
178 if (queue->item_free_func)
179 g_queue_foreach (queue->queue, (GFunc) queue->item_free_func, NULL);
180 g_queue_free (queue->queue);
186 * g_async_queue_lock:
187 * @queue: a #GAsyncQueue.
189 * Acquires the @queue's lock. After that you can only call the
190 * <function>g_async_queue_*_unlocked()</function> function variants on that
191 * @queue. Otherwise it will deadlock.
194 g_async_queue_lock (GAsyncQueue *queue)
196 g_return_if_fail (queue);
197 g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
199 g_mutex_lock (queue->mutex);
203 * g_async_queue_unlock:
204 * @queue: a #GAsyncQueue.
206 * Releases the queue's lock.
209 g_async_queue_unlock (GAsyncQueue *queue)
211 g_return_if_fail (queue);
212 g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
214 g_mutex_unlock (queue->mutex);
218 * g_async_queue_push:
219 * @queue: a #GAsyncQueue.
220 * @data: @data to push into the @queue.
222 * Pushes the @data into the @queue. @data must not be %NULL.
225 g_async_queue_push (GAsyncQueue* queue, gpointer data)
227 g_return_if_fail (queue);
228 g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
229 g_return_if_fail (data);
231 g_mutex_lock (queue->mutex);
232 g_async_queue_push_unlocked (queue, data);
233 g_mutex_unlock (queue->mutex);
237 * g_async_queue_push_unlocked:
238 * @queue: a #GAsyncQueue.
239 * @data: @data to push into the @queue.
241 * Pushes the @data into the @queue. @data must not be %NULL. This
242 * function must be called while holding the @queue's lock.
245 g_async_queue_push_unlocked (GAsyncQueue* queue, gpointer data)
247 g_return_if_fail (queue);
248 g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
249 g_return_if_fail (data);
251 g_queue_push_head (queue->queue, data);
252 if (queue->waiting_threads > 0)
253 g_cond_signal (queue->cond);
257 * g_async_queue_push_sorted:
258 * @queue: a #GAsyncQueue
259 * @data: the @data to push into the @queue
260 * @func: the #GCompareDataFunc is used to sort @queue. This function
261 * is passed two elements of the @queue. The function should return
262 * 0 if they are equal, a negative value if the first element
263 * should be higher in the @queue or a positive value if the first
264 * element should be lower in the @queue than the second element.
265 * @user_data: user data passed to @func.
267 * Inserts @data into @queue using @func to determine the new
270 * This function requires that the @queue is sorted before pushing on
273 * This function will lock @queue before it sorts the queue and unlock
274 * it when it is finished.
276 * For an example of @func see g_async_queue_sort().
281 g_async_queue_push_sorted (GAsyncQueue *queue,
283 GCompareDataFunc func,
286 g_return_if_fail (queue != NULL);
288 g_mutex_lock (queue->mutex);
289 g_async_queue_push_sorted_unlocked (queue, data, func, user_data);
290 g_mutex_unlock (queue->mutex);
294 g_async_queue_invert_compare (gpointer v1,
298 return -sd->func (v1, v2, sd->user_data);
302 * g_async_queue_push_sorted_unlocked:
303 * @queue: a #GAsyncQueue
304 * @data: the @data to push into the @queue
305 * @func: the #GCompareDataFunc is used to sort @queue. This function
306 * is passed two elements of the @queue. The function should return
307 * 0 if they are equal, a negative value if the first element
308 * should be higher in the @queue or a positive value if the first
309 * element should be lower in the @queue than the second element.
310 * @user_data: user data passed to @func.
312 * Inserts @data into @queue using @func to determine the new
315 * This function requires that the @queue is sorted before pushing on
318 * This function is called while holding the @queue's lock.
320 * For an example of @func see g_async_queue_sort().
325 g_async_queue_push_sorted_unlocked (GAsyncQueue *queue,
327 GCompareDataFunc func,
332 g_return_if_fail (queue != NULL);
335 sd.user_data = user_data;
337 g_queue_insert_sorted (queue->queue,
339 (GCompareDataFunc)g_async_queue_invert_compare,
341 if (queue->waiting_threads > 0)
342 g_cond_signal (queue->cond);
346 g_async_queue_pop_intern_unlocked (GAsyncQueue *queue,
352 if (!g_queue_peek_tail_link (queue->queue))
358 queue->cond = g_cond_new ();
362 queue->waiting_threads++;
363 while (!g_queue_peek_tail_link (queue->queue))
364 g_cond_wait (queue->cond, queue->mutex);
365 queue->waiting_threads--;
369 queue->waiting_threads++;
370 while (!g_queue_peek_tail_link (queue->queue))
371 if (!g_cond_timed_wait (queue->cond, queue->mutex, end_time))
373 queue->waiting_threads--;
374 if (!g_queue_peek_tail_link (queue->queue))
379 retval = g_queue_pop_tail (queue->queue);
388 * @queue: a #GAsyncQueue.
390 * Pops data from the @queue. This function blocks until data become
393 * Return value: data from the queue.
396 g_async_queue_pop (GAsyncQueue* queue)
400 g_return_val_if_fail (queue, NULL);
401 g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, NULL);
403 g_mutex_lock (queue->mutex);
404 retval = g_async_queue_pop_intern_unlocked (queue, FALSE, NULL);
405 g_mutex_unlock (queue->mutex);
411 * g_async_queue_pop_unlocked:
412 * @queue: a #GAsyncQueue.
414 * Pops data from the @queue. This function blocks until data become
415 * available. This function must be called while holding the @queue's
418 * Return value: data from the queue.
421 g_async_queue_pop_unlocked (GAsyncQueue* queue)
423 g_return_val_if_fail (queue, NULL);
424 g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, NULL);
426 return g_async_queue_pop_intern_unlocked (queue, FALSE, NULL);
430 * g_async_queue_try_pop:
431 * @queue: a #GAsyncQueue.
433 * Tries to pop data from the @queue. If no data is available, %NULL is
436 * Return value: data from the queue or %NULL, when no data is
437 * available immediately.
440 g_async_queue_try_pop (GAsyncQueue* queue)
444 g_return_val_if_fail (queue, NULL);
445 g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, NULL);
447 g_mutex_lock (queue->mutex);
448 retval = g_async_queue_pop_intern_unlocked (queue, TRUE, NULL);
449 g_mutex_unlock (queue->mutex);
455 * g_async_queue_try_pop_unlocked:
456 * @queue: a #GAsyncQueue.
458 * Tries to pop data from the @queue. If no data is available, %NULL is
459 * returned. This function must be called while holding the @queue's
462 * Return value: data from the queue or %NULL, when no data is
463 * available immediately.
466 g_async_queue_try_pop_unlocked (GAsyncQueue* queue)
468 g_return_val_if_fail (queue, NULL);
469 g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, NULL);
471 return g_async_queue_pop_intern_unlocked (queue, TRUE, NULL);
475 * g_async_queue_timed_pop:
476 * @queue: a #GAsyncQueue.
477 * @end_time: a #GTimeVal, determining the final time.
479 * Pops data from the @queue. If no data is received before @end_time,
482 * To easily calculate @end_time a combination of g_get_current_time()
483 * and g_time_val_add() can be used.
485 * Return value: data from the queue or %NULL, when no data is
486 * received before @end_time.
489 g_async_queue_timed_pop (GAsyncQueue* queue, GTimeVal *end_time)
493 g_return_val_if_fail (queue, NULL);
494 g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, NULL);
496 g_mutex_lock (queue->mutex);
497 retval = g_async_queue_pop_intern_unlocked (queue, FALSE, end_time);
498 g_mutex_unlock (queue->mutex);
504 * g_async_queue_timed_pop_unlocked:
505 * @queue: a #GAsyncQueue.
506 * @end_time: a #GTimeVal, determining the final time.
508 * Pops data from the @queue. If no data is received before @end_time,
509 * %NULL is returned. This function must be called while holding the
512 * To easily calculate @end_time a combination of g_get_current_time()
513 * and g_time_val_add() can be used.
515 * Return value: data from the queue or %NULL, when no data is
516 * received before @end_time.
519 g_async_queue_timed_pop_unlocked (GAsyncQueue* queue, GTimeVal *end_time)
521 g_return_val_if_fail (queue, NULL);
522 g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, NULL);
524 return g_async_queue_pop_intern_unlocked (queue, FALSE, end_time);
528 * g_async_queue_length:
529 * @queue: a #GAsyncQueue.
531 * Returns the length of the queue, negative values mean waiting
532 * threads, positive values mean available entries in the
533 * @queue. Actually this function returns the number of data items in
534 * the queue minus the number of waiting threads. Thus a return value
535 * of 0 could mean 'n' entries in the queue and 'n' thread waiting.
536 * That can happen due to locking of the queue or due to
539 * Return value: the length of the @queue.
542 g_async_queue_length (GAsyncQueue* queue)
546 g_return_val_if_fail (queue, 0);
547 g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, 0);
549 g_mutex_lock (queue->mutex);
550 retval = queue->queue->length - queue->waiting_threads;
551 g_mutex_unlock (queue->mutex);
557 * g_async_queue_length_unlocked:
558 * @queue: a #GAsyncQueue.
560 * Returns the length of the queue, negative values mean waiting
561 * threads, positive values mean available entries in the
562 * @queue. Actually this function returns the number of data items in
563 * the queue minus the number of waiting threads. Thus a return value
564 * of 0 could mean 'n' entries in the queue and 'n' thread waiting.
565 * That can happen due to locking of the queue or due to
566 * scheduling. This function must be called while holding the @queue's
569 * Return value: the length of the @queue.
572 g_async_queue_length_unlocked (GAsyncQueue* queue)
574 g_return_val_if_fail (queue, 0);
575 g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, 0);
577 return queue->queue->length - queue->waiting_threads;
581 * g_async_queue_sort:
582 * @queue: a #GAsyncQueue
583 * @func: the #GCompareDataFunc is used to sort @queue. This
584 * function is passed two elements of the @queue. The function
585 * should return 0 if they are equal, a negative value if the
586 * first element should be higher in the @queue or a positive
587 * value if the first element should be lower in the @queue than
588 * the second element.
589 * @user_data: user data passed to @func
591 * Sorts @queue using @func.
593 * This function will lock @queue before it sorts the queue and unlock
594 * it when it is finished.
596 * If you were sorting a list of priority numbers to make sure the
597 * lowest priority would be at the top of the queue, you could use:
602 * id1 = GPOINTER_TO_INT (element1);
603 * id2 = GPOINTER_TO_INT (element2);
605 * return (id1 > id2 ? +1 : id1 == id2 ? 0 : -1);
611 g_async_queue_sort (GAsyncQueue *queue,
612 GCompareDataFunc func,
615 g_return_if_fail (queue != NULL);
616 g_return_if_fail (func != NULL);
618 g_mutex_lock (queue->mutex);
619 g_async_queue_sort_unlocked (queue, func, user_data);
620 g_mutex_unlock (queue->mutex);
624 * g_async_queue_sort_unlocked:
625 * @queue: a #GAsyncQueue
626 * @func: the #GCompareDataFunc is used to sort @queue. This
627 * function is passed two elements of the @queue. The function
628 * should return 0 if they are equal, a negative value if the
629 * first element should be higher in the @queue or a positive
630 * value if the first element should be lower in the @queue than
631 * the second element.
632 * @user_data: user data passed to @func
634 * Sorts @queue using @func.
636 * This function is called while holding the @queue's lock.
641 g_async_queue_sort_unlocked (GAsyncQueue *queue,
642 GCompareDataFunc func,
647 g_return_if_fail (queue != NULL);
648 g_return_if_fail (func != NULL);
651 sd.user_data = user_data;
653 g_queue_sort (queue->queue,
654 (GCompareDataFunc)g_async_queue_invert_compare,
663 _g_async_queue_get_mutex (GAsyncQueue* queue)
665 g_return_val_if_fail (queue, NULL);
666 g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, NULL);
671 #define __G_ASYNCQUEUE_C__
672 #include "galiasdef.c"