aggregator: delegate buffer skipping to the aggregate thread
[platform/upstream/gstreamer.git] / libs / gst / base / gstaggregator.c
index 33bdc39..9d6bb7a 100644 (file)
@@ -22,8 +22,8 @@
 /**
  * SECTION: gstaggregator
  * @title: GstAggregator
- * @short_description: manages a set of pads with the purpose of
- * aggregating their buffers.
+ * @short_description: Base class for mixers and muxers, manages a set of input
+ *     pads and aggregates their streams
  * @see_also: gstcollectpads for historical reasons.
  *
  * Manages a set of pads with the purpose of aggregating their buffers.
@@ -41,9 +41,9 @@
  *  * When data is queued on all pads, the aggregate vmethod is called.
  *
  *  * One can peek at the data on any given GstAggregatorPad with the
- *    gst_aggregator_pad_get_buffer () method, and take ownership of it
- *    with the gst_aggregator_pad_steal_buffer () method. When a buffer
- *    has been taken with steal_buffer (), a new buffer can be queued
+ *    gst_aggregator_pad_peek_buffer () method, and remove it from the pad
+ *    with the gst_aggregator_pad_pop_buffer () method. When a buffer
+ *    has been taken with pop_buffer (), a new buffer can be queued
  *    on that pad.
  *
  *  * If the subclass wishes to push a buffer downstream in its aggregate
  *    flag these buffers with GST_BUFFER_FLAG_GAP and GST_BUFFER_FLAG_DROPPABLE
  *    to ease their identification and subsequent processing.
  *
+ * This class used to live in gst-plugins-bad and was moved to core.
+ *
+ * Since: 1.14
+ */
+
+/**
+ * SECTION: gstaggregatorpad
+ * @title: GstAggregatorPad
+ * @short_description: #GstPad subclass for pads managed by #GstAggregator
+ * @see_also: gstcollectpads for historical reasons.
+ *
+ * Pads managed by a #GstAggregor subclass.
+ *
+ * This class used to live in gst-plugins-bad and was moved to core.
+ *
+ * Since: 1.14
  */
 
 #ifdef HAVE_CONFIG_H
@@ -100,14 +116,18 @@ gst_aggregator_start_time_selection_get_type (void)
 }
 
 /*  Might become API */
+#if 0
 static void gst_aggregator_merge_tags (GstAggregator * aggregator,
     const GstTagList * tags, GstTagMergeMode mode);
+#endif
 static void gst_aggregator_set_latency_property (GstAggregator * agg,
     GstClockTime latency);
 static GstClockTime gst_aggregator_get_latency_property (GstAggregator * agg);
 
 static GstClockTime gst_aggregator_get_latency_unlocked (GstAggregator * self);
 
+static void gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad);
+
 GST_DEBUG_CATEGORY_STATIC (aggregator_debug);
 #define GST_CAT_DEFAULT aggregator_debug
 
@@ -356,87 +376,6 @@ enum
 static GstFlowReturn gst_aggregator_pad_chain_internal (GstAggregator * self,
     GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head);
 
