2 * Copyright (C) 2009 Edward Hervey <bilboed@bilboed.com>
3 * 2011 Wim Taymans <wim.taymans@gmail.com>
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Library General Public
9 * License as published by the Free Software Foundation; either
10 * version 2 of the License, or (at your option) any later version.
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Library General Public License for more details.
17 * You should have received a copy of the GNU Library General Public
18 * License along with this library; if not, write to the
19 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20 * Boston, MA 02111-1307, USA.
23 #include "gst_private.h"
28 #include "gstatomicqueue.h"
29 #include "glib-compat-private.h"
32 * SECTION:gstatomicqueue
33 * @title: GstAtomicQueue
34 * @short_description: An atomic queue implementation
36 * The #GstAtomicQueue object implements a queue that can be used from multiple
37 * threads without performing any blocking operations.
42 /* By default the queue uses 2 * sizeof(gpointer) * clp2 (max_items) of
43 * memory. clp2(x) is the smallest power of two >= x.
45 * The queue can operate in low memory mode, in which it consumes almost
46 * half the memory at the expense of extra overhead in the readers. This
47 * is disabled by default because even without LOW_MEM mode, the memory
48 * consumption is still lower than a plain GList.
52 typedef struct _GstAQueueMem GstAQueueMem;
/* Write cursor: a writer advances it with a compare-and-exchange to reserve
 * a slot before storing its data (see gst_atomic_queue_push). */
59 volatile gint tail_write;
/* Read limit: readers compare head against this value; a writer advances it
 * only after its data has been stored in the reserved slot. */
60 volatile gint tail_read;
/* Allocate one queue array with room for at least @size items (never fewer
 * than 16) and start its write position at @pos. */
77 new_queue_mem (guint size, gint pos)
81 mem = g_new (GstAQueueMem, 1);
83 /* we keep the size as a mask for performance */
/* the capacity comes from clp2(); storing capacity - 1 lets the readers and
 * writers index the array with (position & size) */
84 mem->size = clp2 (MAX (size, 16)) - 1;
/* size is a mask, so the zero-filled slot array holds size + 1 pointers */
85 mem->array = g_new0 (gpointer, mem->size + 1);
87 mem->tail_write = pos;
/* Release one queue array. Called when the queue is torn down, when the free
 * list is drained, and by a writer that lost the race to install a new
 * array. */
96 free_queue_mem (GstAQueueMem * mem)
/* State shared by all readers and writers of one queue. */
102 struct _GstAtomicQueue
/* reference count; the queue is freed when it drops to zero */
104 volatile gint refcount;
/* array the readers are currently consuming from */
108 GstAQueueMem *head_mem;
/* array the writers are currently appending to */
109 GstAQueueMem *tail_mem;
/* retired arrays, linked through their free member, waiting to be released
 * once no reader can still be using them */
110 GstAQueueMem *free_list;
/* Put @mem on the queue's free list without taking a lock: link @mem to the
 * current list head, then repeat until the compare-and-exchange on
 * queue->free_list succeeds. */
