aggregator: allow src GstAggregatorPads
[platform/upstream/gstreamer.git] / libs / gst / base / gstaggregator.c
index 9dddf83..89778a3 100644 (file)
@@ -22,8 +22,8 @@
 /**
  * SECTION: gstaggregator
  * @title: GstAggregator
- * @short_description: manages a set of pads with the purpose of
- * aggregating their buffers.
+ * @short_description: Base class for mixers and muxers, manages a set of input
+ *     pads and aggregates their streams
  * @see_also: gstcollectpads for historical reasons.
  *
  * Manages a set of pads with the purpose of aggregating their buffers.
@@ -41,9 +41,9 @@
  *  * When data is queued on all pads, the aggregate vmethod is called.
  *
  *  * One can peek at the data on any given GstAggregatorPad with the
- *    gst_aggregator_pad_get_buffer () method, and take ownership of it
- *    with the gst_aggregator_pad_steal_buffer () method. When a buffer
- *    has been taken with steal_buffer (), a new buffer can be queued
+ *    gst_aggregator_pad_peek_buffer () method, and remove it from the pad
+ *    with the gst_aggregator_pad_pop_buffer () method. When a buffer
+ *    has been taken with pop_buffer (), a new buffer can be queued
  *    on that pad.
  *
  *  * If the subclass wishes to push a buffer downstream in its aggregate
  *    flag these buffers with GST_BUFFER_FLAG_GAP and GST_BUFFER_FLAG_DROPPABLE
  *    to ease their identification and subsequent processing.
  *
+ * This class used to live in gst-plugins-bad and was moved to core.
+ *
+ * Since: 1.14
+ */
+
+/**
+ * SECTION: gstaggregatorpad
+ * @title: GstAggregatorPad
+ * @short_description: #GstPad subclass for pads managed by #GstAggregator
+ * @see_also: gstcollectpads for historical reasons.
+ *
+ * Pads managed by a #GstAggregor subclass.
+ *
+ * This class used to live in gst-plugins-bad and was moved to core.
+ *
+ * Since: 1.14
  */
 
 #ifdef HAVE_CONFIG_H
@@ -100,14 +116,18 @@ gst_aggregator_start_time_selection_get_type (void)
 }
 
 /*  Might become API */
+#if 0
 static void gst_aggregator_merge_tags (GstAggregator * aggregator,
     const GstTagList * tags, GstTagMergeMode mode);
+#endif
 static void gst_aggregator_set_latency_property (GstAggregator * agg,
     GstClockTime latency);
 static GstClockTime gst_aggregator_get_latency_property (GstAggregator * agg);
 
 static GstClockTime gst_aggregator_get_latency_unlocked (GstAggregator * self);
 
+static void gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad);
+
 GST_DEBUG_CATEGORY_STATIC (aggregator_debug);
 #define GST_CAT_DEFAULT aggregator_debug
 
@@ -366,7 +386,7 @@ gst_aggregator_pad_queue_is_empty (GstAggregatorPad * pad)
 static gboolean
 gst_aggregator_check_pads_ready (GstAggregator * self)
 {
-  GstAggregatorPad *pad;
+  GstAggregatorPad *pad = NULL;
   GList *l, *sinkpads;
   gboolean have_buffer = TRUE;
   gboolean have_event_or_query = FALSE;
@@ -521,17 +541,8 @@ gst_aggregator_set_src_caps (GstAggregator * self, GstCaps * caps)
   GST_PAD_STREAM_UNLOCK (self->srcpad);
 }
 
-/**
- * gst_aggregator_finish_buffer:
- * @self: The #GstAggregator
- * @buffer: (transfer full): the #GstBuffer to push.
- *
- * This method will push the provided output buffer downstream. If needed,
- * mandatory events such as stream-start, caps, and segment events will be
- * sent before pushing the buffer.
- */
-GstFlowReturn
-gst_aggregator_finish_buffer (GstAggregator * self, GstBuffer * buffer)
+static GstFlowReturn
+gst_aggregator_default_finish_buffer (GstAggregator * self, GstBuffer * buffer)
 {
   gst_aggregator_push_mandatory_events (self);
 
@@ -549,6 +560,25 @@ gst_aggregator_finish_buffer (GstAggregator * self, GstBuffer * buffer)
   }
 }
 
