aggregator: allow src GstAggregatorPads
[platform/upstream/gstreamer.git] / libs / gst / base / gstaggregator.c
index 06c7cbb..89778a3 100644 (file)
@@ -41,9 +41,9 @@
  *  * When data is queued on all pads, the aggregate vmethod is called.
  *
  *  * One can peek at the data on any given GstAggregatorPad with the
- *    gst_aggregator_pad_get_buffer () method, and take ownership of it
- *    with the gst_aggregator_pad_steal_buffer () method. When a buffer
- *    has been taken with steal_buffer (), a new buffer can be queued
+ *    gst_aggregator_pad_peek_buffer () method, and remove it from the pad
+ *    with the gst_aggregator_pad_pop_buffer () method. When a buffer
+ *    has been taken with pop_buffer (), a new buffer can be queued
  *    on that pad.
  *
  *  * If the subclass wishes to push a buffer downstream in its aggregate
@@ -126,6 +126,8 @@ static GstClockTime gst_aggregator_get_latency_property (GstAggregator * agg);
 
 static GstClockTime gst_aggregator_get_latency_unlocked (GstAggregator * self);
 
+static void gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad);
+
 GST_DEBUG_CATEGORY_STATIC (aggregator_debug);
 #define GST_CAT_DEFAULT aggregator_debug
 
@@ -384,7 +386,7 @@ gst_aggregator_pad_queue_is_empty (GstAggregatorPad * pad)
 static gboolean
 gst_aggregator_check_pads_ready (GstAggregator * self)
 {
-  GstAggregatorPad *pad;
+  GstAggregatorPad *pad = NULL;
   GList *l, *sinkpads;
   gboolean have_buffer = TRUE;
   gboolean have_event_or_query = FALSE;
@@ -774,6 +776,42 @@ gst_aggregator_do_events_and_queries (GstElement * self, GstPad * epad,
   return TRUE;
 }
 
+static gboolean
+gst_aggregator_pad_skip_buffers (GstElement * self, GstPad * epad,
+    gpointer user_data)
+{
+  GList *item;
+  GstAggregatorPad *aggpad = (GstAggregatorPad *) epad;
+  GstAggregator *agg = (GstAggregator *) self;
+  GstAggregatorPadClass *klass = GST_AGGREGATOR_PAD_GET_CLASS (aggpad);
+
+  if (!klass->skip_buffer)
+    return FALSE;
+
+  PAD_LOCK (aggpad);
+
+  item = g_queue_peek_head_link (&aggpad->priv->data);
+  while (item) {
+    GList *next = item->next;
+
+    if (GST_IS_BUFFER (item->data)
+        && klass->skip_buffer (aggpad, agg, item->data)) {
+      GST_LOG_OBJECT (aggpad, "Skipping %" GST_PTR_FORMAT, item->data);
+      gst_aggregator_pad_buffer_consumed (aggpad);
+      gst_buffer_unref (item->data);
+      g_queue_delete_link (&aggpad->priv->data, item);
+    } else {
+      break;
+    }
+
+    item = next;
+  }
+
+  PAD_UNLOCK (aggpad);
+
+  return TRUE;
+}
+
 static void
 gst_aggregator_pad_set_flushing (GstAggregatorPad * aggpad,
     GstFlowReturn flow_return, gboolean full)
@@ -1056,6 +1094,10 @@ gst_aggregator_aggregate_func (GstAggregator * self)
     gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self),
         gst_aggregator_do_events_and_queries, NULL);
 
+    if (self->priv->peer_latency_live)
+      gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self),
+          gst_aggregator_pad_skip_buffers, NULL);
+
     /* Ensure we have buffers ready (either in clipped_buffer or at the head of
      * the queue */
     if (!gst_aggregator_wait_and_check (self, &timeout))
@@ -2287,8 +2329,6 @@ gst_aggregator_class_init (GstAggregatorClass * klass)
           "Start time to use if start-time-selection=set", 0,
           G_MAXUINT64,
           DEFAULT_START_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
-
-  GST_DEBUG_REGISTER_FUNCPTR (gst_aggregator_do_events_and_queries);
 }
 
 static void
@@ -2429,8 +2469,6 @@ gst_aggregator_pad_chain_internal (GstAggregator * self,
   GstFlowReturn flow_return;
   GstClockTime buf_pts;
 
-  GST_DEBUG_OBJECT (aggpad, "Start chaining a buffer %" GST_PTR_FORMAT, buffer);
-
   PAD_LOCK (aggpad);
   flow_return = aggpad->priv->flow_return;
   if (flow_return != GST_FLOW_OK)
@@ -2701,14 +2739,16 @@ gst_aggregator_pad_constructed (GObject * object)
 {
   GstPad *pad = GST_PAD (object);
 
-  gst_pad_set_chain_function (pad,
-      GST_DEBUG_FUNCPTR (gst_aggregator_pad_chain));
-  gst_pad_set_event_full_function_full (pad,
-      GST_DEBUG_FUNCPTR (gst_aggregator_pad_event_func), NULL, NULL);
-  gst_pad_set_query_function (pad,
-      GST_DEBUG_FUNCPTR (gst_aggregator_pad_query_func));
-  gst_pad_set_activatemode_function (pad,
-      GST_DEBUG_FUNCPTR (gst_aggregator_pad_activate_mode_func));
+  if (GST_PAD_IS_SINK (pad)) {
+    gst_pad_set_chain_function (pad,
+        GST_DEBUG_FUNCPTR (gst_aggregator_pad_chain));
+    gst_pad_set_event_full_function_full (pad,
+        GST_DEBUG_FUNCPTR (gst_aggregator_pad_event_func), NULL, NULL);
+    gst_pad_set_query_function (pad,
+        GST_DEBUG_FUNCPTR (gst_aggregator_pad_query_func));
+    gst_pad_set_activatemode_function (pad,
+        GST_DEBUG_FUNCPTR (gst_aggregator_pad_activate_mode_func));
+  }
 }
 
 static void
@@ -2817,7 +2857,7 @@ gst_aggregator_pad_clip_buffer_unlocked (GstAggregatorPad * pad)
 }
 
 /**
- * gst_aggregator_pad_steal_buffer:
+ * gst_aggregator_pad_pop_buffer:
  * @pad: the pad to get buffer from
  *
  * Steal the ref to the buffer currently queued in @pad.
@@ -2826,7 +2866,7 @@ gst_aggregator_pad_clip_buffer_unlocked (GstAggregatorPad * pad)
  *   queued. You should unref the buffer after usage.
  */
 GstBuffer *
-gst_aggregator_pad_steal_buffer (GstAggregatorPad * pad)
+gst_aggregator_pad_pop_buffer (GstAggregatorPad * pad)
 {
   GstBuffer *buffer;
 
@@ -2860,7 +2900,7 @@ gst_aggregator_pad_drop_buffer (GstAggregatorPad * pad)
 {
   GstBuffer *buf;
 
-  buf = gst_aggregator_pad_steal_buffer (pad);
+  buf = gst_aggregator_pad_pop_buffer (pad);
 
   if (buf == NULL)
     return FALSE;
@@ -2870,7 +2910,7 @@ gst_aggregator_pad_drop_buffer (GstAggregatorPad * pad)
 }
 
 /**
- * gst_aggregator_pad_get_buffer:
+ * gst_aggregator_pad_peek_buffer:
  * @pad: the pad to get buffer from
  *
  * Returns: (transfer full): A reference to the buffer in @pad or
@@ -2878,7 +2918,7 @@ gst_aggregator_pad_drop_buffer (GstAggregatorPad * pad)
  * usage.
  */
 GstBuffer *
-gst_aggregator_pad_get_buffer (GstAggregatorPad * pad)
+gst_aggregator_pad_peek_buffer (GstAggregatorPad * pad)
 {
   GstBuffer *buffer;