aggregator: Assert if the sink/src pad type that is to be used is not a GstAggregator...
[platform/upstream/gstreamer.git] / libs / gst / base / gstdataqueue.c
index ebdc2c8..d6479bb 100644 (file)
  *
  * You should have received a copy of the GNU Library General Public
  * License along with this library; if not, write to the
- * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- * Boston, MA 02111-1307, USA.
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
  */
 
 /**
  * SECTION:gstdataqueue
+ * @title: GstDataQueue
  * @short_description: Threadsafe queueing object
  *
  * #GstDataQueue is an object that handles threadsafe queueing of objects. It
  * also provides size-related functionality. This object should be used for
  * any #GstElement that wishes to provide some sort of queueing functionality.
  */
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
 
 #include <gst/gst.h>
 #include "string.h"
@@ -77,11 +81,11 @@ struct _GstDataQueuePrivate
 };
 
 #define GST_DATA_QUEUE_MUTEX_LOCK(q) G_STMT_START {                     \
-    GST_CAT_LOG (data_queue_dataflow,                                   \
+    GST_CAT_TRACE (data_queue_dataflow,                                 \
       "locking qlock from thread %p",                                   \
       g_thread_self ());                                                \
   g_mutex_lock (&q->priv->qlock);                                       \
-  GST_CAT_LOG (data_queue_dataflow,                                     \
+  GST_CAT_TRACE (data_queue_dataflow,                                   \
       "locked qlock from thread %p",                                    \
       g_thread_self ());                                                \
 } G_STMT_END
@@ -93,7 +97,7 @@ struct _GstDataQueuePrivate
   } G_STMT_END
 
 #define GST_DATA_QUEUE_MUTEX_UNLOCK(q) G_STMT_START {                   \
-    GST_CAT_LOG (data_queue_dataflow,                                   \
+    GST_CAT_TRACE (data_queue_dataflow,                                 \
       "unlocking qlock from thread %p",                                 \
       g_thread_self ());                                                \
   g_mutex_unlock (&q->priv->qlock);                                     \
@@ -128,21 +132,20 @@ static guint gst_data_queue_signals[LAST_SIGNAL] = { 0 };
 }
 
 #define parent_class gst_data_queue_parent_class
-G_DEFINE_TYPE_WITH_CODE (GstDataQueue, gst_data_queue, G_TYPE_OBJECT, _do_init);
+G_DEFINE_TYPE_WITH_CODE (GstDataQueue, gst_data_queue, G_TYPE_OBJECT,
+    G_ADD_PRIVATE (GstDataQueue) _do_init);
 
 static void
 gst_data_queue_class_init (GstDataQueueClass * klass)
 {
   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
 
-  g_type_class_add_private (klass, sizeof (GstDataQueuePrivate));
-
   gobject_class->set_property = gst_data_queue_set_property;
   gobject_class->get_property = gst_data_queue_get_property;
 
   /* signals */
   /**
-   * GstDataQueue::empty:
+   * GstDataQueue::empty: (skip)
    * @queue: the queue instance
    *
    * Reports that the queue became empty (empty).
@@ -156,7 +159,7 @@ gst_data_queue_class_init (GstDataQueueClass * klass)
       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
 
   /**
-   * GstDataQueue::full:
+   * GstDataQueue::full: (skip)
    * @queue: the queue instance
    *
    * Reports that the queue became full (full).
@@ -190,9 +193,7 @@ gst_data_queue_class_init (GstDataQueueClass * klass)
 static void
 gst_data_queue_init (GstDataQueue * queue)
 {
-  queue->priv =
-      G_TYPE_INSTANCE_GET_PRIVATE (queue, GST_TYPE_DATA_QUEUE,
-      GstDataQueuePrivate);
+  queue->priv = gst_data_queue_get_instance_private (queue);
 
   queue->priv->cur_level.visible = 0;   /* no content */
   queue->priv->cur_level.bytes = 0;     /* no content */
@@ -209,20 +210,22 @@ gst_data_queue_init (GstDataQueue * queue)
 }
 
 /**
- * gst_data_queue_new:
+ * gst_data_queue_new: (skip)
  * @checkfull: the callback used to tell if the element considers the queue full
  * or not.
  * @fullcallback: the callback which will be called when the queue is considered full.
  * @emptycallback: the callback which will be called when the queue is considered empty.
- * @checkdata: a #gpointer that will be given in the @checkfull callback.
+ * @checkdata: a #gpointer that will be passed to the @checkfull, @fullcallback,
+ *   and @emptycallback callbacks.
  *
- * Creates a new #GstDataQueue. The difference with @gst_data_queue_new is that it will
- * not emit the 'full' and 'empty' signals, but instead calling directly @fullcallback
- * or @emptycallback.
+ * Creates a new #GstDataQueue. If @fullcallback or @emptycallback are supplied, then
+ * the #GstDataQueue will call the respective callback to signal full or empty condition.
+ * If the callbacks are NULL the #GstDataQueue will instead emit 'full' and 'empty'
+ * signals.
  *
  * Returns: a new #GstDataQueue.
  *
- * Since: 1.2.0
+ * Since: 1.2
  */
 GstDataQueue *
 gst_data_queue_new (GstDataQueueCheckFullFunction checkfull,
@@ -233,7 +236,7 @@ gst_data_queue_new (GstDataQueueCheckFullFunction checkfull,
 
   g_return_val_if_fail (checkfull != NULL, NULL);
 
-  ret = g_object_newv (GST_TYPE_DATA_QUEUE, 0, NULL);
+  ret = g_object_new (GST_TYPE_DATA_QUEUE, NULL);
   ret->priv->checkfull = checkfull;
   ret->priv->checkdata = checkdata;
   ret->priv->fullcallback = fullcallback;
@@ -311,14 +314,14 @@ gst_data_queue_locked_is_full (GstDataQueue * queue)
 }
 
 /**
- * gst_data_queue_flush:
+ * gst_data_queue_flush: (skip)
  * @queue: a #GstDataQueue.
  *
  * Flushes all the contents of the @queue. Any call to #gst_data_queue_push and
  * #gst_data_queue_pop will be released.
  * MT safe.
  *
- * Since: 1.2.0
+ * Since: 1.2
  */
 void
 gst_data_queue_flush (GstDataQueue * queue)
@@ -330,15 +333,15 @@ gst_data_queue_flush (GstDataQueue * queue)
 }
 
 /**
- * gst_data_queue_is_empty:
+ * gst_data_queue_is_empty: (skip)
  * @queue: a #GstDataQueue.
  *
  * Queries if there are any items in the @queue.
  * MT safe.
  *
- * Returns: #TRUE if @queue is empty.
+ * Returns: %TRUE if @queue is empty.
  *
- * Since: 1.2.0
+ * Since: 1.2
  */
 gboolean
 gst_data_queue_is_empty (GstDataQueue * queue)