+/**
+ * gst_aggregator_finish_buffer:
+ * @aggregator: The #GstAggregator
+ * @buffer: (transfer full): the #GstBuffer to push.
+ *
+ * This method will push the provided output buffer downstream. If needed,
+ * mandatory events such as stream-start, caps, and segment events will be
+ * sent before pushing the buffer.
+ */
+GstFlowReturn
+gst_aggregator_finish_buffer (GstAggregator * aggregator, GstBuffer * buffer)
+{
+  GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (aggregator);
+
+  g_assert (klass->finish_buffer != NULL);
+
+  return klass->finish_buffer (aggregator, buffer);
+}
+
 static void
 gst_aggregator_push_eos (GstAggregator * self)
 {
@@ -746,6 +776,42 @@ gst_aggregator_do_events_and_queries (GstElement * self, GstPad * epad,
   return TRUE;
 }
 
+static gboolean
+gst_aggregator_pad_skip_buffers (GstElement * self, GstPad * epad,
+    gpointer user_data)
+{
+  GList *item;
+  GstAggregatorPad *aggpad = (GstAggregatorPad *) epad;
+  GstAggregator *agg = (GstAggregator *) self;
+  GstAggregatorPadClass *klass = GST_AGGREGATOR_PAD_GET_CLASS (aggpad);
+
+  if (!klass->skip_buffer)
+    return FALSE;
+
+  PAD_LOCK (aggpad);
+
+  item = g_queue_peek_head_link (&aggpad->priv->data);
+  while (item) {
+    GList *next = item->next;
+
+    if (GST_IS_BUFFER (item->data)
+        && klass->skip_buffer (aggpad, agg, item->data)) {
+      GST_LOG_OBJECT (aggpad, "Skipping %" GST_PTR_FORMAT, item->data);
+      gst_aggregator_pad_buffer_consumed (aggpad);
+      gst_buffer_unref (item->data);
+      g_queue_delete_link (&aggpad->priv->data, item);
+    } else {
+      break;
+    }
+
+    item = next;
+  }
+
+  PAD_UNLOCK (aggpad);
+
+  return TRUE;
+}
+
 static void
 gst_aggregator_pad_set_flushing (GstAggregatorPad * aggpad,
     GstFlowReturn flow_return, gboolean full)
@@ -1028,6 +1094,10 @@ gst_aggregator_aggregate_func (GstAggregator * self)
     gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self),
         gst_aggregator_do_events_and_queries, NULL);
 
+    if (self->priv->peer_latency_live)
+      gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self),
+          gst_aggregator_pad_skip_buffers, NULL);
+
     /* Ensure we have buffers ready (either in clipped_buffer or at the head of
      * the queue */
     if (!gst_aggregator_wait_and_check (self, &timeout))
@@ -1412,19 +1482,7 @@ gst_aggregator_default_sink_event (GstAggregator * self,
       goto eat;
     }
     case GST_EVENT_TAG:
-    {
-      GstTagList *tags;
-
-      gst_event_parse_tag (event, &tags);
-
-      if (gst_tag_list_get_scope (tags) == GST_TAG_SCOPE_STREAM) {
-        gst_aggregator_merge_tags (self, tags, GST_TAG_MERGE_REPLACE);
-        gst_event_unref (event);
-        event = NULL;
-        goto eat;
-      }
-      break;
-    }
+      goto eat;
     default:
     {
       break;
@@ -2227,6 +2285,8 @@ gst_aggregator_class_init (GstAggregatorClass * klass)
   GST_DEBUG_CATEGORY_INIT (aggregator_debug, "aggregator",
       GST_DEBUG_FG_MAGENTA, "GstAggregator");
 
+  klass->finish_buffer = gst_aggregator_default_finish_buffer;
+
   klass->sink_event = gst_aggregator_default_sink_event;
   klass->sink_query = gst_aggregator_default_sink_query;
 
@@ -2269,8 +2329,6 @@ gst_aggregator_class_init (GstAggregatorClass * klass)
           "Start time to use if start-time-selection=set", 0,
           G_MAXUINT64,
           DEFAULT_START_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
-
-  GST_DEBUG_REGISTER_FUNCPTR (gst_aggregator_do_events_and_queries);
 }
 
 static void
@@ -2411,8 +2469,6 @@ gst_aggregator_pad_chain_internal (GstAggregator * self,
   GstFlowReturn flow_return;
   GstClockTime buf_pts;
 
-  GST_DEBUG_OBJECT (aggpad, "Start chaining a buffer %" GST_PTR_FORMAT, buffer);
-
   PAD_LOCK (aggpad);
   flow_return = aggpad->priv->flow_return;
   if (flow_return != GST_FLOW_OK)
@@ -2683,14 +2739,16 @@ gst_aggregator_pad_constructed (GObject * object)
 {
   GstPad *pad = GST_PAD (object);
 
-  gst_pad_set_chain_function (pad,
-      GST_DEBUG_FUNCPTR (gst_aggregator_pad_chain));
-  gst_pad_set_event_full_function_full (pad,
-      GST_DEBUG_FUNCPTR (gst_aggregator_pad_event_func), NULL, NULL);
-  gst_pad_set_query_function (pad,
-      GST_DEBUG_FUNCPTR (gst_aggregator_pad_query_func));
-  gst_pad_set_activatemode_function (pad,
-      GST_DEBUG_FUNCPTR (gst_aggregator_pad_activate_mode_func));
+  if (GST_PAD_IS_SINK (pad)) {
+    gst_pad_set_chain_function (pad,
+        GST_DEBUG_FUNCPTR (gst_aggregator_pad_chain));
+    gst_pad_set_event_full_function_full (pad,
+        GST_DEBUG_FUNCPTR (gst_aggregator_pad_event_func), NULL, NULL);
+    gst_pad_set_query_function (pad,
+        GST_DEBUG_FUNCPTR (gst_aggregator_pad_query_func));
+    gst_pad_set_activatemode_function (pad,
+        GST_DEBUG_FUNCPTR (gst_aggregator_pad_activate_mode_func));
+  }
 }
 
 static void
@@ -2799,7 +2857,7 @@ gst_aggregator_pad_clip_buffer_unlocked (GstAggregatorPad * pad)
 }
 
 /**
- * gst_aggregator_pad_steal_buffer:
+ * gst_aggregator_pad_pop_buffer:
  * @pad: the pad to get buffer from
  *
  * Steal the ref to the buffer currently queued in @pad.
@@ -2808,7 +2866,7 @@ gst_aggregator_pad_clip_buffer_unlocked (GstAggregatorPad * pad)
  *   queued. You should unref the buffer after usage.
  */
 GstBuffer *
-gst_aggregator_pad_steal_buffer (GstAggregatorPad * pad)
+gst_aggregator_pad_pop_buffer (GstAggregatorPad * pad)
 {
   GstBuffer *buffer;
 
@@ -2842,7 +2900,7 @@ gst_aggregator_pad_drop_buffer (GstAggregatorPad * pad)
 {
   GstBuffer *buf;
 
-  buf = gst_aggregator_pad_steal_buffer (pad);
+  buf = gst_aggregator_pad_pop_buffer (pad);
 
   if (buf == NULL)
     return FALSE;
@@ -2852,7 +2910,7 @@ gst_aggregator_pad_drop_buffer (GstAggregatorPad * pad)
 }
 
 /**
- * gst_aggregator_pad_get_buffer:
+ * gst_aggregator_pad_peek_buffer:
  * @pad: the pad to get buffer from
  *
  * Returns: (transfer full): A reference to the buffer in @pad or
@@ -2860,7 +2918,7 @@ gst_aggregator_pad_drop_buffer (GstAggregatorPad * pad)
  * usage.
  */
 GstBuffer *
-gst_aggregator_pad_get_buffer (GstAggregatorPad * pad)
+gst_aggregator_pad_peek_buffer (GstAggregatorPad * pad)
 {
   GstBuffer *buffer;
 
@@ -2896,7 +2954,8 @@ gst_aggregator_pad_is_eos (GstAggregatorPad * pad)
   return is_eos;
 }
 
-/**
+#if 0
+/*
  * gst_aggregator_merge_tags:
  * @self: a #GstAggregator
  * @tags: a #GstTagList to merge
@@ -2930,6 +2989,7 @@ gst_aggregator_merge_tags (GstAggregator * self,
   self->priv->tags_changed = TRUE;
   GST_OBJECT_UNLOCK (self);
 }
+#endif
 
 /**
  * gst_aggregator_set_latency: