atomicqueue: add an atomic queue
[platform/upstream/gstreamer.git] / gst / gstatomicqueue.c
1 /* GStreamer
2  * Copyright (C) 2009 Edward Hervey <bilboed@bilboed.com>
3  *               2011 Wim Taymans <wim.taymans@gmail.com>
4  *
5  * gstatomicqueue.c:
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20  * Boston, MA 02111-1307, USA.
21  */
22
23 #include <string.h>
24 #include <gst/gst.h>
25 #include "gstatomicqueue.h"
26
27 /* By default the queue uses 2 * sizeof(gpointer) * clp2 (max_items) of
28  * memory. clp2(x) is the next power of two >= than x.
29  *
30  * The queue can operate in low memory mode, in which it consumes almost
31  * half the memory at the expense of extra overhead in the readers. This
32  * is disabled by default because even without LOW_MEM mode, the memory
33  * consumption is still lower than a plain GList.
34  */
35 #undef LOW_MEM
36
37 typedef struct _GstAQueueMem GstAQueueMem;
38
39 struct _GstAQueueMem
40 {
41   gint size;
42   gpointer *array;
43   volatile gint head;
44   volatile gint tail;
45   GstAQueueMem *next;
46   GstAQueueMem *free;
47 };
48
49 static guint
50 clp2 (guint n)
51 {
52   guint res = 1;
53
54   while (res < n)
55     res <<= 1;
56
57   return res;
58 }
59
60 static GstAQueueMem *
61 new_queue_mem (guint size, gint pos)
62 {
63   GstAQueueMem *mem;
64
65   mem = g_new (GstAQueueMem, 1);
66
67   /* we keep the size as a mask for performance */
68   mem->size = clp2 (size) - 1;
69   mem->array = g_new0 (gpointer, mem->size);
70   mem->head = pos;
71   mem->tail = pos;
72   mem->next = NULL;
73   mem->free = NULL;
74
75   return mem;
76 }
77
78 static void
79 free_queue_mem (GstAQueueMem * mem)
80 {
81   g_free (mem->array);
82   g_free (mem);
83 }
84
85 struct _GstAtomicQueue
86 {
87 #ifdef LOW_MEM
88   gint num_readers;
89 #endif
90   GstAQueueMem *head_mem;
91   GstAQueueMem *tail_mem;
92   GstAQueueMem *free_list;
93 };
94
95 static void
96 add_to_free_list (GstAtomicQueue * queue, GstAQueueMem * mem)
97 {
98   do {
99     mem->free = g_atomic_pointer_get (&queue->free_list);
100   } while (!g_atomic_pointer_compare_and_exchange ((gpointer *) &
101           queue->free_list, mem->free, mem));
102 }
103
104 static void
105 clear_free_list (GstAtomicQueue * queue)
106 {
107   GstAQueueMem *free_list;
108
109   /* take the free list and replace with NULL */
110   do {
111     free_list = g_atomic_pointer_get (&queue->free_list);
112     if (free_list == NULL)
113       return;
114   } while (!g_atomic_pointer_compare_and_exchange ((gpointer *) &
115           queue->free_list, free_list, NULL));
116
117   while (free_list) {
118     GstAQueueMem *next = free_list->free;
119
120     free_queue_mem (free_list);
121
122     free_list = next;
123   }
124 }
125
126 GstAtomicQueue *
127 gst_atomic_queue_new (guint initial_size)
128 {
129   GstAtomicQueue *queue;
130
131   queue = g_new (GstAtomicQueue, 1);
132
133 #ifdef LOW_MEM
134   queue->num_readers = 0;
135 #endif
136   queue->head_mem = queue->tail_mem = new_queue_mem (initial_size, 0);
137   queue->free_list = NULL;
138
139   return queue;
140 }
141
142 void
143 gst_atomic_queue_free (GstAtomicQueue * queue)
144 {
145   free_queue_mem (queue->head_mem);
146   if (queue->head_mem != queue->tail_mem)
147     free_queue_mem (queue->tail_mem);
148   clear_free_list (queue);
149   g_free (queue);
150 }
151
152 gpointer
153 gst_atomic_queue_peek (GstAtomicQueue * queue)
154 {
155   GstAQueueMem *head_mem;
156   gint head, tail, size;
157
158   while (TRUE) {
159     GstAQueueMem *next;
160
161     head_mem = g_atomic_pointer_get (&queue->head_mem);
162
163     head = g_atomic_int_get (&head_mem->head);
164     tail = g_atomic_int_get (&head_mem->tail);
165     size = head_mem->size;
166
167     /* when we are not empty, we can continue */
168     if (G_LIKELY (head != tail))
169       break;
170
171     /* else array empty, try to take next */
172     next = g_atomic_pointer_get (&head_mem->next);
173     if (next == NULL)
174       return NULL;
175
176     /* now we try to move the next array as the head memory. If we fail to do that,
177      * some other reader managed to do it first and we retry */
178     if (!g_atomic_pointer_compare_and_exchange ((gpointer *) &
179             queue->head_mem, head_mem, next))
180       continue;
181
182     /* when we managed to swing the head pointer the old head is now
183      * useless and we add it to the freelist. We can't free the memory yet
184      * because we first need to make sure no reader is accessing it anymore. */
185     add_to_free_list (queue, head_mem);
186   }
187
188   return head_mem->array[head & size];
189 }
190
191 gpointer
192 gst_atomic_queue_pop (GstAtomicQueue * queue)
193 {
194   gpointer ret;
195   GstAQueueMem *head_mem;
196   gint head, tail, size;
197
198 #ifdef LOW_MEM
199   g_atomic_int_inc (&queue->num_readers);
200 #endif
201
202   do {
203     while (TRUE) {
204       GstAQueueMem *next;
205
206       head_mem = g_atomic_pointer_get (&queue->head_mem);
207
208       head = g_atomic_int_get (&head_mem->head);
209       tail = g_atomic_int_get (&head_mem->tail);
210       size = head_mem->size;
211
212       /* when we are not empty, we can continue */
213       if (G_LIKELY (head != tail))
214         break;
215
216       /* else array empty, try to take next */
217       next = g_atomic_pointer_get (&head_mem->next);
218       if (next == NULL)
219         return NULL;
220
221       /* now we try to move the next array as the head memory. If we fail to do that,
222        * some other reader managed to do it first and we retry */
223       if (!g_atomic_pointer_compare_and_exchange ((gpointer *) &
224               queue->head_mem, head_mem, next))
225         continue;
226
227       /* when we managed to swing the head pointer the old head is now
228        * useless and we add it to the freelist. We can't free the memory yet
229        * because we first need to make sure no reader is accessing it anymore. */
230       add_to_free_list (queue, head_mem);
231     }
232
233     ret = head_mem->array[head & size];
234   } while (!g_atomic_int_compare_and_exchange (&head_mem->head, head,
235           head + 1));
236
237 #ifdef LOW_MEM
238   /* decrement number of readers, when we reach 0 readers we can be sure that
239    * none is accessing the memory in the free list and we can try to clean up */
240   if (g_atomic_int_dec_and_test (&queue->num_readers))
241     clear_free_list (queue);
242 #endif
243
244   return ret;
245 }
246
247 void
248 gst_atomic_queue_push (GstAtomicQueue * queue, gpointer data)
249 {
250   GstAQueueMem *tail_mem;
251   gint head, tail, size;
252
253   do {
254     while (TRUE) {
255       GstAQueueMem *mem;
256
257       tail_mem = g_atomic_pointer_get (&queue->tail_mem);
258       head = g_atomic_int_get (&tail_mem->head);
259       tail = g_atomic_int_get (&tail_mem->tail);
260       size = tail_mem->size;
261
262       /* we're not full, continue */
263       if (tail - head < size)
264         break;
265
266       /* else we need to grow the array, we store a mask so we have to add 1 */
267       mem = new_queue_mem ((size << 1) + 1, tail);
268
269       /* try to make our new array visible to other writers */
270       if (!g_atomic_pointer_compare_and_exchange ((gpointer *) &
271               queue->tail_mem, tail_mem, mem)) {
272         /* we tried to swap the new writer array but something changed. This is
273          * because some other writer beat us to it, we free our memory and try
274          * again */
275         free_queue_mem (mem);
276         continue;
277       }
278       /* make sure that readers can find our new array as well. The one who
279        * manages to swap the pointer is the only one who can set the next
280        * pointer to the new array */
281       g_atomic_pointer_set (&tail_mem->next, mem);
282     }
283   } while (!g_atomic_int_compare_and_exchange (&tail_mem->tail, tail,
284           tail + 1));
285
286   tail_mem->array[tail & size] = data;
287 }
288
289 guint
290 gst_atomic_queue_length (GstAtomicQueue * queue)
291 {
292   GstAQueueMem *head_mem, *tail_mem;
293   gint head, tail;
294
295 #ifdef LOW_MEM
296   g_atomic_int_inc (&queue->num_readers);
297 #endif
298
299   head_mem = g_atomic_pointer_get (&queue->head_mem);
300   head = g_atomic_int_get (&head_mem->head);
301
302   tail_mem = g_atomic_pointer_get (&queue->tail_mem);
303   tail = g_atomic_int_get (&tail_mem->tail);
304
305 #ifdef LOW_MEM
306   if (g_atomic_int_dec_and_test (&queue->num_readers))
307     clear_free_list (queue);
308 #endif
309
310   return tail - head;
311 }