114 add_to_free_list (GstAtomicQueue * queue, GstAQueueMem * mem)
117 mem->free = g_atomic_pointer_get (&queue->free_list);
118 } while (!g_atomic_pointer_compare_and_exchange (&queue->free_list,
/* Detach the free list from @queue and release the arrays that were on it. */
123 clear_free_list (GstAtomicQueue * queue)
125 GstAQueueMem *free_list;
127 /* take the free list and replace with NULL */
129 free_list = g_atomic_pointer_get (&queue->free_list);
/* nothing has been retired, so there is nothing to release */
130 if (free_list == NULL)
132 } while (!g_atomic_pointer_compare_and_exchange (&queue->free_list, free_list,
136 GstAQueueMem *next = free_list->free;
/* the successor was saved in next, so this entry can be released now */
138 free_queue_mem (free_list);
145 * gst_atomic_queue_new:
146 * @initial_size: initial queue size
148 * Create a new atomic queue instance. @initial_size is raised to at least 16,
149 * rounded up to the next power of 2, and used as the initial size of the queue.
151 * Returns: a new #GstAtomicQueue
/* Allocate the queue and give it one initial array that serves as both the
 * head and the tail; the free list starts out empty. */
156 gst_atomic_queue_new (guint initial_size)
158 GstAtomicQueue *queue;
160 queue = g_new (GstAtomicQueue, 1);
/* no reader is active yet */
164 queue->num_readers = 0;
/* a single array, starting at position 0, is shared by readers and writers
 * until a writer has to grow the queue */
166 queue->head_mem = queue->tail_mem = new_queue_mem (initial_size, 0);
167 queue->free_list = NULL;
173 * gst_atomic_queue_ref:
174 * @queue: a #GstAtomicQueue
176 * Increase the refcount of @queue.
/* Atomically take one additional reference on @queue; a NULL @queue is
 * rejected by the g_return_if_fail() guard. */
181 gst_atomic_queue_ref (GstAtomicQueue * queue)
183 g_return_if_fail (queue != NULL);
185 g_atomic_int_inc (&queue->refcount);
/* Release the arrays still referenced by the queue, then drain the free
 * list. */
189 gst_atomic_queue_free (GstAtomicQueue * queue)
191 free_queue_mem (queue->head_mem);
/* head and tail may be the same array; do not release it twice */
192 if (queue->head_mem != queue->tail_mem)
193 free_queue_mem (queue->tail_mem);
/* NOTE(review): only head_mem and tail_mem are released above. If writers
 * grew the queue more than once while nothing was read, the arrays linked
 * between the two through their next member are not reached here and would
 * leak. TODO confirm and walk the chain if so. */
194 clear_free_list (queue);
199 * gst_atomic_queue_unref:
200 * @queue: a #GstAtomicQueue
202 * Unref @queue and free the memory when the refcount reaches 0.
/* Drop one reference and destroy the queue when the count reaches zero. */
207 gst_atomic_queue_unref (GstAtomicQueue * queue)
209 g_return_if_fail (queue != NULL);
/* g_atomic_int_dec_and_test() is TRUE only for the caller that released the
 * last reference, so the queue is freed exactly once */
211 if (g_atomic_int_dec_and_test (&queue->refcount))
212 gst_atomic_queue_free (queue);
216 * gst_atomic_queue_peek:
217 * @queue: a #GstAtomicQueue
219 * Peek the head element of the queue without removing it from the queue.
221 * Returns: the head element of @queue or NULL when the queue is empty.
/* Return the oldest item without consuming it: the head position is read
 * here but never advanced.
 * NOTE(review): unlike gst_atomic_queue_pop and gst_atomic_queue_length, no
 * num_readers increment is visible in this function even though it can
 * retire an array to the free list. Confirm that this is intended. */
226 gst_atomic_queue_peek (GstAtomicQueue * queue)
228 GstAQueueMem *head_mem;
229 gint head, tail, size;
231 g_return_val_if_fail (queue != NULL, NULL);
236 head_mem = g_atomic_pointer_get (&queue->head_mem);
/* snapshot the read position and the limit published by the writers */
238 head = g_atomic_int_get (&head_mem->head);
239 tail = g_atomic_int_get (&head_mem->tail_read);
/* size is stored as a mask (capacity - 1) */
240 size = head_mem->size;
242 /* when we are not empty, we can continue */
243 if (G_LIKELY (head != tail))
246 /* else array empty, try to take next */
247 next = g_atomic_pointer_get (&head_mem->next);
251 /* now we try to move the next array as the head memory. If we fail to do that,
252 * some other reader managed to do it first and we retry */
253 if (!g_atomic_pointer_compare_and_exchange (&queue->head_mem, head_mem,
257 /* when we managed to swing the head pointer the old head is now
258 * useless and we add it to the freelist. We can't free the memory yet
259 * because we first need to make sure no reader is accessing it anymore. */
260 add_to_free_list (queue, head_mem);
/* the mask wraps the position into the bounds of the array */
263 return head_mem->array[head & size];
267 * gst_atomic_queue_pop:
268 * @queue: a #GstAtomicQueue
270 * Get the head element of the queue and remove it from the queue.
272 * Returns: the head element of @queue or NULL when the queue is empty.
/* Take the oldest item out of the queue. The caller is counted in
 * num_readers for the duration of the call so that retired arrays are only
 * released once the last active reader has left. */
277 gst_atomic_queue_pop (GstAtomicQueue * queue)
280 GstAQueueMem *head_mem;
281 gint head, tail, size;
283 g_return_val_if_fail (queue != NULL, NULL);
/* register as an active reader before touching any array */
286 g_atomic_int_inc (&queue->num_readers);
293 head_mem = g_atomic_pointer_get (&queue->head_mem);
/* snapshot the read position and the limit published by the writers */
295 head = g_atomic_int_get (&head_mem->head);
296 tail = g_atomic_int_get (&head_mem->tail_read);
/* size is stored as a mask (capacity - 1) */
297 size = head_mem->size;
299 /* when we are not empty, we can continue */
304 /* else array empty, try to take next */
305 next = g_atomic_pointer_get (&head_mem->next);
309 /* now we try to move the next array as the head memory. If we fail to do that,
310 * some other reader managed to do it first and we retry */
312 (!g_atomic_pointer_compare_and_exchange (&queue->head_mem, head_mem,
316 /* when we managed to swing the head pointer the old head is now
317 * useless and we add it to the freelist. We can't free the memory yet
318 * because we first need to make sure no reader is accessing it anymore. */
319 add_to_free_list (queue, head_mem);
/* read the item first; the compare-and-exchange that follows claims it by
 * advancing head from the value sampled above to the next position */
322 ret = head_mem->array[head & size];
324 (!g_atomic_int_compare_and_exchange (&head_mem->head, head, head + 1));
327 /* decrement number of readers, when we reach 0 readers we can be sure that
328 * none is accessing the memory in the free list and we can try to clean up */
329 if (g_atomic_int_dec_and_test (&queue->num_readers))
330 clear_free_list (queue);
337 * gst_atomic_queue_push:
338 * @queue: a #GstAtomicQueue
341 * Append @data to the tail of the queue.
/* Append @data to the queue. A writer first reserves a slot by advancing
 * tail_write, stores the item, and then publishes it to the readers by
 * advancing tail_read. When the current array is full a larger one is
 * allocated and linked in. */
346 gst_atomic_queue_push (GstAtomicQueue * queue, gpointer data)
348 GstAQueueMem *tail_mem;
349 gint head, tail, size;
351 g_return_if_fail (queue != NULL);
/* snapshot the array the writers are using and its current positions */
357 tail_mem = g_atomic_pointer_get (&queue->tail_mem);
358 head = g_atomic_int_get (&tail_mem->head);
359 tail = g_atomic_int_get (&tail_mem->tail_write);
/* size is stored as a mask (capacity - 1) */
360 size = tail_mem->size;
362 /* we're not full, continue */
364 (tail - head <= size)
367 /* else we need to grow the array, we store a mask so we have to add 1 */
368 mem = new_queue_mem ((size << 1) + 1, tail);
370 /* try to make our new array visible to other writers */
372 (!g_atomic_pointer_compare_and_exchange (&queue->tail_mem, tail_mem,
374 /* we tried to swap the new writer array but something changed. This is
375 * because some other writer beat us to it, we free our memory and try
377 free_queue_mem (mem);
380 /* make sure that readers can find our new array as well. The one who
381 * manages to swap the pointer is the only one who can set the next
382 * pointer to the new array */
383 g_atomic_pointer_set (&tail_mem->next, mem);
/* reserve the slot at position tail by advancing tail_write; the
 * compare-and-exchange can only succeed for one writer per position */
386 (!g_atomic_int_compare_and_exchange (&tail_mem->tail_write, tail, tail + 1));
/* the reserved slot belongs to this writer, store the item */
388 tail_mem->array[tail & size] = data;
390 /* now wait until all writers have completed their write before we move the
391 * tail_read to this new item. It is possible that other writers are still
392 * updating the previous array slots and we don't want to reveal their changes
393 * before they are done. FIXME, it would be nice if we didn't have to busy
396 (!g_atomic_int_compare_and_exchange (&tail_mem->tail_read, tail, tail + 1));
400 * gst_atomic_queue_length:
401 * @queue: a #GstAtomicQueue
403 * Get the number of items in the queue.
405 * Returns: the number of elements in the queue.
/* Sample the read position of the head array and the published write
 * position of the tail array while registered as a reader. */
410 gst_atomic_queue_length (GstAtomicQueue * queue)
412 GstAQueueMem *head_mem, *tail_mem;
415 g_return_val_if_fail (queue != NULL, 0);
/* count as an active reader while the arrays are being inspected, so that
 * retired arrays are not released in the meantime */
418 g_atomic_int_inc (&queue->num_readers);
421 head_mem = g_atomic_pointer_get (&queue->head_mem);
422 head = g_atomic_int_get (&head_mem->head);
424 tail_mem = g_atomic_pointer_get (&queue->tail_mem);
425 tail = g_atomic_int_get (&tail_mem->tail_read);
/* the last reader to leave drains the free list */
428 if (g_atomic_int_dec_and_test (&queue->num_readers))
429 clear_free_list (queue);