@@ -353,16 +356,16 @@ gst_data_queue_is_empty (GstDataQueue * queue)
 }
 
 /**
- * gst_data_queue_is_full:
+ * gst_data_queue_is_full: (skip)
  * @queue: a #GstDataQueue.
  *
  * Queries if @queue is full. This check will be done using the
  * #GstDataQueueCheckFullFunction registered with @queue.
  * MT safe.
  *
- * Returns: #TRUE if @queue is full.
+ * Returns: %TRUE if @queue is full.
  *
- * Since: 1.2.0
+ * Since: 1.2
  */
 gboolean
 gst_data_queue_is_full (GstDataQueue * queue)
@@ -377,19 +380,19 @@ gst_data_queue_is_full (GstDataQueue * queue)
 }
 
 /**
- * gst_data_queue_set_flushing:
+ * gst_data_queue_set_flushing: (skip)
  * @queue: a #GstDataQueue.
  * @flushing: a #gboolean stating if the queue will be flushing or not.
  *
- * Sets the queue to flushing state if @flushing is #TRUE. If set to flushing
+ * Sets the queue to flushing state if @flushing is %TRUE. If set to flushing
  * state, any incoming data on the @queue will be discarded. Any call currently
  * blocking on #gst_data_queue_push or #gst_data_queue_pop will return straight
- * away with a return value of #FALSE. While the @queue is in flushing state, 
- * all calls to those two functions will return #FALSE.
+ * away with a return value of %FALSE. While the @queue is in flushing state,
+ * all calls to those two functions will return %FALSE.
  *
  * MT Safe.
  *
- * Since: 1.2.0
+ * Since: 1.2
  */
 void
 gst_data_queue_set_flushing (GstDataQueue * queue, gboolean flushing)
@@ -410,8 +413,70 @@ gst_data_queue_set_flushing (GstDataQueue * queue, gboolean flushing)
   GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
 }
 
+static void
+gst_data_queue_push_force_unlocked (GstDataQueue * queue,
+    GstDataQueueItem * item)
+{
+  GstDataQueuePrivate *priv = queue->priv;
+
+  gst_queue_array_push_tail (priv->queue, item);
+
+  if (item->visible)
+    priv->cur_level.visible++;
+  priv->cur_level.bytes += item->size;
+  priv->cur_level.time += item->duration;
+}
+
 /**
- * gst_data_queue_push:
+ * gst_data_queue_push_force: (skip)
+ * @queue: a #GstDataQueue.
+ * @item: a #GstDataQueueItem.
+ *
+ * Pushes a #GstDataQueueItem (or a structure that begins with the same fields)
+ * on the @queue. It ignores if the @queue is full or not and forces the @item
+ * to be pushed anyway.
+ * MT safe.
+ *
+ * Note that this function has slightly different semantics than gst_pad_push()
+ * and gst_pad_push_event(): this function only takes ownership of @item and
+ * the #GstMiniObject contained in @item if the push was successful. If %FALSE
+ * is returned, the caller is responsible for freeing @item and its contents.
+ *
+ * Returns: %TRUE if the @item was successfully pushed on the @queue.
+ *
+ * Since: 1.2
+ */
+gboolean
+gst_data_queue_push_force (GstDataQueue * queue, GstDataQueueItem * item)
+{
+  GstDataQueuePrivate *priv = queue->priv;
+
+  g_return_val_if_fail (GST_IS_DATA_QUEUE (queue), FALSE);
+  g_return_val_if_fail (item != NULL, FALSE);
+
+  GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);
+
+  STATUS (queue, "before pushing");
+  gst_data_queue_push_force_unlocked (queue, item);
+  STATUS (queue, "after pushing");
+  if (priv->waiting_add)
+    g_cond_signal (&priv->item_add);
+
+  GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
+
+  return TRUE;
+
+  /* ERRORS */
+flushing:
+  {
+    GST_DEBUG ("queue:%p, we are flushing", queue);
+    GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
+    return FALSE;
+  }
+}
+
+/**
+ * gst_data_queue_push: (skip)
  * @queue: a #GstDataQueue.
  * @item: a #GstDataQueueItem.
  *
@@ -422,12 +487,12 @@ gst_data_queue_set_flushing (GstDataQueue * queue, gboolean flushing)
  *
  * Note that this function has slightly different semantics than gst_pad_push()
  * and gst_pad_push_event(): this function only takes ownership of @item and
- * the #GstMiniObject contained in @item if the push was successful. If FALSE
+ * the #GstMiniObject contained in @item if the push was successful. If %FALSE
  * is returned, the caller is responsible for freeing @item and its contents.
  *
- * Returns: #TRUE if the @item was successfully pushed on the @queue.
+ * Returns: %TRUE if the @item was successfully pushed on the @queue.
  *
- * Since: 1.2.0
+ * Since: 1.2
  */
 gboolean
 gst_data_queue_push (GstDataQueue * queue, GstDataQueueItem * item)
@@ -460,12 +525,7 @@ gst_data_queue_push (GstDataQueue * queue, GstDataQueueItem * item)
     }
   }
 
-  gst_queue_array_push_tail (priv->queue, item);
-
-  if (item->visible)
-    priv->cur_level.visible++;
-  priv->cur_level.bytes += item->size;
-  priv->cur_level.time += item->duration;
+  gst_data_queue_push_force_unlocked (queue, item);
 
   STATUS (queue, "after pushing");
   if (priv->waiting_add)
@@ -484,19 +544,34 @@ flushing:
   }
 }
 
+static gboolean
+_gst_data_queue_wait_non_empty (GstDataQueue * queue)
+{
+  GstDataQueuePrivate *priv = queue->priv;
+
+  while (gst_data_queue_locked_is_empty (queue)) {
+    priv->waiting_add = TRUE;
+    g_cond_wait (&priv->item_add, &priv->qlock);
+    priv->waiting_add = FALSE;
+    if (priv->flushing)
+      return FALSE;
+  }
+  return TRUE;
+}
+
 /**
- * gst_data_queue_pop:
+ * gst_data_queue_pop: (skip)
  * @queue: a #GstDataQueue.
- * @item: pointer to store the returned #GstDataQueueItem.
+ * @item: (out): pointer to store the returned #GstDataQueueItem.
  *
  * Retrieves the first @item available on the @queue. If the queue is currently
  * empty, the call will block until at least one item is available, OR the
  * @queue is set to the flushing state.
  * MT safe.
  *
- * Returns: #TRUE if an @item was successfully retrieved from the @queue.
+ * Returns: %TRUE if an @item was successfully retrieved from the @queue.
  *
- * Since: 1.2.0
+ * Since: 1.2
  */
 gboolean
 gst_data_queue_pop (GstDataQueue * queue, GstDataQueueItem ** item)
@@ -518,13 +593,8 @@ gst_data_queue_pop (GstDataQueue * queue, GstDataQueueItem ** item)
       g_signal_emit (queue, gst_data_queue_signals[SIGNAL_EMPTY], 0);
     GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);
 
-    while (gst_data_queue_locked_is_empty (queue)) {
-      priv->waiting_add = TRUE;
-      g_cond_wait (&priv->item_add, &priv->qlock);
-      priv->waiting_add = FALSE;
-      if (priv->flushing)
-        goto flushing;
-    }
+    if (!_gst_data_queue_wait_non_empty (queue))
+      goto flushing;
   }
 
   /* Get the item from the GQueue */
@@ -556,19 +626,74 @@ flushing:
 static gint
 is_of_type (gconstpointer a, gconstpointer b)
 {
-  return !G_TYPE_CHECK_INSTANCE_TYPE (a, GPOINTER_TO_INT (b));
+  return !G_TYPE_CHECK_INSTANCE_TYPE (a, GPOINTER_TO_SIZE (b));
+}
+
+/**
+ * gst_data_queue_peek: (skip)
+ * @queue: a #GstDataQueue.
+ * @item: (out): pointer to store the returned #GstDataQueueItem.
+ *
+ * Retrieves the first @item available on the @queue without removing it.
+ * If the queue is currently empty, the call will block until at least
+ * one item is available, OR the @queue is set to the flushing state.
+ * MT safe.
+ *
+ * Returns: %TRUE if an @item was successfully retrieved from the @queue.
+ *
+ * Since: 1.2
+ */
+gboolean
+gst_data_queue_peek (GstDataQueue * queue, GstDataQueueItem ** item)
+{
+  GstDataQueuePrivate *priv = queue->priv;
+
+  g_return_val_if_fail (GST_IS_DATA_QUEUE (queue), FALSE);
+  g_return_val_if_fail (item != NULL, FALSE);
+
+  GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);
+
+  STATUS (queue, "before peeking");
+
+  if (gst_data_queue_locked_is_empty (queue)) {
+    GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
+    if (G_LIKELY (priv->emptycallback))
+      priv->emptycallback (queue, priv->checkdata);
+    else
+      g_signal_emit (queue, gst_data_queue_signals[SIGNAL_EMPTY], 0);
+    GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);
+
+    if (!_gst_data_queue_wait_non_empty (queue))
+      goto flushing;
+  }
+
+  /* Get the item from the GQueue */
+  *item = gst_queue_array_peek_head (priv->queue);
+
+  STATUS (queue, "after peeking");
+  GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
+
+  return TRUE;
+
+  /* ERRORS */
+flushing:
+  {
+    GST_DEBUG ("queue:%p, we are flushing", queue);
+    GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
+    return FALSE;
+  }
 }
 
 /**
- * gst_data_queue_drop_head:
+ * gst_data_queue_drop_head: (skip)
  * @queue: The #GstDataQueue to drop an item from.
  * @type: The #GType of the item to drop.
  *
  * Pop and unref the head-most #GstMiniObject with the given #GType.
  *
- * Returns: TRUE if an element was removed.
+ * Returns: %TRUE if an element was removed.
  *
- * Since: 1.2.0
+ * Since: 1.2
  */
 gboolean
 gst_data_queue_drop_head (GstDataQueue * queue, GType type)
@@ -583,7 +708,7 @@ gst_data_queue_drop_head (GstDataQueue * queue, GType type)
   GST_DEBUG ("queue:%p", queue);
 
   GST_DATA_QUEUE_MUTEX_LOCK (queue);
-  idx = gst_queue_array_find (priv->queue, is_of_type, GINT_TO_POINTER (type));
+  idx = gst_queue_array_find (priv->queue, is_of_type, GSIZE_TO_POINTER (type));
 
   if (idx == -1)
     goto done;
@@ -608,13 +733,13 @@ done:
 }
 
 /**
- * gst_data_queue_limits_changed:
- * @queue: The #GstDataQueue 
+ * gst_data_queue_limits_changed: (skip)
+ * @queue: The #GstDataQueue
  *
  * Inform the queue that the limits for the fullness check have changed and that
- * any blocking gst_data_queue_push() should be unblocked to recheck the limts.
+ * any blocking gst_data_queue_push() should be unblocked to recheck the limits.
  *
- * Since: 1.2.0
+ * Since: 1.2
  */
 void
 gst_data_queue_limits_changed (GstDataQueue * queue)
@@ -632,13 +757,13 @@ gst_data_queue_limits_changed (GstDataQueue * queue)
 }
 
 /**
- * gst_data_queue_get_level:
+ * gst_data_queue_get_level: (skip)
  * @queue: The #GstDataQueue
- * @level: the location to store the result
+ * @level: (out): the location to store the result
  *
  * Get the current level of the queue.
  *
- * Since: 1.2.0
+ * Since: 1.2
  */
 void
 gst_data_queue_get_level (GstDataQueue * queue, GstDataQueueSize * level)