2 * Copyright (C) 2006 Edward Hervey <edward@fluendo.com>
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Library General Public
8 * License as published by the Free Software Foundation; either
9 * version 2 of the License, or (at your option) any later version.
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Library General Public License for more details.
16 * You should have received a copy of the GNU Library General Public
17 * License along with this library; if not, write to the
18 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19 * Boston, MA 02111-1307, USA.
23 * SECTION:gstdataqueue
24 * @short_description: Threadsafe queueing object
26 * #GstDataQueue is an object that handles threadsafe queueing of objects. It
27 * also provides size-related functionality. This object should be used for
28 * any #GstElement that wishes to provide some sort of queueing functionality.
33 #include "gstdataqueue.h"
35 GST_DEBUG_CATEGORY_STATIC (data_queue_debug);
36 #define GST_CAT_DEFAULT (data_queue_debug)
37 GST_DEBUG_CATEGORY_STATIC (data_queue_dataflow);
40 /* Queue signals and args */
51 ARG_CUR_LEVEL_VISIBLE,
57 #define GST_DATA_QUEUE_MUTEX_LOCK(q) G_STMT_START { \
58 GST_CAT_LOG (data_queue_dataflow, \
59 "locking qlock from thread %p", \
61 g_mutex_lock (q->qlock); \
62 GST_CAT_LOG (data_queue_dataflow, \
63 "locked qlock from thread %p", \
67 #define GST_DATA_QUEUE_MUTEX_LOCK_CHECK(q, label) G_STMT_START { \
68 GST_DATA_QUEUE_MUTEX_LOCK (q); \
73 #define GST_DATA_QUEUE_MUTEX_UNLOCK(q) G_STMT_START { \
74 GST_CAT_LOG (data_queue_dataflow, \
75 "unlocking qlock from thread %p", \
77 g_mutex_unlock (q->qlock); \
80 #define STATUS(q, msg) \
81 GST_CAT_LOG (data_queue_dataflow, \
82 "queue:%p " msg ": %u visible items, %u " \
83 "bytes, %"G_GUINT64_FORMAT \
86 q->cur_level.visible, \
91 static void gst_data_queue_finalize (GObject * object);
93 static void gst_data_queue_set_property (GObject * object,
94 guint prop_id, const GValue * value, GParamSpec * pspec);
95 static void gst_data_queue_get_property (GObject * object,
96 guint prop_id, GValue * value, GParamSpec * pspec);
98 static GObjectClass *parent_class = NULL;
99 static guint gst_data_queue_signals[LAST_SIGNAL] = { 0 };
103 GST_DEBUG_CATEGORY_INIT (data_queue_debug, "dataqueue", 0, \
104 "data queue object"); \
105 GST_DEBUG_CATEGORY_INIT (data_queue_dataflow, "data_queue_dataflow", 0, \
106 "dataflow inside the data queue object"); \
110 G_DEFINE_TYPE_WITH_CODE (GstDataQueue, gst_data_queue, G_TYPE_OBJECT, _do_init);
113 gst_data_queue_class_init (GstDataQueueClass * klass)
115 GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
117 parent_class = g_type_class_peek_parent (klass);
119 gobject_class->set_property = gst_data_queue_set_property;
120 gobject_class->get_property = gst_data_queue_get_property;
124 * GstDataQueue::empty:
125 * @queue: the queue instance
127 * Reports that the queue became empty (empty).
128 * A queue is empty if the total amount of visible items inside it (num-visible, time,
129 * size) is lower than the boundary values which can be set through the GObject
132 gst_data_queue_signals[SIGNAL_EMPTY] =
133 g_signal_new ("empty", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
134 G_STRUCT_OFFSET (GstDataQueueClass, empty), NULL, NULL,
135 g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
138 * GstDataQueue::full:
139 * @queue: the queue instance
141 * Reports that the queue became full (full).
142 * A queue is full if the total amount of data inside it (num-visible, time,
143 * size) is higher than the boundary values which can be set through the GObject
146 gst_data_queue_signals[SIGNAL_FULL] =
147 g_signal_new ("full", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
148 G_STRUCT_OFFSET (GstDataQueueClass, full), NULL, NULL,
149 g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
152 g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_BYTES,
153 g_param_spec_uint ("current-level-bytes", "Current level (kB)",
154 "Current amount of data in the queue (bytes)",
155 0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
156 g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_VISIBLE,
157 g_param_spec_uint ("current-level-visible",
158 "Current level (visible items)",
159 "Current number of visible items in the queue", 0, G_MAXUINT, 0,
160 G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
161 g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_TIME,
162 g_param_spec_uint64 ("current-level-time", "Current level (ns)",
163 "Current amount of data in the queue (in ns)", 0, G_MAXUINT64, 0,
164 G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
166 gobject_class->finalize = gst_data_queue_finalize;
170 gst_data_queue_init (GstDataQueue * queue)
172 queue->cur_level.visible = 0; /* no content */
173 queue->cur_level.bytes = 0; /* no content */
174 queue->cur_level.time = 0; /* no content */
176 queue->checkfull = NULL;
178 queue->qlock = g_mutex_new ();
179 queue->item_add = g_cond_new ();
180 queue->item_del = g_cond_new ();
181 queue->queue = g_queue_new ();
183 GST_DEBUG ("initialized queue's not_empty & not_full conditions");
187 * gst_data_queue_new_full:
188 * @checkfull: the callback used to tell if the element considers the queue full
190 * @fullcallback: the callback which will be called when the queue is considered full.
191 * @emptycallback: the callback which will be called when the queue is considered empty.
192 * @checkdata: a #gpointer that will be given in the @checkfull callback.
194 * Creates a new #GstDataQueue. The difference with @gst_data_queue_new is that it will
195 * not emit the 'full' and 'empty' signals, but instead calling directly @fullcallback
198 * Returns: a new #GstDataQueue.
204 gst_data_queue_new_full (GstDataQueueCheckFullFunction checkfull,
205 GstDataQueueFullCallback fullcallback,
206 GstDataQueueEmptyCallback emptycallback, gpointer checkdata)
210 g_return_val_if_fail (checkfull != NULL, NULL);
212 ret = g_object_newv (GST_TYPE_DATA_QUEUE, 0, NULL);
213 ret->checkfull = checkfull;
214 ret->checkdata = checkdata;
215 ret->fullcallback = fullcallback;
216 ret->emptycallback = emptycallback;
222 * gst_data_queue_new:
223 * @checkfull: the callback used to tell if the element considers the queue full
225 * @checkdata: a #gpointer that will be given in the @checkfull callback.
227 * Returns: a new #GstDataQueue.
231 gst_data_queue_new (GstDataQueueCheckFullFunction checkfull, gpointer checkdata)
233 return gst_data_queue_new_full (checkfull, NULL, NULL, checkdata);
237 gst_data_queue_cleanup (GstDataQueue * queue)
239 while (!g_queue_is_empty (queue->queue)) {
240 GstDataQueueItem *item = g_queue_pop_head (queue->queue);
242 /* Just call the destroy notify on the item */
243 item->destroy (item);
245 queue->cur_level.visible = 0;
246 queue->cur_level.bytes = 0;
247 queue->cur_level.time = 0;
250 /* called only once, as opposed to dispose */
252 gst_data_queue_finalize (GObject * object)
254 GstDataQueue *queue = GST_DATA_QUEUE (object);
256 GST_DEBUG ("finalizing queue");
258 gst_data_queue_cleanup (queue);
259 g_queue_free (queue->queue);
261 GST_DEBUG ("free mutex");
262 g_mutex_free (queue->qlock);
263 GST_DEBUG ("done free mutex");
265 g_cond_free (queue->item_add);
266 g_cond_free (queue->item_del);
268 G_OBJECT_CLASS (parent_class)->finalize (object);
272 gst_data_queue_locked_flush (GstDataQueue * queue)
274 STATUS (queue, "before flushing");
275 gst_data_queue_cleanup (queue);
276 STATUS (queue, "after flushing");
277 /* we deleted something... */
278 g_cond_signal (queue->item_del);
281 static inline gboolean
282 gst_data_queue_locked_is_empty (GstDataQueue * queue)
284 return (queue->queue->length == 0);
287 static inline gboolean
288 gst_data_queue_locked_is_full (GstDataQueue * queue)
290 return queue->checkfull (queue, queue->cur_level.visible,
291 queue->cur_level.bytes, queue->cur_level.time, queue->checkdata);
295 * gst_data_queue_flush:
296 * @queue: a #GstDataQueue.
298 * Flushes all the contents of the @queue. Any call to #gst_data_queue_push and
299 * #gst_data_queue_pop will be released.
303 gst_data_queue_flush (GstDataQueue * queue)
305 GST_DEBUG ("queue:%p", queue);
306 GST_DATA_QUEUE_MUTEX_LOCK (queue);
307 gst_data_queue_locked_flush (queue);
308 GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
312 * gst_data_queue_is_empty:
313 * @queue: a #GstDataQueue.
315 * Queries if there are any items in the @queue.
318 * Returns: #TRUE if @queue is empty.
321 gst_data_queue_is_empty (GstDataQueue * queue)
325 GST_DATA_QUEUE_MUTEX_LOCK (queue);
326 res = gst_data_queue_locked_is_empty (queue);
327 GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
333 * gst_data_queue_is_full:
334 * @queue: a #GstDataQueue.
336 * Queries if @queue is full. This check will be done using the
337 * #GstDataQueueCheckFullFunction registered with @queue.
340 * Returns: #TRUE if @queue is full.
343 gst_data_queue_is_full (GstDataQueue * queue)
347 GST_DATA_QUEUE_MUTEX_LOCK (queue);
348 res = gst_data_queue_locked_is_full (queue);
349 GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
355 * gst_data_queue_set_flushing:
356 * @queue: a #GstDataQueue.
357 * @flushing: a #gboolean stating if the queue will be flushing or not.
359 * Sets the queue to flushing state if @flushing is #TRUE. If set to flushing
360 * state, any incoming data on the @queue will be discarded. Any call currently
361 * blocking on #gst_data_queue_push or #gst_data_queue_pop will return straight
362 * away with a return value of #FALSE. While the @queue is in flushing state,
363 * all calls to those two functions will return #FALSE.
368 gst_data_queue_set_flushing (GstDataQueue * queue, gboolean flushing)
370 GST_DEBUG ("queue:%p , flushing:%d", queue, flushing);
372 GST_DATA_QUEUE_MUTEX_LOCK (queue);
373 queue->flushing = flushing;
375 /* release push/pop functions */
376 g_cond_signal (queue->item_add);
377 g_cond_signal (queue->item_del);
379 GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
383 * gst_data_queue_push:
384 * @queue: a #GstDataQueue.
385 * @item: a #GstDataQueueItem.
387 * Pushes a #GstDataQueueItem (or a structure that begins with the same fields)
388 * on the @queue. If the @queue is full, the call will block until space is
389 * available, OR the @queue is set to flushing state.
392 * Note that this function has slightly different semantics than gst_pad_push()
393 * and gst_pad_push_event(): this function only takes ownership of @item and
394 * the #GstMiniObject contained in @item if the push was successful. If FALSE
395 * is returned, the caller is responsible for freeing @item and its contents.
397 * Returns: #TRUE if the @item was successfully pushed on the @queue.
400 gst_data_queue_push (GstDataQueue * queue, GstDataQueueItem * item)
402 g_return_val_if_fail (GST_IS_DATA_QUEUE (queue), FALSE);
403 g_return_val_if_fail (item != NULL, FALSE);
405 GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);
407 STATUS (queue, "before pushing");
409 /* We ALWAYS need to check for queue fillness */
410 if (gst_data_queue_locked_is_full (queue)) {
411 GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
412 if (G_LIKELY (queue->fullcallback))
413 queue->fullcallback (queue, queue->checkdata);
415 g_signal_emit (queue, gst_data_queue_signals[SIGNAL_FULL], 0);
416 GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);
418 /* signal might have removed some items */
419 while (gst_data_queue_locked_is_full (queue)) {
420 g_cond_wait (queue->item_del, queue->qlock);
426 g_queue_push_tail (queue->queue, item);
429 queue->cur_level.visible++;
430 queue->cur_level.bytes += item->size;
431 queue->cur_level.time += item->duration;
433 STATUS (queue, "after pushing");
434 g_cond_signal (queue->item_add);
436 GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
443 GST_DEBUG ("queue:%p, we are flushing", queue);
444 GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
450 * gst_data_queue_pop:
451 * @queue: a #GstDataQueue.
452 * @item: pointer to store the returned #GstDataQueueItem.
454 * Retrieves the first @item available on the @queue. If the queue is currently
455 * empty, the call will block until at least one item is available, OR the
456 * @queue is set to the flushing state.
459 * Returns: #TRUE if an @item was successfully retrieved from the @queue.
462 gst_data_queue_pop (GstDataQueue * queue, GstDataQueueItem ** item)
464 g_return_val_if_fail (GST_IS_DATA_QUEUE (queue), FALSE);
465 g_return_val_if_fail (item != NULL, FALSE);
467 GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);
469 STATUS (queue, "before popping");
471 if (gst_data_queue_locked_is_empty (queue)) {
472 GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
473 if (G_LIKELY (queue->emptycallback))
474 queue->emptycallback (queue, queue->checkdata);
476 g_signal_emit (queue, gst_data_queue_signals[SIGNAL_EMPTY], 0);
477 GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);
479 while (gst_data_queue_locked_is_empty (queue)) {
480 g_cond_wait (queue->item_add, queue->qlock);
486 /* Get the item from the GQueue */
487 *item = g_queue_pop_head (queue->queue);
489 /* update current level counter */
490 if ((*item)->visible)
491 queue->cur_level.visible--;
492 queue->cur_level.bytes -= (*item)->size;
493 queue->cur_level.time -= (*item)->duration;
495 STATUS (queue, "after popping");
496 g_cond_signal (queue->item_del);
498 GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
505 GST_DEBUG ("queue:%p, we are flushing", queue);
506 GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
512 * gst_data_queue_drop_head:
513 * @queue: The #GstDataQueue to drop an item from.
514 * @type: The #GType of the item to drop.
516 * Pop and unref the head-most #GstMiniObject with the given #GType.
518 * Returns: TRUE if an element was removed.
521 gst_data_queue_drop_head (GstDataQueue * queue, GType type)
523 gboolean res = FALSE;
525 GstDataQueueItem *leak = NULL;
527 g_return_val_if_fail (GST_IS_DATA_QUEUE (queue), FALSE);
529 GST_DEBUG ("queue:%p", queue);
531 GST_DATA_QUEUE_MUTEX_LOCK (queue);
532 for (item = g_queue_peek_head_link (queue->queue); item; item = item->next) {
533 GstDataQueueItem *tmp = (GstDataQueueItem *) item->data;
535 if (G_TYPE_CHECK_INSTANCE_TYPE (tmp->object, type)) {
544 g_queue_delete_link (queue->queue, item);
547 queue->cur_level.visible--;
548 queue->cur_level.bytes -= leak->size;
549 queue->cur_level.time -= leak->duration;
551 leak->destroy (leak);
556 GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
558 GST_DEBUG ("queue:%p , res:%d", queue, res);
564 * gst_data_queue_limits_changed:
565 * @queue: The #GstDataQueue
567 * Inform the queue that the limits for the fullness check have changed and that
568 * any blocking gst_data_queue_push() should be unblocked to recheck the limts.
571 gst_data_queue_limits_changed (GstDataQueue * queue)
573 g_return_if_fail (GST_IS_DATA_QUEUE (queue));
575 GST_DATA_QUEUE_MUTEX_LOCK (queue);
576 GST_DEBUG ("signal del");
577 g_cond_signal (queue->item_del);
578 GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
582 * gst_data_queue_get_level:
583 * @queue: The #GstDataQueue
584 * @level: the location to store the result
586 * Get the current level of the queue.
589 gst_data_queue_get_level (GstDataQueue * queue, GstDataQueueSize * level)
591 memcpy (level, (&queue->cur_level), sizeof (GstDataQueueSize));
595 gst_data_queue_set_property (GObject * object,
596 guint prop_id, const GValue * value, GParamSpec * pspec)
600 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
606 gst_data_queue_get_property (GObject * object,
607 guint prop_id, GValue * value, GParamSpec * pspec)
609 GstDataQueue *queue = GST_DATA_QUEUE (object);
611 GST_DATA_QUEUE_MUTEX_LOCK (queue);
614 case ARG_CUR_LEVEL_BYTES:
615 g_value_set_uint (value, queue->cur_level.bytes);
617 case ARG_CUR_LEVEL_VISIBLE:
618 g_value_set_uint (value, queue->cur_level.visible);
620 case ARG_CUR_LEVEL_TIME:
621 g_value_set_uint64 (value, queue->cur_level.time);
624 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
628 GST_DATA_QUEUE_MUTEX_UNLOCK (queue);