-/**
- * gst_aggregator_iterate_sinkpads:
- * @self: The #GstAggregator
- * @func: (scope call): The function to call.
- * @user_data: (closure): The data to pass to @func.
- *
- * Iterate the sinkpads of aggregator to call a function on them.
- *
- * This method guarantees that @func will be called only once for each
- * sink pad.
- *
- * Returns: %FALSE if there are no sinkpads or if @func returned %FALSE
- */
-gboolean
-gst_aggregator_iterate_sinkpads (GstAggregator * self,
-    GstAggregatorPadForeachFunc func, gpointer user_data)
-{
-  gboolean result = FALSE;
-  GstIterator *iter;
-  gboolean done = FALSE;
-  GValue item = { 0, };
-  GList *seen_pads = NULL;
-
-  iter = gst_element_iterate_sink_pads (GST_ELEMENT (self));
-
-  if (!iter)
-    goto no_iter;
-
-  while (!done) {
-    switch (gst_iterator_next (iter, &item)) {
-      case GST_ITERATOR_OK:
-      {
-        GstAggregatorPad *pad;
-
-        pad = g_value_get_object (&item);
-
-        /* if already pushed, skip. FIXME, find something faster to tag pads */
-        if (pad == NULL || g_list_find (seen_pads, pad)) {
-          g_value_reset (&item);
-          break;
-        }
-
-        GST_LOG_OBJECT (pad, "calling function %s on pad",
-            GST_DEBUG_FUNCPTR_NAME (func));
-
-        result = func (self, pad, user_data);
-
-        done = !result;
-
-        seen_pads = g_list_prepend (seen_pads, pad);
-
-        g_value_reset (&item);
-        break;
-      }
-      case GST_ITERATOR_RESYNC:
-        gst_iterator_resync (iter);
-        break;
-      case GST_ITERATOR_ERROR:
-        GST_ERROR_OBJECT (self,
-            "Could not iterate over internally linked pads");
-        done = TRUE;
-        break;
-      case GST_ITERATOR_DONE:
-        done = TRUE;
-        break;
-    }
-  }
-  g_value_unset (&item);
-  gst_iterator_free (iter);
-
-  if (seen_pads == NULL) {
-    GST_DEBUG_OBJECT (self, "No pad seen");
-    return FALSE;
-  }
-
-  g_list_free (seen_pads);
-
-no_iter:
-  return result;
-}
-
 static gboolean
 gst_aggregator_pad_queue_is_empty (GstAggregatorPad * pad)
 {
@@ -447,7 +386,7 @@ gst_aggregator_pad_queue_is_empty (GstAggregatorPad * pad)
 static gboolean
 gst_aggregator_check_pads_ready (GstAggregator * self)
 {
-  GstAggregatorPad *pad;
+  GstAggregatorPad *pad = NULL;
   GList *l, *sinkpads;
   gboolean have_buffer = TRUE;
   gboolean have_event_or_query = FALSE;
@@ -602,17 +541,8 @@ gst_aggregator_set_src_caps (GstAggregator * self, GstCaps * caps)
   GST_PAD_STREAM_UNLOCK (self->srcpad);
 }
 
-/**
- * gst_aggregator_finish_buffer:
- * @self: The #GstAggregator
- * @buffer: (transfer full): the #GstBuffer to push.
- *
- * This method will push the provided output buffer downstream. If needed,
- * mandatory events such as stream-start, caps, and segment events will be
- * sent before pushing the buffer.
- */
-GstFlowReturn
-gst_aggregator_finish_buffer (GstAggregator * self, GstBuffer * buffer)
+static GstFlowReturn
+gst_aggregator_default_finish_buffer (GstAggregator * self, GstBuffer * buffer)
 {
   gst_aggregator_push_mandatory_events (self);
 
@@ -630,6 +560,25 @@ gst_aggregator_finish_buffer (GstAggregator * self, GstBuffer * buffer)
   }
 }
 
