Port gtk-doc comments to their equivalent markdown syntax
[platform/upstream/gstreamer.git] / libs / gst / base / gstdataqueue.c
1 /* GStreamer
2  * Copyright (C) 2006 Edward Hervey <edward@fluendo.com>
3  *
4  * gstdataqueue.c:
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19  * Boston, MA 02110-1301, USA.
20  */
21
22 /**
23  * SECTION:gstdataqueue
24  * @title: GstDataQueue
25  * @short_description: Threadsafe queueing object
26  *
27  * #GstDataQueue is an object that handles threadsafe queueing of objects. It
28  * also provides size-related functionality. This object should be used for
29  * any #GstElement that wishes to provide some sort of queueing functionality.
30  */
31
32 #include <gst/gst.h>
33 #include "string.h"
34 #include "gstdataqueue.h"
35 #include "gstqueuearray.h"
36 #include "gst/glib-compat-private.h"
37
38 GST_DEBUG_CATEGORY_STATIC (data_queue_debug);
39 #define GST_CAT_DEFAULT (data_queue_debug)
40 GST_DEBUG_CATEGORY_STATIC (data_queue_dataflow);
41
42
43 /* Queue signals and args */
44 enum
45 {
46   SIGNAL_EMPTY,
47   SIGNAL_FULL,
48   LAST_SIGNAL
49 };
50
51 enum
52 {
53   PROP_0,
54   PROP_CUR_LEVEL_VISIBLE,
55   PROP_CUR_LEVEL_BYTES,
56   PROP_CUR_LEVEL_TIME
57       /* FILL ME */
58 };
59
60 struct _GstDataQueuePrivate
61 {
62   /* the array of data we're keeping our grubby hands on */
63   GstQueueArray *queue;
64
65   GstDataQueueSize cur_level;   /* size of the queue */
66   GstDataQueueCheckFullFunction checkfull;      /* Callback to check if the queue is full */
67   gpointer *checkdata;
68
69   GMutex qlock;                 /* lock for queue (vs object lock) */
70   gboolean waiting_add;
71   GCond item_add;               /* signals buffers now available for reading */
72   gboolean waiting_del;
73   GCond item_del;               /* signals space now available for writing */
74   gboolean flushing;            /* indicates whether conditions where signalled because
75                                  * of external flushing */
76   GstDataQueueFullCallback fullcallback;
77   GstDataQueueEmptyCallback emptycallback;
78 };
79
80 #define GST_DATA_QUEUE_MUTEX_LOCK(q) G_STMT_START {                     \
81     GST_CAT_TRACE (data_queue_dataflow,                                 \
82       "locking qlock from thread %p",                                   \
83       g_thread_self ());                                                \
84   g_mutex_lock (&q->priv->qlock);                                       \
85   GST_CAT_TRACE (data_queue_dataflow,                                   \
86       "locked qlock from thread %p",                                    \
87       g_thread_self ());                                                \
88 } G_STMT_END
89
90 #define GST_DATA_QUEUE_MUTEX_LOCK_CHECK(q, label) G_STMT_START {        \
91     GST_DATA_QUEUE_MUTEX_LOCK (q);                                      \
92     if (q->priv->flushing)                                              \
93       goto label;                                                       \
94   } G_STMT_END
95
96 #define GST_DATA_QUEUE_MUTEX_UNLOCK(q) G_STMT_START {                   \
97     GST_CAT_TRACE (data_queue_dataflow,                                 \
98       "unlocking qlock from thread %p",                                 \
99       g_thread_self ());                                                \
100   g_mutex_unlock (&q->priv->qlock);                                     \
101 } G_STMT_END
102
103 #define STATUS(q, msg)                                                  \
104   GST_CAT_LOG (data_queue_dataflow,                                     \
105                "queue:%p " msg ": %u visible items, %u "                \
106                "bytes, %"G_GUINT64_FORMAT                               \
107                " ns, %u elements",                                      \
108                queue,                                                   \
109                q->priv->cur_level.visible,                              \
110                q->priv->cur_level.bytes,                                \
111                q->priv->cur_level.time,                                 \
112                gst_queue_array_get_length (q->priv->queue))
113
114 static void gst_data_queue_finalize (GObject * object);
115
116 static void gst_data_queue_set_property (GObject * object,
117     guint prop_id, const GValue * value, GParamSpec * pspec);
118 static void gst_data_queue_get_property (GObject * object,
119     guint prop_id, GValue * value, GParamSpec * pspec);
120
121 static guint gst_data_queue_signals[LAST_SIGNAL] = { 0 };
122
123 #define _do_init \
124 { \
125   GST_DEBUG_CATEGORY_INIT (data_queue_debug, "dataqueue", 0, \
126       "data queue object"); \
127   GST_DEBUG_CATEGORY_INIT (data_queue_dataflow, "data_queue_dataflow", 0, \
128       "dataflow inside the data queue object"); \
129 }
130
131 #define parent_class gst_data_queue_parent_class
132 G_DEFINE_TYPE_WITH_CODE (GstDataQueue, gst_data_queue, G_TYPE_OBJECT, _do_init);
133
134 static void
135 gst_data_queue_class_init (GstDataQueueClass * klass)
136 {
137   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
138
139   g_type_class_add_private (klass, sizeof (GstDataQueuePrivate));
140
141   gobject_class->set_property = gst_data_queue_set_property;
142   gobject_class->get_property = gst_data_queue_get_property;
143
144   /* signals */
145   /**
146    * GstDataQueue::empty: (skip)
147    * @queue: the queue instance
148    *
149    * Reports that the queue became empty (empty).
150    * A queue is empty if the total amount of visible items inside it (num-visible, time,
151    * size) is lower than the boundary values which can be set through the GObject
152    * properties.
153    */
154   gst_data_queue_signals[SIGNAL_EMPTY] =
155       g_signal_new ("empty", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
156       G_STRUCT_OFFSET (GstDataQueueClass, empty), NULL, NULL,
157       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
158
159   /**
160    * GstDataQueue::full: (skip)
161    * @queue: the queue instance
162    *
163    * Reports that the queue became full (full).
164    * A queue is full if the total amount of data inside it (num-visible, time,
165    * size) is higher than the boundary values which can be set through the GObject
166    * properties.
167    */
168   gst_data_queue_signals[SIGNAL_FULL] =
169       g_signal_new ("full", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
170       G_STRUCT_OFFSET (GstDataQueueClass, full), NULL, NULL,
171       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
172
173   /* properties */
174   g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_BYTES,
175       g_param_spec_uint ("current-level-bytes", "Current level (kB)",
176           "Current amount of data in the queue (bytes)",
177           0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
178   g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_VISIBLE,
179       g_param_spec_uint ("current-level-visible",
180           "Current level (visible items)",
181           "Current number of visible items in the queue", 0, G_MAXUINT, 0,
182           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
183   g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_TIME,
184       g_param_spec_uint64 ("current-level-time", "Current level (ns)",
185           "Current amount of data in the queue (in ns)", 0, G_MAXUINT64, 0,
186           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
187
188   gobject_class->finalize = gst_data_queue_finalize;
189 }
190
191 static void
192 gst_data_queue_init (GstDataQueue * queue)
193 {
194   queue->priv =
195       G_TYPE_INSTANCE_GET_PRIVATE (queue, GST_TYPE_DATA_QUEUE,
196       GstDataQueuePrivate);
197
198   queue->priv->cur_level.visible = 0;   /* no content */
199   queue->priv->cur_level.bytes = 0;     /* no content */
200   queue->priv->cur_level.time = 0;      /* no content */
201
202   queue->priv->checkfull = NULL;
203
204   g_mutex_init (&queue->priv->qlock);
205   g_cond_init (&queue->priv->item_add);
206   g_cond_init (&queue->priv->item_del);
207   queue->priv->queue = gst_queue_array_new (50);
208
209   GST_DEBUG ("initialized queue's not_empty & not_full conditions");
210 }
211
212 /**
213  * gst_data_queue_new: (skip)
214  * @checkfull: the callback used to tell if the element considers the queue full
215  * or not.
216  * @fullcallback: the callback which will be called when the queue is considered full.
217  * @emptycallback: the callback which will be called when the queue is considered empty.
218  * @checkdata: a #gpointer that will be passed to the @checkfull, @fullcallback,
219  *   and @emptycallback callbacks.
220  *
221  * Creates a new #GstDataQueue. If @fullcallback or @emptycallback are supplied, then
222  * the #GstDataQueue will call the respective callback to signal full or empty condition.
223  * If the callbacks are NULL the #GstDataQueue will instead emit 'full' and 'empty'
224  * signals.
225  *
226  * Returns: a new #GstDataQueue.
227  *
228  * Since: 1.2
229  */
230 GstDataQueue *
231 gst_data_queue_new (GstDataQueueCheckFullFunction checkfull,
232     GstDataQueueFullCallback fullcallback,
233     GstDataQueueEmptyCallback emptycallback, gpointer checkdata)
234 {
235   GstDataQueue *ret;
236
237   g_return_val_if_fail (checkfull != NULL, NULL);
238
239   ret = g_object_newv (GST_TYPE_DATA_QUEUE, 0, NULL);
240   ret->priv->checkfull = checkfull;
241   ret->priv->checkdata = checkdata;
242   ret->priv->fullcallback = fullcallback;
243   ret->priv->emptycallback = emptycallback;
244
245   return ret;
246 }
247
248 static void
249 gst_data_queue_cleanup (GstDataQueue * queue)
250 {
251   GstDataQueuePrivate *priv = queue->priv;
252
253   while (!gst_queue_array_is_empty (priv->queue)) {
254     GstDataQueueItem *item = gst_queue_array_pop_head (priv->queue);
255
256     /* Just call the destroy notify on the item */
257     item->destroy (item);
258   }
259   priv->cur_level.visible = 0;
260   priv->cur_level.bytes = 0;
261   priv->cur_level.time = 0;
262 }
263
264 /* called only once, as opposed to dispose */
265 static void
266 gst_data_queue_finalize (GObject * object)
267 {
268   GstDataQueue *queue = GST_DATA_QUEUE (object);
269   GstDataQueuePrivate *priv = queue->priv;
270
271   GST_DEBUG ("finalizing queue");
272
273   gst_data_queue_cleanup (queue);
274   gst_queue_array_free (priv->queue);
275
276   GST_DEBUG ("free mutex");
277   g_mutex_clear (&priv->qlock);
278   GST_DEBUG ("done free mutex");
279
280   g_cond_clear (&priv->item_add);
281   g_cond_clear (&priv->item_del);
282
283   G_OBJECT_CLASS (parent_class)->finalize (object);
284 }
285
286 static inline void
287 gst_data_queue_locked_flush (GstDataQueue * queue)
288 {
289   GstDataQueuePrivate *priv = queue->priv;
290
291   STATUS (queue, "before flushing");
292   gst_data_queue_cleanup (queue);
293   STATUS (queue, "after flushing");
294   /* we deleted something... */
295   if (priv->waiting_del)
296     g_cond_signal (&priv->item_del);
297 }
298
299 static inline gboolean
300 gst_data_queue_locked_is_empty (GstDataQueue * queue)
301 {
302   GstDataQueuePrivate *priv = queue->priv;
303
304   return (gst_queue_array_get_length (priv->queue) == 0);
305 }
306
307 static inline gboolean
308 gst_data_queue_locked_is_full (GstDataQueue * queue)
309 {
310   GstDataQueuePrivate *priv = queue->priv;
311
312   return priv->checkfull (queue, priv->cur_level.visible,
313       priv->cur_level.bytes, priv->cur_level.time, priv->checkdata);
314 }
315
316 /**
317  * gst_data_queue_flush: (skip)
318  * @queue: a #GstDataQueue.
319  *
320  * Flushes all the contents of the @queue. Any call to #gst_data_queue_push and
321  * #gst_data_queue_pop will be released.
322  * MT safe.
323  *
324  * Since: 1.2
325  */
326 void
327 gst_data_queue_flush (GstDataQueue * queue)
328 {
329   GST_DEBUG ("queue:%p", queue);
330   GST_DATA_QUEUE_MUTEX_LOCK (queue);
331   gst_data_queue_locked_flush (queue);
332   GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
333 }
334
335 /**
336  * gst_data_queue_is_empty: (skip)
337  * @queue: a #GstDataQueue.
338  *
339  * Queries if there are any items in the @queue.
340  * MT safe.
341  *
342  * Returns: %TRUE if @queue is empty.
343  *
344  * Since: 1.2
345  */
346 gboolean
347 gst_data_queue_is_empty (GstDataQueue * queue)
348 {
349   gboolean res;
350
351   GST_DATA_QUEUE_MUTEX_LOCK (queue);
352   res = gst_data_queue_locked_is_empty (queue);
353   GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
354
355   return res;
356 }
357
358 /**
359  * gst_data_queue_is_full: (skip)
360  * @queue: a #GstDataQueue.
361  *
362  * Queries if @queue is full. This check will be done using the
363  * #GstDataQueueCheckFullFunction registered with @queue.
364  * MT safe.
365  *
366  * Returns: %TRUE if @queue is full.
367  *
368  * Since: 1.2
369  */
370 gboolean
371 gst_data_queue_is_full (GstDataQueue * queue)
372 {
373   gboolean res;
374
375   GST_DATA_QUEUE_MUTEX_LOCK (queue);
376   res = gst_data_queue_locked_is_full (queue);
377   GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
378
379   return res;
380 }
381
382 /**
383  * gst_data_queue_set_flushing: (skip)
384  * @queue: a #GstDataQueue.
385  * @flushing: a #gboolean stating if the queue will be flushing or not.
386  *
387  * Sets the queue to flushing state if @flushing is %TRUE. If set to flushing
388  * state, any incoming data on the @queue will be discarded. Any call currently
389  * blocking on #gst_data_queue_push or #gst_data_queue_pop will return straight
390  * away with a return value of %FALSE. While the @queue is in flushing state,
391  * all calls to those two functions will return %FALSE.
392  *
393  * MT Safe.
394  *
395  * Since: 1.2
396  */
397 void
398 gst_data_queue_set_flushing (GstDataQueue * queue, gboolean flushing)
399 {
400   GstDataQueuePrivate *priv = queue->priv;
401
402   GST_DEBUG ("queue:%p , flushing:%d", queue, flushing);
403
404   GST_DATA_QUEUE_MUTEX_LOCK (queue);
405   priv->flushing = flushing;
406   if (flushing) {
407     /* release push/pop functions */
408     if (priv->waiting_add)
409       g_cond_signal (&priv->item_add);
410     if (priv->waiting_del)
411       g_cond_signal (&priv->item_del);
412   }
413   GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
414 }
415
416 static void
417 gst_data_queue_push_force_unlocked (GstDataQueue * queue,
418     GstDataQueueItem * item)
419 {
420   GstDataQueuePrivate *priv = queue->priv;
421
422   gst_queue_array_push_tail (priv->queue, item);
423
424   if (item->visible)
425     priv->cur_level.visible++;
426   priv->cur_level.bytes += item->size;
427   priv->cur_level.time += item->duration;
428 }
429
430 /**
431  * gst_data_queue_push_force: (skip)
432  * @queue: a #GstDataQueue.
433  * @item: a #GstDataQueueItem.
434  *
435  * Pushes a #GstDataQueueItem (or a structure that begins with the same fields)
436  * on the @queue. It ignores if the @queue is full or not and forces the @item
437  * to be pushed anyway.
438  * MT safe.
439  *
440  * Note that this function has slightly different semantics than gst_pad_push()
441  * and gst_pad_push_event(): this function only takes ownership of @item and
442  * the #GstMiniObject contained in @item if the push was successful. If %FALSE
443  * is returned, the caller is responsible for freeing @item and its contents.
444  *
445  * Returns: %TRUE if the @item was successfully pushed on the @queue.
446  *
447  * Since: 1.2
448  */
449 gboolean
450 gst_data_queue_push_force (GstDataQueue * queue, GstDataQueueItem * item)
451 {
452   GstDataQueuePrivate *priv = queue->priv;
453
454   g_return_val_if_fail (GST_IS_DATA_QUEUE (queue), FALSE);
455   g_return_val_if_fail (item != NULL, FALSE);
456
457   GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);
458
459   STATUS (queue, "before pushing");
460   gst_data_queue_push_force_unlocked (queue, item);
461   STATUS (queue, "after pushing");
462   if (priv->waiting_add)
463     g_cond_signal (&priv->item_add);
464
465   GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
466
467   return TRUE;
468
469   /* ERRORS */
470 flushing:
471   {
472     GST_DEBUG ("queue:%p, we are flushing", queue);
473     GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
474     return FALSE;
475   }
476 }
477
478 /**
479  * gst_data_queue_push: (skip)
480  * @queue: a #GstDataQueue.
481  * @item: a #GstDataQueueItem.
482  *
483  * Pushes a #GstDataQueueItem (or a structure that begins with the same fields)
484  * on the @queue. If the @queue is full, the call will block until space is
485  * available, OR the @queue is set to flushing state.
486  * MT safe.
487  *
488  * Note that this function has slightly different semantics than gst_pad_push()
489  * and gst_pad_push_event(): this function only takes ownership of @item and
490  * the #GstMiniObject contained in @item if the push was successful. If %FALSE
491  * is returned, the caller is responsible for freeing @item and its contents.
492  *
493  * Returns: %TRUE if the @item was successfully pushed on the @queue.
494  *
495  * Since: 1.2
496  */
497 gboolean
498 gst_data_queue_push (GstDataQueue * queue, GstDataQueueItem * item)
499 {
500   GstDataQueuePrivate *priv = queue->priv;
501
502   g_return_val_if_fail (GST_IS_DATA_QUEUE (queue), FALSE);
503   g_return_val_if_fail (item != NULL, FALSE);
504
505   GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);
506
507   STATUS (queue, "before pushing");
508
509   /* We ALWAYS need to check for queue fillness */
510   if (gst_data_queue_locked_is_full (queue)) {
511     GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
512     if (G_LIKELY (priv->fullcallback))
513       priv->fullcallback (queue, priv->checkdata);
514     else
515       g_signal_emit (queue, gst_data_queue_signals[SIGNAL_FULL], 0);
516     GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);
517
518     /* signal might have removed some items */
519     while (gst_data_queue_locked_is_full (queue)) {
520       priv->waiting_del = TRUE;
521       g_cond_wait (&priv->item_del, &priv->qlock);
522       priv->waiting_del = FALSE;
523       if (priv->flushing)
524         goto flushing;
525     }
526   }
527
528   gst_data_queue_push_force_unlocked (queue, item);
529
530   STATUS (queue, "after pushing");
531   if (priv->waiting_add)
532     g_cond_signal (&priv->item_add);
533
534   GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
535
536   return TRUE;
537
538   /* ERRORS */
539 flushing:
540   {
541     GST_DEBUG ("queue:%p, we are flushing", queue);
542     GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
543     return FALSE;
544   }
545 }
546
547 static gboolean
548 _gst_data_queue_wait_non_empty (GstDataQueue * queue)
549 {
550   GstDataQueuePrivate *priv = queue->priv;
551
552   while (gst_data_queue_locked_is_empty (queue)) {
553     priv->waiting_add = TRUE;
554     g_cond_wait (&priv->item_add, &priv->qlock);
555     priv->waiting_add = FALSE;
556     if (priv->flushing)
557       return FALSE;
558   }
559   return TRUE;
560 }
561
562 /**
563  * gst_data_queue_pop: (skip)
564  * @queue: a #GstDataQueue.
565  * @item: pointer to store the returned #GstDataQueueItem.
566  *
567  * Retrieves the first @item available on the @queue. If the queue is currently
568  * empty, the call will block until at least one item is available, OR the
569  * @queue is set to the flushing state.
570  * MT safe.
571  *
572  * Returns: %TRUE if an @item was successfully retrieved from the @queue.
573  *
574  * Since: 1.2
575  */
576 gboolean
577 gst_data_queue_pop (GstDataQueue * queue, GstDataQueueItem ** item)
578 {
579   GstDataQueuePrivate *priv = queue->priv;
580
581   g_return_val_if_fail (GST_IS_DATA_QUEUE (queue), FALSE);
582   g_return_val_if_fail (item != NULL, FALSE);
583
584   GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);
585
586   STATUS (queue, "before popping");
587
588   if (gst_data_queue_locked_is_empty (queue)) {
589     GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
590     if (G_LIKELY (priv->emptycallback))
591       priv->emptycallback (queue, priv->checkdata);
592     else
593       g_signal_emit (queue, gst_data_queue_signals[SIGNAL_EMPTY], 0);
594     GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);
595
596     if (!_gst_data_queue_wait_non_empty (queue))
597       goto flushing;
598   }
599
600   /* Get the item from the GQueue */
601   *item = gst_queue_array_pop_head (priv->queue);
602
603   /* update current level counter */
604   if ((*item)->visible)
605     priv->cur_level.visible--;
606   priv->cur_level.bytes -= (*item)->size;
607   priv->cur_level.time -= (*item)->duration;
608
609   STATUS (queue, "after popping");
610   if (priv->waiting_del)
611     g_cond_signal (&priv->item_del);
612
613   GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
614
615   return TRUE;
616
617   /* ERRORS */
618 flushing:
619   {
620     GST_DEBUG ("queue:%p, we are flushing", queue);
621     GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
622     return FALSE;
623   }
624 }
625
626 static gint
627 is_of_type (gconstpointer a, gconstpointer b)
628 {
629   return !G_TYPE_CHECK_INSTANCE_TYPE (a, GPOINTER_TO_SIZE (b));
630 }
631
632 /**
633  * gst_data_queue_peek: (skip)
634  * @queue: a #GstDataQueue.
635  * @item: pointer to store the returned #GstDataQueueItem.
636  *
637  * Retrieves the first @item available on the @queue without removing it.
638  * If the queue is currently empty, the call will block until at least
639  * one item is available, OR the @queue is set to the flushing state.
640  * MT safe.
641  *
642  * Returns: %TRUE if an @item was successfully retrieved from the @queue.
643  *
644  * Since: 1.2
645  */
646 gboolean
647 gst_data_queue_peek (GstDataQueue * queue, GstDataQueueItem ** item)
648 {
649   GstDataQueuePrivate *priv = queue->priv;
650
651   g_return_val_if_fail (GST_IS_DATA_QUEUE (queue), FALSE);
652   g_return_val_if_fail (item != NULL, FALSE);
653
654   GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);
655
656   STATUS (queue, "before peeking");
657
658   if (gst_data_queue_locked_is_empty (queue)) {
659     GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
660     if (G_LIKELY (priv->emptycallback))
661       priv->emptycallback (queue, priv->checkdata);
662     else
663       g_signal_emit (queue, gst_data_queue_signals[SIGNAL_EMPTY], 0);
664     GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);
665
666     if (!_gst_data_queue_wait_non_empty (queue))
667       goto flushing;
668   }
669
670   /* Get the item from the GQueue */
671   *item = gst_queue_array_peek_head (priv->queue);
672
673   STATUS (queue, "after peeking");
674   GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
675
676   return TRUE;
677
678   /* ERRORS */
679 flushing:
680   {
681     GST_DEBUG ("queue:%p, we are flushing", queue);
682     GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
683     return FALSE;
684   }
685 }
686
687 /**
688  * gst_data_queue_drop_head: (skip)
689  * @queue: The #GstDataQueue to drop an item from.
690  * @type: The #GType of the item to drop.
691  *
692  * Pop and unref the head-most #GstMiniObject with the given #GType.
693  *
694  * Returns: %TRUE if an element was removed.
695  *
696  * Since: 1.2
697  */
698 gboolean
699 gst_data_queue_drop_head (GstDataQueue * queue, GType type)
700 {
701   gboolean res = FALSE;
702   GstDataQueueItem *leak = NULL;
703   guint idx;
704   GstDataQueuePrivate *priv = queue->priv;
705
706   g_return_val_if_fail (GST_IS_DATA_QUEUE (queue), FALSE);
707
708   GST_DEBUG ("queue:%p", queue);
709
710   GST_DATA_QUEUE_MUTEX_LOCK (queue);
711   idx = gst_queue_array_find (priv->queue, is_of_type, GSIZE_TO_POINTER (type));
712
713   if (idx == -1)
714     goto done;
715
716   leak = gst_queue_array_drop_element (priv->queue, idx);
717
718   if (leak->visible)
719     priv->cur_level.visible--;
720   priv->cur_level.bytes -= leak->size;
721   priv->cur_level.time -= leak->duration;
722
723   leak->destroy (leak);
724
725   res = TRUE;
726
727 done:
728   GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
729
730   GST_DEBUG ("queue:%p , res:%d", queue, res);
731
732   return res;
733 }
734
735 /**
736  * gst_data_queue_limits_changed: (skip)
737  * @queue: The #GstDataQueue
738  *
739  * Inform the queue that the limits for the fullness check have changed and that
740  * any blocking gst_data_queue_push() should be unblocked to recheck the limits.
741  *
742  * Since: 1.2
743  */
744 void
745 gst_data_queue_limits_changed (GstDataQueue * queue)
746 {
747   GstDataQueuePrivate *priv = queue->priv;
748
749   g_return_if_fail (GST_IS_DATA_QUEUE (queue));
750
751   GST_DATA_QUEUE_MUTEX_LOCK (queue);
752   if (priv->waiting_del) {
753     GST_DEBUG ("signal del");
754     g_cond_signal (&priv->item_del);
755   }
756   GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
757 }
758
759 /**
760  * gst_data_queue_get_level: (skip)
761  * @queue: The #GstDataQueue
762  * @level: the location to store the result
763  *
764  * Get the current level of the queue.
765  *
766  * Since: 1.2
767  */
768 void
769 gst_data_queue_get_level (GstDataQueue * queue, GstDataQueueSize * level)
770 {
771   GstDataQueuePrivate *priv = queue->priv;
772
773   memcpy (level, (&priv->cur_level), sizeof (GstDataQueueSize));
774 }
775
776 static void
777 gst_data_queue_set_property (GObject * object,
778     guint prop_id, const GValue * value, GParamSpec * pspec)
779 {
780   switch (prop_id) {
781     default:
782       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
783       break;
784   }
785 }
786
787 static void
788 gst_data_queue_get_property (GObject * object,
789     guint prop_id, GValue * value, GParamSpec * pspec)
790 {
791   GstDataQueue *queue = GST_DATA_QUEUE (object);
792   GstDataQueuePrivate *priv = queue->priv;
793
794   GST_DATA_QUEUE_MUTEX_LOCK (queue);
795
796   switch (prop_id) {
797     case PROP_CUR_LEVEL_BYTES:
798       g_value_set_uint (value, priv->cur_level.bytes);
799       break;
800     case PROP_CUR_LEVEL_VISIBLE:
801       g_value_set_uint (value, priv->cur_level.visible);
802       break;
803     case PROP_CUR_LEVEL_TIME:
804       g_value_set_uint64 (value, priv->cur_level.time);
805       break;
806     default:
807       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
808       break;
809   }
810
811   GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
812 }