Update for g_type_class_add_private() deprecation in recent GLib
[platform/upstream/gstreamer.git] / libs / gst / base / gstdataqueue.c
1 /* GStreamer
2  * Copyright (C) 2006 Edward Hervey <edward@fluendo.com>
3  *
4  * gstdataqueue.c:
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19  * Boston, MA 02110-1301, USA.
20  */
21
22 /**
23  * SECTION:gstdataqueue
24  * @title: GstDataQueue
25  * @short_description: Threadsafe queueing object
26  *
27  * #GstDataQueue is an object that handles threadsafe queueing of objects. It
28  * also provides size-related functionality. This object should be used for
29  * any #GstElement that wishes to provide some sort of queueing functionality.
30  */
31
32 #include <gst/gst.h>
33 #include "string.h"
34 #include "gstdataqueue.h"
35 #include "gstqueuearray.h"
36 #include "gst/glib-compat-private.h"
37
38 GST_DEBUG_CATEGORY_STATIC (data_queue_debug);
39 #define GST_CAT_DEFAULT (data_queue_debug)
40 GST_DEBUG_CATEGORY_STATIC (data_queue_dataflow);
41
42
43 /* Queue signals and args */
44 enum
45 {
46   SIGNAL_EMPTY,
47   SIGNAL_FULL,
48   LAST_SIGNAL
49 };
50
51 enum
52 {
53   PROP_0,
54   PROP_CUR_LEVEL_VISIBLE,
55   PROP_CUR_LEVEL_BYTES,
56   PROP_CUR_LEVEL_TIME
57       /* FILL ME */
58 };
59
60 struct _GstDataQueuePrivate
61 {
62   /* the array of data we're keeping our grubby hands on */
63   GstQueueArray *queue;
64
65   GstDataQueueSize cur_level;   /* size of the queue */
66   GstDataQueueCheckFullFunction checkfull;      /* Callback to check if the queue is full */
67   gpointer *checkdata;
68
69   GMutex qlock;                 /* lock for queue (vs object lock) */
70   gboolean waiting_add;
71   GCond item_add;               /* signals buffers now available for reading */
72   gboolean waiting_del;
73   GCond item_del;               /* signals space now available for writing */
74   gboolean flushing;            /* indicates whether conditions where signalled because
75                                  * of external flushing */
76   GstDataQueueFullCallback fullcallback;
77   GstDataQueueEmptyCallback emptycallback;
78 };
79
80 #define GST_DATA_QUEUE_MUTEX_LOCK(q) G_STMT_START {                     \
81     GST_CAT_TRACE (data_queue_dataflow,                                 \
82       "locking qlock from thread %p",                                   \
83       g_thread_self ());                                                \
84   g_mutex_lock (&q->priv->qlock);                                       \
85   GST_CAT_TRACE (data_queue_dataflow,                                   \
86       "locked qlock from thread %p",                                    \
87       g_thread_self ());                                                \
88 } G_STMT_END
89
90 #define GST_DATA_QUEUE_MUTEX_LOCK_CHECK(q, label) G_STMT_START {        \
91     GST_DATA_QUEUE_MUTEX_LOCK (q);                                      \
92     if (q->priv->flushing)                                              \
93       goto label;                                                       \
94   } G_STMT_END
95
96 #define GST_DATA_QUEUE_MUTEX_UNLOCK(q) G_STMT_START {                   \
97     GST_CAT_TRACE (data_queue_dataflow,                                 \
98       "unlocking qlock from thread %p",                                 \
99       g_thread_self ());                                                \
100   g_mutex_unlock (&q->priv->qlock);                                     \
101 } G_STMT_END
102
103 #define STATUS(q, msg)                                                  \
104   GST_CAT_LOG (data_queue_dataflow,                                     \
105                "queue:%p " msg ": %u visible items, %u "                \
106                "bytes, %"G_GUINT64_FORMAT                               \
107                " ns, %u elements",                                      \
108                queue,                                                   \
109                q->priv->cur_level.visible,                              \
110                q->priv->cur_level.bytes,                                \
111                q->priv->cur_level.time,                                 \
112                gst_queue_array_get_length (q->priv->queue))
113
114 static void gst_data_queue_finalize (GObject * object);
115
116 static void gst_data_queue_set_property (GObject * object,
117     guint prop_id, const GValue * value, GParamSpec * pspec);
118 static void gst_data_queue_get_property (GObject * object,
119     guint prop_id, GValue * value, GParamSpec * pspec);
120
121 static guint gst_data_queue_signals[LAST_SIGNAL] = { 0 };
122
123 #define _do_init \
124 { \
125   GST_DEBUG_CATEGORY_INIT (data_queue_debug, "dataqueue", 0, \
126       "data queue object"); \
127   GST_DEBUG_CATEGORY_INIT (data_queue_dataflow, "data_queue_dataflow", 0, \
128       "dataflow inside the data queue object"); \
129 }
130
131 #define parent_class gst_data_queue_parent_class
132 G_DEFINE_TYPE_WITH_CODE (GstDataQueue, gst_data_queue, G_TYPE_OBJECT,
133     G_ADD_PRIVATE (GstDataQueue) _do_init);
134
135 static void
136 gst_data_queue_class_init (GstDataQueueClass * klass)
137 {
138   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
139
140   gobject_class->set_property = gst_data_queue_set_property;
141   gobject_class->get_property = gst_data_queue_get_property;
142
143   /* signals */
144   /**
145    * GstDataQueue::empty: (skip)
146    * @queue: the queue instance
147    *
148    * Reports that the queue became empty (empty).
149    * A queue is empty if the total amount of visible items inside it (num-visible, time,
150    * size) is lower than the boundary values which can be set through the GObject
151    * properties.
152    */
153   gst_data_queue_signals[SIGNAL_EMPTY] =
154       g_signal_new ("empty", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
155       G_STRUCT_OFFSET (GstDataQueueClass, empty), NULL, NULL,
156       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
157
158   /**
159    * GstDataQueue::full: (skip)
160    * @queue: the queue instance
161    *
162    * Reports that the queue became full (full).
163    * A queue is full if the total amount of data inside it (num-visible, time,
164    * size) is higher than the boundary values which can be set through the GObject
165    * properties.
166    */
167   gst_data_queue_signals[SIGNAL_FULL] =
168       g_signal_new ("full", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
169       G_STRUCT_OFFSET (GstDataQueueClass, full), NULL, NULL,
170       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
171
172   /* properties */
173   g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_BYTES,
174       g_param_spec_uint ("current-level-bytes", "Current level (kB)",
175           "Current amount of data in the queue (bytes)",
176           0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
177   g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_VISIBLE,
178       g_param_spec_uint ("current-level-visible",
179           "Current level (visible items)",
180           "Current number of visible items in the queue", 0, G_MAXUINT, 0,
181           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
182   g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_TIME,
183       g_param_spec_uint64 ("current-level-time", "Current level (ns)",
184           "Current amount of data in the queue (in ns)", 0, G_MAXUINT64, 0,
185           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
186
187   gobject_class->finalize = gst_data_queue_finalize;
188 }
189
190 static void
191 gst_data_queue_init (GstDataQueue * queue)
192 {
193   queue->priv = gst_data_queue_get_instance_private (queue);
194
195   queue->priv->cur_level.visible = 0;   /* no content */
196   queue->priv->cur_level.bytes = 0;     /* no content */
197   queue->priv->cur_level.time = 0;      /* no content */
198
199   queue->priv->checkfull = NULL;
200
201   g_mutex_init (&queue->priv->qlock);
202   g_cond_init (&queue->priv->item_add);
203   g_cond_init (&queue->priv->item_del);
204   queue->priv->queue = gst_queue_array_new (50);
205
206   GST_DEBUG ("initialized queue's not_empty & not_full conditions");
207 }
208
209 /**
210  * gst_data_queue_new: (skip)
211  * @checkfull: the callback used to tell if the element considers the queue full
212  * or not.
213  * @fullcallback: the callback which will be called when the queue is considered full.
214  * @emptycallback: the callback which will be called when the queue is considered empty.
215  * @checkdata: a #gpointer that will be passed to the @checkfull, @fullcallback,
216  *   and @emptycallback callbacks.
217  *
218  * Creates a new #GstDataQueue. If @fullcallback or @emptycallback are supplied, then
219  * the #GstDataQueue will call the respective callback to signal full or empty condition.
220  * If the callbacks are NULL the #GstDataQueue will instead emit 'full' and 'empty'
221  * signals.
222  *
223  * Returns: a new #GstDataQueue.
224  *
225  * Since: 1.2
226  */
227 GstDataQueue *
228 gst_data_queue_new (GstDataQueueCheckFullFunction checkfull,
229     GstDataQueueFullCallback fullcallback,
230     GstDataQueueEmptyCallback emptycallback, gpointer checkdata)
231 {
232   GstDataQueue *ret;
233
234   g_return_val_if_fail (checkfull != NULL, NULL);
235
236   ret = g_object_new (GST_TYPE_DATA_QUEUE, NULL);
237   ret->priv->checkfull = checkfull;
238   ret->priv->checkdata = checkdata;
239   ret->priv->fullcallback = fullcallback;
240   ret->priv->emptycallback = emptycallback;
241
242   return ret;
243 }
244
245 static void
246 gst_data_queue_cleanup (GstDataQueue * queue)
247 {
248   GstDataQueuePrivate *priv = queue->priv;
249
250   while (!gst_queue_array_is_empty (priv->queue)) {
251     GstDataQueueItem *item = gst_queue_array_pop_head (priv->queue);
252
253     /* Just call the destroy notify on the item */
254     item->destroy (item);
255   }
256   priv->cur_level.visible = 0;
257   priv->cur_level.bytes = 0;
258   priv->cur_level.time = 0;
259 }
260
261 /* called only once, as opposed to dispose */
262 static void
263 gst_data_queue_finalize (GObject * object)
264 {
265   GstDataQueue *queue = GST_DATA_QUEUE (object);
266   GstDataQueuePrivate *priv = queue->priv;
267
268   GST_DEBUG ("finalizing queue");
269
270   gst_data_queue_cleanup (queue);
271   gst_queue_array_free (priv->queue);
272
273   GST_DEBUG ("free mutex");
274   g_mutex_clear (&priv->qlock);
275   GST_DEBUG ("done free mutex");
276
277   g_cond_clear (&priv->item_add);
278   g_cond_clear (&priv->item_del);
279
280   G_OBJECT_CLASS (parent_class)->finalize (object);
281 }
282
283 static inline void
284 gst_data_queue_locked_flush (GstDataQueue * queue)
285 {
286   GstDataQueuePrivate *priv = queue->priv;
287
288   STATUS (queue, "before flushing");
289   gst_data_queue_cleanup (queue);
290   STATUS (queue, "after flushing");
291   /* we deleted something... */
292   if (priv->waiting_del)
293     g_cond_signal (&priv->item_del);
294 }
295
296 static inline gboolean
297 gst_data_queue_locked_is_empty (GstDataQueue * queue)
298 {
299   GstDataQueuePrivate *priv = queue->priv;
300
301   return (gst_queue_array_get_length (priv->queue) == 0);
302 }
303
304 static inline gboolean
305 gst_data_queue_locked_is_full (GstDataQueue * queue)
306 {
307   GstDataQueuePrivate *priv = queue->priv;
308
309   return priv->checkfull (queue, priv->cur_level.visible,
310       priv->cur_level.bytes, priv->cur_level.time, priv->checkdata);
311 }
312
313 /**
314  * gst_data_queue_flush: (skip)
315  * @queue: a #GstDataQueue.
316  *
317  * Flushes all the contents of the @queue. Any call to #gst_data_queue_push and
318  * #gst_data_queue_pop will be released.
319  * MT safe.
320  *
321  * Since: 1.2
322  */
323 void
324 gst_data_queue_flush (GstDataQueue * queue)
325 {
326   GST_DEBUG ("queue:%p", queue);
327   GST_DATA_QUEUE_MUTEX_LOCK (queue);
328   gst_data_queue_locked_flush (queue);
329   GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
330 }
331
332 /**
333  * gst_data_queue_is_empty: (skip)
334  * @queue: a #GstDataQueue.
335  *
336  * Queries if there are any items in the @queue.
337  * MT safe.
338  *
339  * Returns: %TRUE if @queue is empty.
340  *
341  * Since: 1.2
342  */
343 gboolean
344 gst_data_queue_is_empty (GstDataQueue * queue)
345 {
346   gboolean res;
347
348   GST_DATA_QUEUE_MUTEX_LOCK (queue);
349   res = gst_data_queue_locked_is_empty (queue);
350   GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
351
352   return res;
353 }
354
355 /**
356  * gst_data_queue_is_full: (skip)
357  * @queue: a #GstDataQueue.
358  *
359  * Queries if @queue is full. This check will be done using the
360  * #GstDataQueueCheckFullFunction registered with @queue.
361  * MT safe.
362  *
363  * Returns: %TRUE if @queue is full.
364  *
365  * Since: 1.2
366  */
367 gboolean
368 gst_data_queue_is_full (GstDataQueue * queue)
369 {
370   gboolean res;
371
372   GST_DATA_QUEUE_MUTEX_LOCK (queue);
373   res = gst_data_queue_locked_is_full (queue);
374   GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
375
376   return res;
377 }
378
379 /**
380  * gst_data_queue_set_flushing: (skip)
381  * @queue: a #GstDataQueue.
382  * @flushing: a #gboolean stating if the queue will be flushing or not.
383  *
384  * Sets the queue to flushing state if @flushing is %TRUE. If set to flushing
385  * state, any incoming data on the @queue will be discarded. Any call currently
386  * blocking on #gst_data_queue_push or #gst_data_queue_pop will return straight
387  * away with a return value of %FALSE. While the @queue is in flushing state,
388  * all calls to those two functions will return %FALSE.
389  *
390  * MT Safe.
391  *
392  * Since: 1.2
393  */
394 void
395 gst_data_queue_set_flushing (GstDataQueue * queue, gboolean flushing)
396 {
397   GstDataQueuePrivate *priv = queue->priv;
398
399   GST_DEBUG ("queue:%p , flushing:%d", queue, flushing);
400
401   GST_DATA_QUEUE_MUTEX_LOCK (queue);
402   priv->flushing = flushing;
403   if (flushing) {
404     /* release push/pop functions */
405     if (priv->waiting_add)
406       g_cond_signal (&priv->item_add);
407     if (priv->waiting_del)
408       g_cond_signal (&priv->item_del);
409   }
410   GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
411 }
412
413 static void
414 gst_data_queue_push_force_unlocked (GstDataQueue * queue,
415     GstDataQueueItem * item)
416 {
417   GstDataQueuePrivate *priv = queue->priv;
418
419   gst_queue_array_push_tail (priv->queue, item);
420
421   if (item->visible)
422     priv->cur_level.visible++;
423   priv->cur_level.bytes += item->size;
424   priv->cur_level.time += item->duration;
425 }
426
427 /**
428  * gst_data_queue_push_force: (skip)
429  * @queue: a #GstDataQueue.
430  * @item: a #GstDataQueueItem.
431  *
432  * Pushes a #GstDataQueueItem (or a structure that begins with the same fields)
433  * on the @queue. It ignores if the @queue is full or not and forces the @item
434  * to be pushed anyway.
435  * MT safe.
436  *
437  * Note that this function has slightly different semantics than gst_pad_push()
438  * and gst_pad_push_event(): this function only takes ownership of @item and
439  * the #GstMiniObject contained in @item if the push was successful. If %FALSE
440  * is returned, the caller is responsible for freeing @item and its contents.
441  *
442  * Returns: %TRUE if the @item was successfully pushed on the @queue.
443  *
444  * Since: 1.2
445  */
446 gboolean
447 gst_data_queue_push_force (GstDataQueue * queue, GstDataQueueItem * item)
448 {
449   GstDataQueuePrivate *priv = queue->priv;
450
451   g_return_val_if_fail (GST_IS_DATA_QUEUE (queue), FALSE);
452   g_return_val_if_fail (item != NULL, FALSE);
453
454   GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);
455
456   STATUS (queue, "before pushing");
457   gst_data_queue_push_force_unlocked (queue, item);
458   STATUS (queue, "after pushing");
459   if (priv->waiting_add)
460     g_cond_signal (&priv->item_add);
461
462   GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
463
464   return TRUE;
465
466   /* ERRORS */
467 flushing:
468   {
469     GST_DEBUG ("queue:%p, we are flushing", queue);
470     GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
471     return FALSE;
472   }
473 }
474
475 /**
476  * gst_data_queue_push: (skip)
477  * @queue: a #GstDataQueue.
478  * @item: a #GstDataQueueItem.
479  *
480  * Pushes a #GstDataQueueItem (or a structure that begins with the same fields)
481  * on the @queue. If the @queue is full, the call will block until space is
482  * available, OR the @queue is set to flushing state.
483  * MT safe.
484  *
485  * Note that this function has slightly different semantics than gst_pad_push()
486  * and gst_pad_push_event(): this function only takes ownership of @item and
487  * the #GstMiniObject contained in @item if the push was successful. If %FALSE
488  * is returned, the caller is responsible for freeing @item and its contents.
489  *
490  * Returns: %TRUE if the @item was successfully pushed on the @queue.
491  *
492  * Since: 1.2
493  */
494 gboolean
495 gst_data_queue_push (GstDataQueue * queue, GstDataQueueItem * item)
496 {
497   GstDataQueuePrivate *priv = queue->priv;
498
499   g_return_val_if_fail (GST_IS_DATA_QUEUE (queue), FALSE);
500   g_return_val_if_fail (item != NULL, FALSE);
501
502   GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);
503
504   STATUS (queue, "before pushing");
505
506   /* We ALWAYS need to check for queue fillness */
507   if (gst_data_queue_locked_is_full (queue)) {
508     GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
509     if (G_LIKELY (priv->fullcallback))
510       priv->fullcallback (queue, priv->checkdata);
511     else
512       g_signal_emit (queue, gst_data_queue_signals[SIGNAL_FULL], 0);
513     GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);
514
515     /* signal might have removed some items */
516     while (gst_data_queue_locked_is_full (queue)) {
517       priv->waiting_del = TRUE;
518       g_cond_wait (&priv->item_del, &priv->qlock);
519       priv->waiting_del = FALSE;
520       if (priv->flushing)
521         goto flushing;
522     }
523   }
524
525   gst_data_queue_push_force_unlocked (queue, item);
526
527   STATUS (queue, "after pushing");
528   if (priv->waiting_add)
529     g_cond_signal (&priv->item_add);
530
531   GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
532
533   return TRUE;
534
535   /* ERRORS */
536 flushing:
537   {
538     GST_DEBUG ("queue:%p, we are flushing", queue);
539     GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
540     return FALSE;
541   }
542 }
543
544 static gboolean
545 _gst_data_queue_wait_non_empty (GstDataQueue * queue)
546 {
547   GstDataQueuePrivate *priv = queue->priv;
548
549   while (gst_data_queue_locked_is_empty (queue)) {
550     priv->waiting_add = TRUE;
551     g_cond_wait (&priv->item_add, &priv->qlock);
552     priv->waiting_add = FALSE;
553     if (priv->flushing)
554       return FALSE;
555   }
556   return TRUE;
557 }
558
559 /**
560  * gst_data_queue_pop: (skip)
561  * @queue: a #GstDataQueue.
562  * @item: (out): pointer to store the returned #GstDataQueueItem.
563  *
564  * Retrieves the first @item available on the @queue. If the queue is currently
565  * empty, the call will block until at least one item is available, OR the
566  * @queue is set to the flushing state.
567  * MT safe.
568  *
569  * Returns: %TRUE if an @item was successfully retrieved from the @queue.
570  *
571  * Since: 1.2
572  */
573 gboolean
574 gst_data_queue_pop (GstDataQueue * queue, GstDataQueueItem ** item)
575 {
576   GstDataQueuePrivate *priv = queue->priv;
577
578   g_return_val_if_fail (GST_IS_DATA_QUEUE (queue), FALSE);
579   g_return_val_if_fail (item != NULL, FALSE);
580
581   GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);
582
583   STATUS (queue, "before popping");
584
585   if (gst_data_queue_locked_is_empty (queue)) {
586     GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
587     if (G_LIKELY (priv->emptycallback))
588       priv->emptycallback (queue, priv->checkdata);
589     else
590       g_signal_emit (queue, gst_data_queue_signals[SIGNAL_EMPTY], 0);
591     GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);
592
593     if (!_gst_data_queue_wait_non_empty (queue))
594       goto flushing;
595   }
596
597   /* Get the item from the GQueue */
598   *item = gst_queue_array_pop_head (priv->queue);
599
600   /* update current level counter */
601   if ((*item)->visible)
602     priv->cur_level.visible--;
603   priv->cur_level.bytes -= (*item)->size;
604   priv->cur_level.time -= (*item)->duration;
605
606   STATUS (queue, "after popping");
607   if (priv->waiting_del)
608     g_cond_signal (&priv->item_del);
609
610   GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
611
612   return TRUE;
613
614   /* ERRORS */
615 flushing:
616   {
617     GST_DEBUG ("queue:%p, we are flushing", queue);
618     GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
619     return FALSE;
620   }
621 }
622
623 static gint
624 is_of_type (gconstpointer a, gconstpointer b)
625 {
626   return !G_TYPE_CHECK_INSTANCE_TYPE (a, GPOINTER_TO_SIZE (b));
627 }
628
629 /**
630  * gst_data_queue_peek: (skip)
631  * @queue: a #GstDataQueue.
632  * @item: (out): pointer to store the returned #GstDataQueueItem.
633  *
634  * Retrieves the first @item available on the @queue without removing it.
635  * If the queue is currently empty, the call will block until at least
636  * one item is available, OR the @queue is set to the flushing state.
637  * MT safe.
638  *
639  * Returns: %TRUE if an @item was successfully retrieved from the @queue.
640  *
641  * Since: 1.2
642  */
643 gboolean
644 gst_data_queue_peek (GstDataQueue * queue, GstDataQueueItem ** item)
645 {
646   GstDataQueuePrivate *priv = queue->priv;
647
648   g_return_val_if_fail (GST_IS_DATA_QUEUE (queue), FALSE);
649   g_return_val_if_fail (item != NULL, FALSE);
650
651   GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);
652
653   STATUS (queue, "before peeking");
654
655   if (gst_data_queue_locked_is_empty (queue)) {
656     GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
657     if (G_LIKELY (priv->emptycallback))
658       priv->emptycallback (queue, priv->checkdata);
659     else
660       g_signal_emit (queue, gst_data_queue_signals[SIGNAL_EMPTY], 0);
661     GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);
662
663     if (!_gst_data_queue_wait_non_empty (queue))
664       goto flushing;
665   }
666
667   /* Get the item from the GQueue */
668   *item = gst_queue_array_peek_head (priv->queue);
669
670   STATUS (queue, "after peeking");
671   GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
672
673   return TRUE;
674
675   /* ERRORS */
676 flushing:
677   {
678     GST_DEBUG ("queue:%p, we are flushing", queue);
679     GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
680     return FALSE;
681   }
682 }
683
684 /**
685  * gst_data_queue_drop_head: (skip)
686  * @queue: The #GstDataQueue to drop an item from.
687  * @type: The #GType of the item to drop.
688  *
689  * Pop and unref the head-most #GstMiniObject with the given #GType.
690  *
691  * Returns: %TRUE if an element was removed.
692  *
693  * Since: 1.2
694  */
695 gboolean
696 gst_data_queue_drop_head (GstDataQueue * queue, GType type)
697 {
698   gboolean res = FALSE;
699   GstDataQueueItem *leak = NULL;
700   guint idx;
701   GstDataQueuePrivate *priv = queue->priv;
702
703   g_return_val_if_fail (GST_IS_DATA_QUEUE (queue), FALSE);
704
705   GST_DEBUG ("queue:%p", queue);
706
707   GST_DATA_QUEUE_MUTEX_LOCK (queue);
708   idx = gst_queue_array_find (priv->queue, is_of_type, GSIZE_TO_POINTER (type));
709
710   if (idx == -1)
711     goto done;
712
713   leak = gst_queue_array_drop_element (priv->queue, idx);
714
715   if (leak->visible)
716     priv->cur_level.visible--;
717   priv->cur_level.bytes -= leak->size;
718   priv->cur_level.time -= leak->duration;
719
720   leak->destroy (leak);
721
722   res = TRUE;
723
724 done:
725   GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
726
727   GST_DEBUG ("queue:%p , res:%d", queue, res);
728
729   return res;
730 }
731
732 /**
733  * gst_data_queue_limits_changed: (skip)
734  * @queue: The #GstDataQueue
735  *
736  * Inform the queue that the limits for the fullness check have changed and that
737  * any blocking gst_data_queue_push() should be unblocked to recheck the limits.
738  *
739  * Since: 1.2
740  */
741 void
742 gst_data_queue_limits_changed (GstDataQueue * queue)
743 {
744   GstDataQueuePrivate *priv = queue->priv;
745
746   g_return_if_fail (GST_IS_DATA_QUEUE (queue));
747
748   GST_DATA_QUEUE_MUTEX_LOCK (queue);
749   if (priv->waiting_del) {
750     GST_DEBUG ("signal del");
751     g_cond_signal (&priv->item_del);
752   }
753   GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
754 }
755
756 /**
757  * gst_data_queue_get_level: (skip)
758  * @queue: The #GstDataQueue
759  * @level: (out): the location to store the result
760  *
761  * Get the current level of the queue.
762  *
763  * Since: 1.2
764  */
765 void
766 gst_data_queue_get_level (GstDataQueue * queue, GstDataQueueSize * level)
767 {
768   GstDataQueuePrivate *priv = queue->priv;
769
770   memcpy (level, (&priv->cur_level), sizeof (GstDataQueueSize));
771 }
772
773 static void
774 gst_data_queue_set_property (GObject * object,
775     guint prop_id, const GValue * value, GParamSpec * pspec)
776 {
777   switch (prop_id) {
778     default:
779       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
780       break;
781   }
782 }
783
784 static void
785 gst_data_queue_get_property (GObject * object,
786     guint prop_id, GValue * value, GParamSpec * pspec)
787 {
788   GstDataQueue *queue = GST_DATA_QUEUE (object);
789   GstDataQueuePrivate *priv = queue->priv;
790
791   GST_DATA_QUEUE_MUTEX_LOCK (queue);
792
793   switch (prop_id) {
794     case PROP_CUR_LEVEL_BYTES:
795       g_value_set_uint (value, priv->cur_level.bytes);
796       break;
797     case PROP_CUR_LEVEL_VISIBLE:
798       g_value_set_uint (value, priv->cur_level.visible);
799       break;
800     case PROP_CUR_LEVEL_TIME:
801       g_value_set_uint64 (value, priv->cur_level.time);
802       break;
803     default:
804       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
805       break;
806   }
807
808   GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
809 }