Add few missing allow-none annotation
[platform/upstream/gstreamer.git] / libs / gst / base / gstdataqueue.c
1 /* GStreamer
2  * Copyright (C) 2006 Edward Hervey <edward@fluendo.com>
3  *
4  * gstdataqueue.c:
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19  * Boston, MA 02110-1301, USA.
20  */
21
22 /**
23  * SECTION:gstdataqueue
24  * @short_description: Threadsafe queueing object
25  *
26  * #GstDataQueue is an object that handles threadsafe queueing of objects. It
27  * also provides size-related functionality. This object should be used for
28  * any #GstElement that wishes to provide some sort of queueing functionality.
29  */
30
31 #include <gst/gst.h>
32 #include "string.h"
33 #include "gstdataqueue.h"
34 #include "gstqueuearray.h"
35 #include "gst/glib-compat-private.h"
36
37 GST_DEBUG_CATEGORY_STATIC (data_queue_debug);
38 #define GST_CAT_DEFAULT (data_queue_debug)
39 GST_DEBUG_CATEGORY_STATIC (data_queue_dataflow);
40
41
42 /* Queue signals and args */
43 enum
44 {
45   SIGNAL_EMPTY,
46   SIGNAL_FULL,
47   LAST_SIGNAL
48 };
49
50 enum
51 {
52   PROP_0,
53   PROP_CUR_LEVEL_VISIBLE,
54   PROP_CUR_LEVEL_BYTES,
55   PROP_CUR_LEVEL_TIME
56       /* FILL ME */
57 };
58
59 struct _GstDataQueuePrivate
60 {
61   /* the array of data we're keeping our grubby hands on */
62   GstQueueArray *queue;
63
64   GstDataQueueSize cur_level;   /* size of the queue */
65   GstDataQueueCheckFullFunction checkfull;      /* Callback to check if the queue is full */
66   gpointer *checkdata;
67
68   GMutex qlock;                 /* lock for queue (vs object lock) */
69   gboolean waiting_add;
70   GCond item_add;               /* signals buffers now available for reading */
71   gboolean waiting_del;
72   GCond item_del;               /* signals space now available for writing */
73   gboolean flushing;            /* indicates whether conditions where signalled because
74                                  * of external flushing */
75   GstDataQueueFullCallback fullcallback;
76   GstDataQueueEmptyCallback emptycallback;
77 };
78
79 #define GST_DATA_QUEUE_MUTEX_LOCK(q) G_STMT_START {                     \
80     GST_CAT_TRACE (data_queue_dataflow,                                 \
81       "locking qlock from thread %p",                                   \
82       g_thread_self ());                                                \
83   g_mutex_lock (&q->priv->qlock);                                       \
84   GST_CAT_TRACE (data_queue_dataflow,                                   \
85       "locked qlock from thread %p",                                    \
86       g_thread_self ());                                                \
87 } G_STMT_END
88
89 #define GST_DATA_QUEUE_MUTEX_LOCK_CHECK(q, label) G_STMT_START {        \
90     GST_DATA_QUEUE_MUTEX_LOCK (q);                                      \
91     if (q->priv->flushing)                                              \
92       goto label;                                                       \
93   } G_STMT_END
94
95 #define GST_DATA_QUEUE_MUTEX_UNLOCK(q) G_STMT_START {                   \
96     GST_CAT_TRACE (data_queue_dataflow,                                 \
97       "unlocking qlock from thread %p",                                 \
98       g_thread_self ());                                                \
99   g_mutex_unlock (&q->priv->qlock);                                     \
100 } G_STMT_END
101
102 #define STATUS(q, msg)                                                  \
103   GST_CAT_LOG (data_queue_dataflow,                                     \
104                "queue:%p " msg ": %u visible items, %u "                \
105                "bytes, %"G_GUINT64_FORMAT                               \
106                " ns, %u elements",                                      \
107                queue,                                                   \
108                q->priv->cur_level.visible,                              \
109                q->priv->cur_level.bytes,                                \
110                q->priv->cur_level.time,                                 \
111                gst_queue_array_get_length (q->priv->queue))
112
113 static void gst_data_queue_finalize (GObject * object);
114
115 static void gst_data_queue_set_property (GObject * object,
116     guint prop_id, const GValue * value, GParamSpec * pspec);
117 static void gst_data_queue_get_property (GObject * object,
118     guint prop_id, GValue * value, GParamSpec * pspec);
119
120 static guint gst_data_queue_signals[LAST_SIGNAL] = { 0 };
121
122 #define _do_init \
123 { \
124   GST_DEBUG_CATEGORY_INIT (data_queue_debug, "dataqueue", 0, \
125       "data queue object"); \
126   GST_DEBUG_CATEGORY_INIT (data_queue_dataflow, "data_queue_dataflow", 0, \
127       "dataflow inside the data queue object"); \
128 }
129
130 #define parent_class gst_data_queue_parent_class
131 G_DEFINE_TYPE_WITH_CODE (GstDataQueue, gst_data_queue, G_TYPE_OBJECT, _do_init);
132
133 static void
134 gst_data_queue_class_init (GstDataQueueClass * klass)
135 {
136   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
137
138   g_type_class_add_private (klass, sizeof (GstDataQueuePrivate));
139
140   gobject_class->set_property = gst_data_queue_set_property;
141   gobject_class->get_property = gst_data_queue_get_property;
142
143   /* signals */
144   /**
145    * GstDataQueue::empty:
146    * @queue: the queue instance
147    *
148    * Reports that the queue became empty (empty).
149    * A queue is empty if the total amount of visible items inside it (num-visible, time,
150    * size) is lower than the boundary values which can be set through the GObject
151    * properties.
152    */
153   gst_data_queue_signals[SIGNAL_EMPTY] =
154       g_signal_new ("empty", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
155       G_STRUCT_OFFSET (GstDataQueueClass, empty), NULL, NULL,
156       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
157
158   /**
159    * GstDataQueue::full:
160    * @queue: the queue instance
161    *
162    * Reports that the queue became full (full).
163    * A queue is full if the total amount of data inside it (num-visible, time,
164    * size) is higher than the boundary values which can be set through the GObject
165    * properties.
166    */
167   gst_data_queue_signals[SIGNAL_FULL] =
168       g_signal_new ("full", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
169       G_STRUCT_OFFSET (GstDataQueueClass, full), NULL, NULL,
170       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
171
172   /* properties */
173   g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_BYTES,
174       g_param_spec_uint ("current-level-bytes", "Current level (kB)",
175           "Current amount of data in the queue (bytes)",
176           0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
177   g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_VISIBLE,
178       g_param_spec_uint ("current-level-visible",
179           "Current level (visible items)",
180           "Current number of visible items in the queue", 0, G_MAXUINT, 0,
181           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
182   g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_TIME,
183       g_param_spec_uint64 ("current-level-time", "Current level (ns)",
184           "Current amount of data in the queue (in ns)", 0, G_MAXUINT64, 0,
185           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
186
187   gobject_class->finalize = gst_data_queue_finalize;
188 }
189
190 static void
191 gst_data_queue_init (GstDataQueue * queue)
192 {
193   queue->priv =
194       G_TYPE_INSTANCE_GET_PRIVATE (queue, GST_TYPE_DATA_QUEUE,
195       GstDataQueuePrivate);
196
197   queue->priv->cur_level.visible = 0;   /* no content */
198   queue->priv->cur_level.bytes = 0;     /* no content */
199   queue->priv->cur_level.time = 0;      /* no content */
200
201   queue->priv->checkfull = NULL;
202
203   g_mutex_init (&queue->priv->qlock);
204   g_cond_init (&queue->priv->item_add);
205   g_cond_init (&queue->priv->item_del);
206   queue->priv->queue = gst_queue_array_new (50);
207
208   GST_DEBUG ("initialized queue's not_empty & not_full conditions");
209 }
210
211 /**
212  * gst_data_queue_new:
213  * @checkfull: the callback used to tell if the element considers the queue full
214  * or not.
215  * @fullcallback: the callback which will be called when the queue is considered full.
216  * @emptycallback: the callback which will be called when the queue is considered empty.
217  * @checkdata: a #gpointer that will be given in the @checkfull callback.
218  *
219  * Creates a new #GstDataQueue. The difference with @gst_data_queue_new is that it will
220  * not emit the 'full' and 'empty' signals, but instead calling directly @fullcallback
221  * or @emptycallback.
222  *
223  * Returns: a new #GstDataQueue.
224  *
225  * Since: 1.2.0
226  */
227 GstDataQueue *
228 gst_data_queue_new (GstDataQueueCheckFullFunction checkfull,
229     GstDataQueueFullCallback fullcallback,
230     GstDataQueueEmptyCallback emptycallback, gpointer checkdata)
231 {
232   GstDataQueue *ret;
233
234   g_return_val_if_fail (checkfull != NULL, NULL);
235
236   ret = g_object_newv (GST_TYPE_DATA_QUEUE, 0, NULL);
237   ret->priv->checkfull = checkfull;
238   ret->priv->checkdata = checkdata;
239   ret->priv->fullcallback = fullcallback;
240   ret->priv->emptycallback = emptycallback;
241
242   return ret;
243 }
244
245 static void
246 gst_data_queue_cleanup (GstDataQueue * queue)
247 {
248   GstDataQueuePrivate *priv = queue->priv;
249
250   while (!gst_queue_array_is_empty (priv->queue)) {
251     GstDataQueueItem *item = gst_queue_array_pop_head (priv->queue);
252
253     /* Just call the destroy notify on the item */
254     item->destroy (item);
255   }
256   priv->cur_level.visible = 0;
257   priv->cur_level.bytes = 0;
258   priv->cur_level.time = 0;
259 }
260
261 /* called only once, as opposed to dispose */
262 static void
263 gst_data_queue_finalize (GObject * object)
264 {
265   GstDataQueue *queue = GST_DATA_QUEUE (object);
266   GstDataQueuePrivate *priv = queue->priv;
267
268   GST_DEBUG ("finalizing queue");
269
270   gst_data_queue_cleanup (queue);
271   gst_queue_array_free (priv->queue);
272
273   GST_DEBUG ("free mutex");
274   g_mutex_clear (&priv->qlock);
275   GST_DEBUG ("done free mutex");
276
277   g_cond_clear (&priv->item_add);
278   g_cond_clear (&priv->item_del);
279
280   G_OBJECT_CLASS (parent_class)->finalize (object);
281 }
282
283 static inline void
284 gst_data_queue_locked_flush (GstDataQueue * queue)
285 {
286   GstDataQueuePrivate *priv = queue->priv;
287
288   STATUS (queue, "before flushing");
289   gst_data_queue_cleanup (queue);
290   STATUS (queue, "after flushing");
291   /* we deleted something... */
292   if (priv->waiting_del)
293     g_cond_signal (&priv->item_del);
294 }
295
296 static inline gboolean
297 gst_data_queue_locked_is_empty (GstDataQueue * queue)
298 {
299   GstDataQueuePrivate *priv = queue->priv;
300
301   return (gst_queue_array_get_length (priv->queue) == 0);
302 }
303
304 static inline gboolean
305 gst_data_queue_locked_is_full (GstDataQueue * queue)
306 {
307   GstDataQueuePrivate *priv = queue->priv;
308
309   return priv->checkfull (queue, priv->cur_level.visible,
310       priv->cur_level.bytes, priv->cur_level.time, priv->checkdata);
311 }
312
313 /**
314  * gst_data_queue_flush:
315  * @queue: a #GstDataQueue.
316  *
317  * Flushes all the contents of the @queue. Any call to #gst_data_queue_push and
318  * #gst_data_queue_pop will be released.
319  * MT safe.
320  *
321  * Since: 1.2.0
322  */
323 void
324 gst_data_queue_flush (GstDataQueue * queue)
325 {
326   GST_DEBUG ("queue:%p", queue);
327   GST_DATA_QUEUE_MUTEX_LOCK (queue);
328   gst_data_queue_locked_flush (queue);
329   GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
330 }
331
332 /**
333  * gst_data_queue_is_empty:
334  * @queue: a #GstDataQueue.
335  *
336  * Queries if there are any items in the @queue.
337  * MT safe.
338  *
339  * Returns: #TRUE if @queue is empty.
340  *
341  * Since: 1.2.0
342  */
343 gboolean
344 gst_data_queue_is_empty (GstDataQueue * queue)
345 {
346   gboolean res;
347
348   GST_DATA_QUEUE_MUTEX_LOCK (queue);
349   res = gst_data_queue_locked_is_empty (queue);
350   GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
351
352   return res;
353 }
354
355 /**
356  * gst_data_queue_is_full:
357  * @queue: a #GstDataQueue.
358  *
359  * Queries if @queue is full. This check will be done using the
360  * #GstDataQueueCheckFullFunction registered with @queue.
361  * MT safe.
362  *
363  * Returns: #TRUE if @queue is full.
364  *
365  * Since: 1.2.0
366  */
367 gboolean
368 gst_data_queue_is_full (GstDataQueue * queue)
369 {
370   gboolean res;
371
372   GST_DATA_QUEUE_MUTEX_LOCK (queue);
373   res = gst_data_queue_locked_is_full (queue);
374   GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
375
376   return res;
377 }
378
379 /**
380  * gst_data_queue_set_flushing:
381  * @queue: a #GstDataQueue.
382  * @flushing: a #gboolean stating if the queue will be flushing or not.
383  *
384  * Sets the queue to flushing state if @flushing is #TRUE. If set to flushing
385  * state, any incoming data on the @queue will be discarded. Any call currently
386  * blocking on #gst_data_queue_push or #gst_data_queue_pop will return straight
387  * away with a return value of #FALSE. While the @queue is in flushing state, 
388  * all calls to those two functions will return #FALSE.
389  *
390  * MT Safe.
391  *
392  * Since: 1.2.0
393  */
394 void
395 gst_data_queue_set_flushing (GstDataQueue * queue, gboolean flushing)
396 {
397   GstDataQueuePrivate *priv = queue->priv;
398
399   GST_DEBUG ("queue:%p , flushing:%d", queue, flushing);
400
401   GST_DATA_QUEUE_MUTEX_LOCK (queue);
402   priv->flushing = flushing;
403   if (flushing) {
404     /* release push/pop functions */
405     if (priv->waiting_add)
406       g_cond_signal (&priv->item_add);
407     if (priv->waiting_del)
408       g_cond_signal (&priv->item_del);
409   }
410   GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
411 }
412
413 /**
414  * gst_data_queue_push:
415  * @queue: a #GstDataQueue.
416  * @item: a #GstDataQueueItem.
417  *
418  * Pushes a #GstDataQueueItem (or a structure that begins with the same fields)
419  * on the @queue. If the @queue is full, the call will block until space is
420  * available, OR the @queue is set to flushing state.
421  * MT safe.
422  *
423  * Note that this function has slightly different semantics than gst_pad_push()
424  * and gst_pad_push_event(): this function only takes ownership of @item and
425  * the #GstMiniObject contained in @item if the push was successful. If FALSE
426  * is returned, the caller is responsible for freeing @item and its contents.
427  *
428  * Returns: #TRUE if the @item was successfully pushed on the @queue.
429  *
430  * Since: 1.2.0
431  */
432 gboolean
433 gst_data_queue_push (GstDataQueue * queue, GstDataQueueItem * item)
434 {
435   GstDataQueuePrivate *priv = queue->priv;
436
437   g_return_val_if_fail (GST_IS_DATA_QUEUE (queue), FALSE);
438   g_return_val_if_fail (item != NULL, FALSE);
439
440   GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);
441
442   STATUS (queue, "before pushing");
443
444   /* We ALWAYS need to check for queue fillness */
445   if (gst_data_queue_locked_is_full (queue)) {
446     GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
447     if (G_LIKELY (priv->fullcallback))
448       priv->fullcallback (queue, priv->checkdata);
449     else
450       g_signal_emit (queue, gst_data_queue_signals[SIGNAL_FULL], 0);
451     GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);
452
453     /* signal might have removed some items */
454     while (gst_data_queue_locked_is_full (queue)) {
455       priv->waiting_del = TRUE;
456       g_cond_wait (&priv->item_del, &priv->qlock);
457       priv->waiting_del = FALSE;
458       if (priv->flushing)
459         goto flushing;
460     }
461   }
462
463   gst_queue_array_push_tail (priv->queue, item);
464
465   if (item->visible)
466     priv->cur_level.visible++;
467   priv->cur_level.bytes += item->size;
468   priv->cur_level.time += item->duration;
469
470   STATUS (queue, "after pushing");
471   if (priv->waiting_add)
472     g_cond_signal (&priv->item_add);
473
474   GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
475
476   return TRUE;
477
478   /* ERRORS */
479 flushing:
480   {
481     GST_DEBUG ("queue:%p, we are flushing", queue);
482     GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
483     return FALSE;
484   }
485 }
486
487 static gboolean
488 _gst_data_queue_wait_non_empty (GstDataQueue * queue)
489 {
490   GstDataQueuePrivate *priv = queue->priv;
491
492   while (gst_data_queue_locked_is_empty (queue)) {
493     priv->waiting_add = TRUE;
494     g_cond_wait (&priv->item_add, &priv->qlock);
495     priv->waiting_add = FALSE;
496     if (priv->flushing)
497       return FALSE;
498   }
499   return TRUE;
500 }
501
502 /**
503  * gst_data_queue_pop:
504  * @queue: a #GstDataQueue.
505  * @item: pointer to store the returned #GstDataQueueItem.
506  *
507  * Retrieves the first @item available on the @queue. If the queue is currently
508  * empty, the call will block until at least one item is available, OR the
509  * @queue is set to the flushing state.
510  * MT safe.
511  *
512  * Returns: #TRUE if an @item was successfully retrieved from the @queue.
513  *
514  * Since: 1.2.0
515  */
516 gboolean
517 gst_data_queue_pop (GstDataQueue * queue, GstDataQueueItem ** item)
518 {
519   GstDataQueuePrivate *priv = queue->priv;
520
521   g_return_val_if_fail (GST_IS_DATA_QUEUE (queue), FALSE);
522   g_return_val_if_fail (item != NULL, FALSE);
523
524   GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);
525
526   STATUS (queue, "before popping");
527
528   if (gst_data_queue_locked_is_empty (queue)) {
529     GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
530     if (G_LIKELY (priv->emptycallback))
531       priv->emptycallback (queue, priv->checkdata);
532     else
533       g_signal_emit (queue, gst_data_queue_signals[SIGNAL_EMPTY], 0);
534     GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);
535
536     if (!_gst_data_queue_wait_non_empty (queue))
537       goto flushing;
538   }
539
540   /* Get the item from the GQueue */
541   *item = gst_queue_array_pop_head (priv->queue);
542
543   /* update current level counter */
544   if ((*item)->visible)
545     priv->cur_level.visible--;
546   priv->cur_level.bytes -= (*item)->size;
547   priv->cur_level.time -= (*item)->duration;
548
549   STATUS (queue, "after popping");
550   if (priv->waiting_del)
551     g_cond_signal (&priv->item_del);
552
553   GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
554
555   return TRUE;
556
557   /* ERRORS */
558 flushing:
559   {
560     GST_DEBUG ("queue:%p, we are flushing", queue);
561     GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
562     return FALSE;
563   }
564 }
565
566 static gint
567 is_of_type (gconstpointer a, gconstpointer b)
568 {
569   return !G_TYPE_CHECK_INSTANCE_TYPE (a, GPOINTER_TO_SIZE (b));
570 }
571
572 /**
573  * gst_data_queue_peek:
574  * @queue: a #GstDataQueue.
575  * @item: pointer to store the returned #GstDataQueueItem.
576  *
577  * Retrieves the first @item available on the @queue without removing it.
578  * If the queue is currently empty, the call will block until at least
579  * one item is available, OR the @queue is set to the flushing state.
580  * MT safe.
581  *
582  * Returns: #TRUE if an @item was successfully retrieved from the @queue.
583  *
584  * Since: 1.2.0
585  */
586 gboolean
587 gst_data_queue_peek (GstDataQueue * queue, GstDataQueueItem ** item)
588 {
589   GstDataQueuePrivate *priv = queue->priv;
590
591   g_return_val_if_fail (GST_IS_DATA_QUEUE (queue), FALSE);
592   g_return_val_if_fail (item != NULL, FALSE);
593
594   GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);
595
596   STATUS (queue, "before peeking");
597
598   if (gst_data_queue_locked_is_empty (queue)) {
599     GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
600     if (G_LIKELY (priv->emptycallback))
601       priv->emptycallback (queue, priv->checkdata);
602     else
603       g_signal_emit (queue, gst_data_queue_signals[SIGNAL_EMPTY], 0);
604     GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);
605
606     if (!_gst_data_queue_wait_non_empty (queue))
607       goto flushing;
608   }
609
610   /* Get the item from the GQueue */
611   *item = gst_queue_array_peek_head (priv->queue);
612
613   STATUS (queue, "after peeking");
614   GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
615
616   return TRUE;
617
618   /* ERRORS */
619 flushing:
620   {
621     GST_DEBUG ("queue:%p, we are flushing", queue);
622     GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
623     return FALSE;
624   }
625 }
626
627 /**
628  * gst_data_queue_drop_head:
629  * @queue: The #GstDataQueue to drop an item from.
630  * @type: The #GType of the item to drop.
631  *
632  * Pop and unref the head-most #GstMiniObject with the given #GType.
633  *
634  * Returns: TRUE if an element was removed.
635  *
636  * Since: 1.2.0
637  */
638 gboolean
639 gst_data_queue_drop_head (GstDataQueue * queue, GType type)
640 {
641   gboolean res = FALSE;
642   GstDataQueueItem *leak = NULL;
643   guint idx;
644   GstDataQueuePrivate *priv = queue->priv;
645
646   g_return_val_if_fail (GST_IS_DATA_QUEUE (queue), FALSE);
647
648   GST_DEBUG ("queue:%p", queue);
649
650   GST_DATA_QUEUE_MUTEX_LOCK (queue);
651   idx = gst_queue_array_find (priv->queue, is_of_type, GSIZE_TO_POINTER (type));
652
653   if (idx == -1)
654     goto done;
655
656   leak = gst_queue_array_drop_element (priv->queue, idx);
657
658   if (leak->visible)
659     priv->cur_level.visible--;
660   priv->cur_level.bytes -= leak->size;
661   priv->cur_level.time -= leak->duration;
662
663   leak->destroy (leak);
664
665   res = TRUE;
666
667 done:
668   GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
669
670   GST_DEBUG ("queue:%p , res:%d", queue, res);
671
672   return res;
673 }
674
675 /**
676  * gst_data_queue_limits_changed:
677  * @queue: The #GstDataQueue 
678  *
679  * Inform the queue that the limits for the fullness check have changed and that
680  * any blocking gst_data_queue_push() should be unblocked to recheck the limts.
681  *
682  * Since: 1.2.0
683  */
684 void
685 gst_data_queue_limits_changed (GstDataQueue * queue)
686 {
687   GstDataQueuePrivate *priv = queue->priv;
688
689   g_return_if_fail (GST_IS_DATA_QUEUE (queue));
690
691   GST_DATA_QUEUE_MUTEX_LOCK (queue);
692   if (priv->waiting_del) {
693     GST_DEBUG ("signal del");
694     g_cond_signal (&priv->item_del);
695   }
696   GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
697 }
698
699 /**
700  * gst_data_queue_get_level:
701  * @queue: The #GstDataQueue
702  * @level: the location to store the result
703  *
704  * Get the current level of the queue.
705  *
706  * Since: 1.2.0
707  */
708 void
709 gst_data_queue_get_level (GstDataQueue * queue, GstDataQueueSize * level)
710 {
711   GstDataQueuePrivate *priv = queue->priv;
712
713   memcpy (level, (&priv->cur_level), sizeof (GstDataQueueSize));
714 }
715
716 static void
717 gst_data_queue_set_property (GObject * object,
718     guint prop_id, const GValue * value, GParamSpec * pspec)
719 {
720   switch (prop_id) {
721     default:
722       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
723       break;
724   }
725 }
726
727 static void
728 gst_data_queue_get_property (GObject * object,
729     guint prop_id, GValue * value, GParamSpec * pspec)
730 {
731   GstDataQueue *queue = GST_DATA_QUEUE (object);
732   GstDataQueuePrivate *priv = queue->priv;
733
734   GST_DATA_QUEUE_MUTEX_LOCK (queue);
735
736   switch (prop_id) {
737     case PROP_CUR_LEVEL_BYTES:
738       g_value_set_uint (value, priv->cur_level.bytes);
739       break;
740     case PROP_CUR_LEVEL_VISIBLE:
741       g_value_set_uint (value, priv->cur_level.visible);
742       break;
743     case PROP_CUR_LEVEL_TIME:
744       g_value_set_uint64 (value, priv->cur_level.time);
745       break;
746     default:
747       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
748       break;
749   }
750
751   GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
752 }