+/**
+ * gst_aggregator_finish_buffer:
+ * @aggregator: The #GstAggregator
+ * @buffer: (transfer full): the #GstBuffer to push.
+ *
+ * This method will push the provided output buffer downstream. If needed,
+ * mandatory events such as stream-start, caps, and segment events will be
+ * sent before pushing the buffer.
+ */
+GstFlowReturn
+gst_aggregator_finish_buffer (GstAggregator * aggregator, GstBuffer * buffer)
+{
+  GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (aggregator);
+
+  g_assert (klass->finish_buffer != NULL);
+
+  return klass->finish_buffer (aggregator, buffer);
+}
+
 static void
 gst_aggregator_push_eos (GstAggregator * self)
 {
@@ -827,6 +776,42 @@ gst_aggregator_do_events_and_queries (GstElement * self, GstPad * epad,
   return TRUE;
 }
 
+static gboolean
+gst_aggregator_pad_skip_buffers (GstElement * self, GstPad * epad,
+    gpointer user_data)
+{
+  GList *item;
+  GstAggregatorPad *aggpad = (GstAggregatorPad *) epad;
+  GstAggregator *agg = (GstAggregator *) self;
+  GstAggregatorPadClass *klass = GST_AGGREGATOR_PAD_GET_CLASS (aggpad);
+
+  if (!klass->skip_buffer)
+    return FALSE;
+
+  PAD_LOCK (aggpad);
+
+  item = g_queue_peek_head_link (&aggpad->priv->data);
+  while (item) {
+    GList *next = item->next;
+
+    if (GST_IS_BUFFER (item->data)
+        && klass->skip_buffer (aggpad, agg, item->data)) {
+      GST_LOG_OBJECT (aggpad, "Skipping %" GST_PTR_FORMAT, item->data);
+      gst_aggregator_pad_buffer_consumed (aggpad);
+      gst_buffer_unref (item->data);
+      g_queue_delete_link (&aggpad->priv->data, item);
+    } else {
+      break;
+    }
+
+    item = next;
+  }
+
+  PAD_UNLOCK (aggpad);
+
+  return TRUE;
+}
+
 static void
 gst_aggregator_pad_set_flushing (GstAggregatorPad * aggpad,
     GstFlowReturn flow_return, gboolean full)
@@ -1109,6 +1094,10 @@ gst_aggregator_aggregate_func (GstAggregator * self)
     gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self),
         gst_aggregator_do_events_and_queries, NULL);
 
+    if (self->priv->peer_latency_live)
+      gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self),
+          gst_aggregator_pad_skip_buffers, NULL);
+
     /* Ensure we have buffers ready (either in clipped_buffer or at the head of
      * the queue */
     if (!gst_aggregator_wait_and_check (self, &timeout))
@@ -1493,19 +1482,7 @@ gst_aggregator_default_sink_event (GstAggregator * self,
       goto eat;
     }
     case GST_EVENT_TAG:
-    {
-      GstTagList *tags;
-
-      gst_event_parse_tag (event, &tags);
-
-      if (gst_tag_list_get_scope (tags) == GST_TAG_SCOPE_STREAM) {
-        gst_aggregator_merge_tags (self, tags, GST_TAG_MERGE_REPLACE);
-        gst_event_unref (event);
-        event = NULL;
-        goto eat;
-      }
-      break;
-    }
+      goto eat;
     default:
     {
       break;
@@ -1645,6 +1622,9 @@ gst_aggregator_default_create_new_pad (GstAggregator * self,
   GstAggregatorPrivate *priv = self->priv;
   gint serial = 0;
   gchar *name = NULL;
+  GType pad_type =
+      GST_PAD_TEMPLATE_GTYPE (templ) ==
+      G_TYPE_NONE ? GST_TYPE_AGGREGATOR_PAD : GST_PAD_TEMPLATE_GTYPE (templ);
 
   if (templ->direction != GST_PAD_SINK)
     goto not_sink;
@@ -1665,7 +1645,7 @@ gst_aggregator_default_create_new_pad (GstAggregator * self,
   }
 
   name = g_strdup_printf ("sink_%u", serial);
-  agg_pad = g_object_new (GST_AGGREGATOR_GET_CLASS (self)->sinkpads_type,
+  agg_pad = g_object_new (pad_type,
       "name", name, "direction", GST_PAD_SINK, "template", templ, NULL);
   g_free (name);
 
@@ -2305,7 +2285,7 @@ gst_aggregator_class_init (GstAggregatorClass * klass)
   GST_DEBUG_CATEGORY_INIT (aggregator_debug, "aggregator",
       GST_DEBUG_FG_MAGENTA, "GstAggregator");
 
-  klass->sinkpads_type = GST_TYPE_AGGREGATOR_PAD;
+  klass->finish_buffer = gst_aggregator_default_finish_buffer;
 
   klass->sink_event = gst_aggregator_default_sink_event;
   klass->sink_query = gst_aggregator_default_sink_query;
@@ -2349,8 +2329,6 @@ gst_aggregator_class_init (GstAggregatorClass * klass)
           "Start time to use if start-time-selection=set", 0,
           G_MAXUINT64,
           DEFAULT_START_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
-
-  GST_DEBUG_REGISTER_FUNCPTR (gst_aggregator_do_events_and_queries);
 }
 
 static void
@@ -2491,8 +2469,6 @@ gst_aggregator_pad_chain_internal (GstAggregator * self,
   GstFlowReturn flow_return;
   GstClockTime buf_pts;
 
-  GST_DEBUG_OBJECT (aggpad, "Start chaining a buffer %" GST_PTR_FORMAT, buffer);
-
   PAD_LOCK (aggpad);
   flow_return = aggpad->priv->flow_return;
   if (flow_return != GST_FLOW_OK)
@@ -2879,7 +2855,7 @@ gst_aggregator_pad_clip_buffer_unlocked (GstAggregatorPad * pad)
 }
 
 /**
- * gst_aggregator_pad_steal_buffer:
+ * gst_aggregator_pad_pop_buffer:
  * @pad: the pad to get buffer from
  *
  * Steal the ref to the buffer currently queued in @pad.
@@ -2888,7 +2864,7 @@ gst_aggregator_pad_clip_buffer_unlocked (GstAggregatorPad * pad)
  *   queued. You should unref the buffer after usage.
  */
 GstBuffer *
-gst_aggregator_pad_steal_buffer (GstAggregatorPad * pad)
+gst_aggregator_pad_pop_buffer (GstAggregatorPad * pad)
 {
   GstBuffer *buffer;
 
@@ -2922,7 +2898,7 @@ gst_aggregator_pad_drop_buffer (GstAggregatorPad * pad)
 {
   GstBuffer *buf;
 
-  buf = gst_aggregator_pad_steal_buffer (pad);
+  buf = gst_aggregator_pad_pop_buffer (pad);
 
   if (buf == NULL)
     return FALSE;
@@ -2932,7 +2908,7 @@ gst_aggregator_pad_drop_buffer (GstAggregatorPad * pad)
 }
 
 /**
- * gst_aggregator_pad_get_buffer:
+ * gst_aggregator_pad_peek_buffer:
  * @pad: the pad to get buffer from
  *
  * Returns: (transfer full): A reference to the buffer in @pad or
@@ -2940,7 +2916,7 @@ gst_aggregator_pad_drop_buffer (GstAggregatorPad * pad)
  * usage.
  */
 GstBuffer *
-gst_aggregator_pad_get_buffer (GstAggregatorPad * pad)
+gst_aggregator_pad_peek_buffer (GstAggregatorPad * pad)
 {
   GstBuffer *buffer;
 
@@ -2958,6 +2934,12 @@ gst_aggregator_pad_get_buffer (GstAggregatorPad * pad)
   return buffer;
 }
 
+/**
+ * gst_aggregator_pad_is_eos:
+ * @pad: an aggregator pad
+ *
+ * Returns: %TRUE if the pad is EOS, otherwise %FALSE.
+ */
 gboolean
 gst_aggregator_pad_is_eos (GstAggregatorPad * pad)
 {
@@ -2970,7 +2952,8 @@ gst_aggregator_pad_is_eos (GstAggregatorPad * pad)
   return is_eos;
 }
 
-/**
+#if 0
+/*
  * gst_aggregator_merge_tags:
  * @self: a #GstAggregator
  * @tags: a #GstTagList to merge
@@ -3004,6 +2987,7 @@ gst_aggregator_merge_tags (GstAggregator * self,
   self->priv->tags_changed = TRUE;
   GST_OBJECT_UNLOCK (self);
 }
+#endif
 
 /**
  * gst_aggregator_set_latency: