dataqueue: reduce debug log spam a bit
[platform/upstream/gstreamer.git] / libs / gst / base / gstdataqueue.c
1 /* GStreamer
2  * Copyright (C) 2006 Edward Hervey <edward@fluendo.com>
3  *
4  * gstdataqueue.c:
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19  * Boston, MA 02110-1301, USA.
20  */
21
22 /**
23  * SECTION:gstdataqueue
24  * @short_description: Threadsafe queueing object
25  *
26  * #GstDataQueue is an object that handles threadsafe queueing of objects. It
27  * also provides size-related functionality. This object should be used for
28  * any #GstElement that wishes to provide some sort of queueing functionality.
29  */
30
31 #include <gst/gst.h>
32 #include "string.h"
33 #include "gstdataqueue.h"
34 #include "gstqueuearray.h"
35 #include "gst/glib-compat-private.h"
36
37 GST_DEBUG_CATEGORY_STATIC (data_queue_debug);
38 #define GST_CAT_DEFAULT (data_queue_debug)
39 GST_DEBUG_CATEGORY_STATIC (data_queue_dataflow);
40
41
42 /* Queue signals and args */
43 enum
44 {
45   SIGNAL_EMPTY,
46   SIGNAL_FULL,
47   LAST_SIGNAL
48 };
49
50 enum
51 {
52   PROP_0,
53   PROP_CUR_LEVEL_VISIBLE,
54   PROP_CUR_LEVEL_BYTES,
55   PROP_CUR_LEVEL_TIME
56       /* FILL ME */
57 };
58
59 struct _GstDataQueuePrivate
60 {
61   /* the array of data we're keeping our grubby hands on */
62   GstQueueArray *queue;
63
64   GstDataQueueSize cur_level;   /* size of the queue */
65   GstDataQueueCheckFullFunction checkfull;      /* Callback to check if the queue is full */
66   gpointer *checkdata;
67
68   GMutex qlock;                 /* lock for queue (vs object lock) */
69   gboolean waiting_add;
70   GCond item_add;               /* signals buffers now available for reading */
71   gboolean waiting_del;
72   GCond item_del;               /* signals space now available for writing */
73   gboolean flushing;            /* indicates whether conditions where signalled because
74                                  * of external flushing */
75   GstDataQueueFullCallback fullcallback;
76   GstDataQueueEmptyCallback emptycallback;
77 };
78
79 #define GST_DATA_QUEUE_MUTEX_LOCK(q) G_STMT_START {                     \
80     GST_CAT_TRACE (data_queue_dataflow,                                 \
81       "locking qlock from thread %p",                                   \
82       g_thread_self ());                                                \
83   g_mutex_lock (&q->priv->qlock);                                       \
84   GST_CAT_TRACE (data_queue_dataflow,                                   \
85       "locked qlock from thread %p",                                    \
86       g_thread_self ());                                                \
87 } G_STMT_END
88
89 #define GST_DATA_QUEUE_MUTEX_LOCK_CHECK(q, label) G_STMT_START {        \
90     GST_DATA_QUEUE_MUTEX_LOCK (q);                                      \
91     if (q->priv->flushing)                                              \
92       goto label;                                                       \
93   } G_STMT_END
94
95 #define GST_DATA_QUEUE_MUTEX_UNLOCK(q) G_STMT_START {                   \
96     GST_CAT_TRACE (data_queue_dataflow,                                 \
97       "unlocking qlock from thread %p",                                 \
98       g_thread_self ());                                                \
99   g_mutex_unlock (&q->priv->qlock);                                     \
100 } G_STMT_END
101
102 #define STATUS(q, msg)                                                  \
103   GST_CAT_LOG (data_queue_dataflow,                                     \
104                "queue:%p " msg ": %u visible items, %u "                \
105                "bytes, %"G_GUINT64_FORMAT                               \
106                " ns, %u elements",                                      \
107                queue,                                                   \
108                q->priv->cur_level.visible,                              \
109                q->priv->cur_level.bytes,                                \
110                q->priv->cur_level.time,                                 \
111                gst_queue_array_get_length (q->priv->queue))
112
113 static void gst_data_queue_finalize (GObject * object);
114
115 static void gst_data_queue_set_property (GObject * object,
116     guint prop_id, const GValue * value, GParamSpec * pspec);
117 static void gst_data_queue_get_property (GObject * object,
118     guint prop_id, GValue * value, GParamSpec * pspec);
119
120 static guint gst_data_queue_signals[LAST_SIGNAL] = { 0 };
121
122 #define _do_init \
123 { \
124   GST_DEBUG_CATEGORY_INIT (data_queue_debug, "dataqueue", 0, \
125       "data queue object"); \
126   GST_DEBUG_CATEGORY_INIT (data_queue_dataflow, "data_queue_dataflow", 0, \
127       "dataflow inside the data queue object"); \
128 }
129
130 #define parent_class gst_data_queue_parent_class
131 G_DEFINE_TYPE_WITH_CODE (GstDataQueue, gst_data_queue, G_TYPE_OBJECT, _do_init);
132
133 static void
134 gst_data_queue_class_init (GstDataQueueClass * klass)
135 {
136   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
137
138   g_type_class_add_private (klass, sizeof (GstDataQueuePrivate));
139
140   gobject_class->set_property = gst_data_queue_set_property;
141   gobject_class->get_property = gst_data_queue_get_property;
142
143   /* signals */
144   /**
145    * GstDataQueue::empty:
146    * @queue: the queue instance
147    *
148    * Reports that the queue became empty (empty).
149    * A queue is empty if the total amount of visible items inside it (num-visible, time,
150    * size) is lower than the boundary values which can be set through the GObject
151    * properties.
152    */
153   gst_data_queue_signals[SIGNAL_EMPTY] =
154       g_signal_new ("empty", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
155       G_STRUCT_OFFSET (GstDataQueueClass, empty), NULL, NULL,
156       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
157
158   /**
159    * GstDataQueue::full:
160    * @queue: the queue instance
161    *
162    * Reports that the queue became full (full).
163    * A queue is full if the total amount of data inside it (num-visible, time,
164    * size) is higher than the boundary values which can be set through the GObject
165    * properties.
166    */
167   gst_data_queue_signals[SIGNAL_FULL] =
168       g_signal_new ("full", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
169       G_STRUCT_OFFSET (GstDataQueueClass, full), NULL, NULL,
170       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
171
172   /* properties */
173   g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_BYTES,
174       g_param_spec_uint ("current-level-bytes", "Current level (kB)",
175           "Current amount of data in the queue (bytes)",
176           0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
177   g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_VISIBLE,
178       g_param_spec_uint ("current-level-visible",
179           "Current level (visible items)",
180           "Current number of visible items in the queue", 0, G_MAXUINT, 0,
181           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
182   g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_TIME,
183       g_param_spec_uint64 ("current-level-time", "Current level (ns)",
184           "Current amount of data in the queue (in ns)", 0, G_MAXUINT64, 0,
185           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
186
187   gobject_class->finalize = gst_data_queue_finalize;
188 }
189
190 static void
191 gst_data_queue_init (GstDataQueue * queue)
192 {
193   queue->priv =
194       G_TYPE_INSTANCE_GET_PRIVATE (queue, GST_TYPE_DATA_QUEUE,
195       GstDataQueuePrivate);
196
197   queue->priv->cur_level.visible = 0;   /* no content */
198   queue->priv->cur_level.bytes = 0;     /* no content */
199   queue->priv->cur_level.time = 0;      /* no content */
200
201   queue->priv->checkfull = NULL;
202
203   g_mutex_init (&queue->priv->qlock);
204   g_cond_init (&queue->priv->item_add);
205   g_cond_init (&queue->priv->item_del);
206   queue->priv->queue = gst_queue_array_new (50);
207
208   GST_DEBUG ("initialized queue's not_empty & not_full conditions");
209 }
210
211 /**
212  * gst_data_queue_new:
213  * @checkfull: the callback used to tell if the element considers the queue full
214  * or not.
215  * @fullcallback: the callback which will be called when the queue is considered full.
216  * @emptycallback: the callback which will be called when the queue is considered empty.
217  * @checkdata: a #gpointer that will be given in the @checkfull callback.
218  *
219  * Creates a new #GstDataQueue. The difference with @gst_data_queue_new is that it will
220  * not emit the 'full' and 'empty' signals, but instead calling directly @fullcallback
221  * or @emptycallback.
222  *
223  * Returns: a new #GstDataQueue.
224  *
225  * Since: 1.2.0
226  */
227 GstDataQueue *
228 gst_data_queue_new (GstDataQueueCheckFullFunction checkfull,
229     GstDataQueueFullCallback fullcallback,
230     GstDataQueueEmptyCallback emptycallback, gpointer checkdata)
231 {
232   GstDataQueue *ret;
233
234   g_return_val_if_fail (checkfull != NULL, NULL);
235
236   ret = g_object_newv (GST_TYPE_DATA_QUEUE, 0, NULL);
237   ret->priv->checkfull = checkfull;
238   ret->priv->checkdata = checkdata;
239   ret->priv->fullcallback = fullcallback;
240   ret->priv->emptycallback = emptycallback;
241
242   return ret;
243 }
244
245 static void
246 gst_data_queue_cleanup (GstDataQueue * queue)
247 {
248   GstDataQueuePrivate *priv = queue->priv;
249
250   while (!gst_queue_array_is_empty (priv->queue)) {
251     GstDataQueueItem *item = gst_queue_array_pop_head (priv->queue);
252
253     /* Just call the destroy notify on the item */
254     item->destroy (item);
255   }
256   priv->cur_level.visible = 0;
257   priv->cur_level.bytes = 0;
258   priv->cur_level.time = 0;
259 }
260
261 /* called only once, as opposed to dispose */
262 static void
263 gst_data_queue_finalize (GObject * object)
264 {
265   GstDataQueue *queue = GST_DATA_QUEUE (object);
266   GstDataQueuePrivate *priv = queue->priv;
267
268   GST_DEBUG ("finalizing queue");
269
270   gst_data_queue_cleanup (queue);
271   gst_queue_array_free (priv->queue);
272
273   GST_DEBUG ("free mutex");
274   g_mutex_clear (&priv->qlock);
275   GST_DEBUG ("done free mutex");
276
277   g_cond_clear (&priv->item_add);
278   g_cond_clear (&priv->item_del);
279
280   G_OBJECT_CLASS (parent_class)->finalize (object);
281 }
282
283 static inline void
284 gst_data_queue_locked_flush (GstDataQueue * queue)
285 {
286   GstDataQueuePrivate *priv = queue->priv;
287
288   STATUS (queue, "before flushing");
289   gst_data_queue_cleanup (queue);
290   STATUS (queue, "after flushing");
291   /* we deleted something... */
292   if (priv->waiting_del)
293     g_cond_signal (&priv->item_del);
294 }
295
296 static inline gboolean
297 gst_data_queue_locked_is_empty (GstDataQueue * queue)
298 {
299   GstDataQueuePrivate *priv = queue->priv;
300
301   return (gst_queue_array_get_length (priv->queue) == 0);
302 }
303
304 static inline gboolean
305 gst_data_queue_locked_is_full (GstDataQueue * queue)
306 {
307   GstDataQueuePrivate *priv = queue->priv;
308
309   return priv->checkfull (queue, priv->cur_level.visible,
310       priv->cur_level.bytes, priv->cur_level.time, priv->checkdata);
311 }
312
313 /**
314  * gst_data_queue_flush:
315  * @queue: a #GstDataQueue.
316  *
317  * Flushes all the contents of the @queue. Any call to #gst_data_queue_push and
318  * #gst_data_queue_pop will be released.
319  * MT safe.
320  *
321  * Since: 1.2.0
322  */
323 void
324 gst_data_queue_flush (GstDataQueue * queue)
325 {
326   GST_DEBUG ("queue:%p", queue);
327   GST_DATA_QUEUE_MUTEX_LOCK (queue);
328   gst_data_queue_locked_flush (queue);
329   GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
330 }
331
332 /**
333  * gst_data_queue_is_empty:
334  * @queue: a #GstDataQueue.
335  *
336  * Queries if there are any items in the @queue.
337  * MT safe.
338  *
339  * Returns: #TRUE if @queue is empty.
340  *
341  * Since: 1.2.0
342  */
343 gboolean
344 gst_data_queue_is_empty (GstDataQueue * queue)
345 {
346   gboolean res;
347
348   GST_DATA_QUEUE_MUTEX_LOCK (queue);
349   res = gst_data_queue_locked_is_empty (queue);
350   GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
351
352   return res;
353 }
354
355 /**
356  * gst_data_queue_is_full:
357  * @queue: a #GstDataQueue.
358  *
359  * Queries if @queue is full. This check will be done using the
360  * #GstDataQueueCheckFullFunction registered with @queue.
361  * MT safe.
362  *
363  * Returns: #TRUE if @queue is full.
364  *
365  * Since: 1.2.0
366  */
367 gboolean
368 gst_data_queue_is_full (GstDataQueue * queue)
369 {
370   gboolean res;
371
372   GST_DATA_QUEUE_MUTEX_LOCK (queue);
373   res = gst_data_queue_locked_is_full (queue);
374   GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
375
376   return res;
377 }
378
379 /**
380  * gst_data_queue_set_flushing:
381  * @queue: a #GstDataQueue.
382  * @flushing: a #gboolean stating if the queue will be flushing or not.
383  *
384  * Sets the queue to flushing state if @flushing is #TRUE. If set to flushing
385  * state, any incoming data on the @queue will be discarded. Any call currently
386  * blocking on #gst_data_queue_push or #gst_data_queue_pop will return straight
387  * away with a return value of #FALSE. While the @queue is in flushing state, 
388  * all calls to those two functions will return #FALSE.
389  *
390  * MT Safe.
391  *
392  * Since: 1.2.0
393  */
394 void
395 gst_data_queue_set_flushing (GstDataQueue * queue, gboolean flushing)
396 {
397   GstDataQueuePrivate *priv = queue->priv;
398
399   GST_DEBUG ("queue:%p , flushing:%d", queue, flushing);
400
401   GST_DATA_QUEUE_MUTEX_LOCK (queue);
402   priv->flushing = flushing;
403   if (flushing) {
404     /* release push/pop functions */
405     if (priv->waiting_add)
406       g_cond_signal (&priv->item_add);
407     if (priv->waiting_del)
408       g_cond_signal (&priv->item_del);
409   }
410   GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
411 }
412
413 /**
414  * gst_data_queue_push:
415  * @queue: a #GstDataQueue.
416  * @item: a #GstDataQueueItem.
417  *
418  * Pushes a #GstDataQueueItem (or a structure that begins with the same fields)
419  * on the @queue. If the @queue is full, the call will block until space is
420  * available, OR the @queue is set to flushing state.
421  * MT safe.
422  *
423  * Note that this function has slightly different semantics than gst_pad_push()
424  * and gst_pad_push_event(): this function only takes ownership of @item and
425  * the #GstMiniObject contained in @item if the push was successful. If FALSE
426  * is returned, the caller is responsible for freeing @item and its contents.
427  *
428  * Returns: #TRUE if the @item was successfully pushed on the @queue.
429  *
430  * Since: 1.2.0
431  */
432 gboolean
433 gst_data_queue_push (GstDataQueue * queue, GstDataQueueItem * item)
434 {
435   GstDataQueuePrivate *priv = queue->priv;
436
437   g_return_val_if_fail (GST_IS_DATA_QUEUE (queue), FALSE);
438   g_return_val_if_fail (item != NULL, FALSE);
439
440   GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);
441
442   STATUS (queue, "before pushing");
443
444   /* We ALWAYS need to check for queue fillness */
445   if (gst_data_queue_locked_is_full (queue)) {
446     GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
447     if (G_LIKELY (priv->fullcallback))
448       priv->fullcallback (queue, priv->checkdata);
449     else
450       g_signal_emit (queue, gst_data_queue_signals[SIGNAL_FULL], 0);
451     GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);
452
453     /* signal might have removed some items */
454     while (gst_data_queue_locked_is_full (queue)) {
455       priv->waiting_del = TRUE;
456       g_cond_wait (&priv->item_del, &priv->qlock);
457       priv->waiting_del = FALSE;
458       if (priv->flushing)
459         goto flushing;
460     }
461   }
462
463   gst_queue_array_push_tail (priv->queue, item);
464
465   if (item->visible)
466     priv->cur_level.visible++;
467   priv->cur_level.bytes += item->size;
468   priv->cur_level.time += item->duration;
469
470   STATUS (queue, "after pushing");
471   if (priv->waiting_add)
472     g_cond_signal (&priv->item_add);
473
474   GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
475
476   return TRUE;
477
478   /* ERRORS */
479 flushing:
480   {
481     GST_DEBUG ("queue:%p, we are flushing", queue);
482     GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
483     return FALSE;
484   }
485 }
486
487 /**
488  * gst_data_queue_pop:
489  * @queue: a #GstDataQueue.
490  * @item: pointer to store the returned #GstDataQueueItem.
491  *
492  * Retrieves the first @item available on the @queue. If the queue is currently
493  * empty, the call will block until at least one item is available, OR the
494  * @queue is set to the flushing state.
495  * MT safe.
496  *
497  * Returns: #TRUE if an @item was successfully retrieved from the @queue.
498  *
499  * Since: 1.2.0
500  */
501 gboolean
502 gst_data_queue_pop (GstDataQueue * queue, GstDataQueueItem ** item)
503 {
504   GstDataQueuePrivate *priv = queue->priv;
505
506   g_return_val_if_fail (GST_IS_DATA_QUEUE (queue), FALSE);
507   g_return_val_if_fail (item != NULL, FALSE);
508
509   GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);
510
511   STATUS (queue, "before popping");
512
513   if (gst_data_queue_locked_is_empty (queue)) {
514     GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
515     if (G_LIKELY (priv->emptycallback))
516       priv->emptycallback (queue, priv->checkdata);
517     else
518       g_signal_emit (queue, gst_data_queue_signals[SIGNAL_EMPTY], 0);
519     GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);
520
521     while (gst_data_queue_locked_is_empty (queue)) {
522       priv->waiting_add = TRUE;
523       g_cond_wait (&priv->item_add, &priv->qlock);
524       priv->waiting_add = FALSE;
525       if (priv->flushing)
526         goto flushing;
527     }
528   }
529
530   /* Get the item from the GQueue */
531   *item = gst_queue_array_pop_head (priv->queue);
532
533   /* update current level counter */
534   if ((*item)->visible)
535     priv->cur_level.visible--;
536   priv->cur_level.bytes -= (*item)->size;
537   priv->cur_level.time -= (*item)->duration;
538
539   STATUS (queue, "after popping");
540   if (priv->waiting_del)
541     g_cond_signal (&priv->item_del);
542
543   GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
544
545   return TRUE;
546
547   /* ERRORS */
548 flushing:
549   {
550     GST_DEBUG ("queue:%p, we are flushing", queue);
551     GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
552     return FALSE;
553   }
554 }
555
556 static gint
557 is_of_type (gconstpointer a, gconstpointer b)
558 {
559   return !G_TYPE_CHECK_INSTANCE_TYPE (a, GPOINTER_TO_INT (b));
560 }
561
562 /**
563  * gst_data_queue_drop_head:
564  * @queue: The #GstDataQueue to drop an item from.
565  * @type: The #GType of the item to drop.
566  *
567  * Pop and unref the head-most #GstMiniObject with the given #GType.
568  *
569  * Returns: TRUE if an element was removed.
570  *
571  * Since: 1.2.0
572  */
573 gboolean
574 gst_data_queue_drop_head (GstDataQueue * queue, GType type)
575 {
576   gboolean res = FALSE;
577   GstDataQueueItem *leak = NULL;
578   guint idx;
579   GstDataQueuePrivate *priv = queue->priv;
580
581   g_return_val_if_fail (GST_IS_DATA_QUEUE (queue), FALSE);
582
583   GST_DEBUG ("queue:%p", queue);
584
585   GST_DATA_QUEUE_MUTEX_LOCK (queue);
586   idx = gst_queue_array_find (priv->queue, is_of_type, GINT_TO_POINTER (type));
587
588   if (idx == -1)
589     goto done;
590
591   leak = gst_queue_array_drop_element (priv->queue, idx);
592
593   if (leak->visible)
594     priv->cur_level.visible--;
595   priv->cur_level.bytes -= leak->size;
596   priv->cur_level.time -= leak->duration;
597
598   leak->destroy (leak);
599
600   res = TRUE;
601
602 done:
603   GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
604
605   GST_DEBUG ("queue:%p , res:%d", queue, res);
606
607   return res;
608 }
609
610 /**
611  * gst_data_queue_limits_changed:
612  * @queue: The #GstDataQueue 
613  *
614  * Inform the queue that the limits for the fullness check have changed and that
615  * any blocking gst_data_queue_push() should be unblocked to recheck the limts.
616  *
617  * Since: 1.2.0
618  */
619 void
620 gst_data_queue_limits_changed (GstDataQueue * queue)
621 {
622   GstDataQueuePrivate *priv = queue->priv;
623
624   g_return_if_fail (GST_IS_DATA_QUEUE (queue));
625
626   GST_DATA_QUEUE_MUTEX_LOCK (queue);
627   if (priv->waiting_del) {
628     GST_DEBUG ("signal del");
629     g_cond_signal (&priv->item_del);
630   }
631   GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
632 }
633
634 /**
635  * gst_data_queue_get_level:
636  * @queue: The #GstDataQueue
637  * @level: the location to store the result
638  *
639  * Get the current level of the queue.
640  *
641  * Since: 1.2.0
642  */
643 void
644 gst_data_queue_get_level (GstDataQueue * queue, GstDataQueueSize * level)
645 {
646   GstDataQueuePrivate *priv = queue->priv;
647
648   memcpy (level, (&priv->cur_level), sizeof (GstDataQueueSize));
649 }
650
651 static void
652 gst_data_queue_set_property (GObject * object,
653     guint prop_id, const GValue * value, GParamSpec * pspec)
654 {
655   switch (prop_id) {
656     default:
657       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
658       break;
659   }
660 }
661
662 static void
663 gst_data_queue_get_property (GObject * object,
664     guint prop_id, GValue * value, GParamSpec * pspec)
665 {
666   GstDataQueue *queue = GST_DATA_QUEUE (object);
667   GstDataQueuePrivate *priv = queue->priv;
668
669   GST_DATA_QUEUE_MUTEX_LOCK (queue);
670
671   switch (prop_id) {
672     case PROP_CUR_LEVEL_BYTES:
673       g_value_set_uint (value, priv->cur_level.bytes);
674       break;
675     case PROP_CUR_LEVEL_VISIBLE:
676       g_value_set_uint (value, priv->cur_level.visible);
677       break;
678     case PROP_CUR_LEVEL_TIME:
679       g_value_set_uint64 (value, priv->cur_level.time);
680       break;
681     default:
682       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
683       break;
684   }
685
686   GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
687 }