aggregator: delegate buffer skipping to the aggregate thread
authorMathieu Duponchelle <mathieu@centricular.com>
Tue, 23 Jan 2018 21:49:52 +0000 (22:49 +0100)
committerMathieu Duponchelle <mathieu@centricular.com>
Tue, 23 Jan 2018 21:49:52 +0000 (22:49 +0100)
As we do that for serialized events as well, and the subclass will
most likely need to access pad->segment to make its decisions,
doing that from the sinkpad's streaming threads was racy.

libs/gst/base/gstaggregator.c

index 794657a..9d6bb7a 100644 (file)
@@ -126,6 +126,8 @@ static GstClockTime gst_aggregator_get_latency_property (GstAggregator * agg);
 
 static GstClockTime gst_aggregator_get_latency_unlocked (GstAggregator * self);
 
+static void gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad);
+
 GST_DEBUG_CATEGORY_STATIC (aggregator_debug);
 #define GST_CAT_DEFAULT aggregator_debug
 
@@ -774,6 +776,42 @@ gst_aggregator_do_events_and_queries (GstElement * self, GstPad * epad,
   return TRUE;
 }
 
+static gboolean
+gst_aggregator_pad_skip_buffers (GstElement * self, GstPad * epad,
+    gpointer user_data)
+{
+  GList *item;
+  GstAggregatorPad *aggpad = (GstAggregatorPad *) epad;
+  GstAggregator *agg = (GstAggregator *) self;
+  GstAggregatorPadClass *klass = GST_AGGREGATOR_PAD_GET_CLASS (aggpad);
+
+  if (!klass->skip_buffer)
+    return FALSE;
+
+  PAD_LOCK (aggpad);
+
+  item = g_queue_peek_head_link (&aggpad->priv->data);
+  while (item) {
+    GList *next = item->next;
+
+    if (GST_IS_BUFFER (item->data)
+        && klass->skip_buffer (aggpad, agg, item->data)) {
+      GST_LOG_OBJECT (aggpad, "Skipping %" GST_PTR_FORMAT, item->data);
+      gst_aggregator_pad_buffer_consumed (aggpad);
+      gst_buffer_unref (item->data);
+      g_queue_delete_link (&aggpad->priv->data, item);
+    } else {
+      break;
+    }
+
+    item = next;
+  }
+
+  PAD_UNLOCK (aggpad);
+
+  return TRUE;
+}
+
 static void
 gst_aggregator_pad_set_flushing (GstAggregatorPad * aggpad,
     GstFlowReturn flow_return, gboolean full)
@@ -1056,6 +1094,10 @@ gst_aggregator_aggregate_func (GstAggregator * self)
     gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self),
         gst_aggregator_do_events_and_queries, NULL);
 
+    if (self->priv->peer_latency_live)
+      gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self),
+          gst_aggregator_pad_skip_buffers, NULL);
+
     /* Ensure we have buffers ready (either in clipped_buffer or at the head of
      * the queue */
     if (!gst_aggregator_wait_and_check (self, &timeout))
@@ -2426,18 +2468,12 @@ gst_aggregator_pad_chain_internal (GstAggregator * self,
 {
   GstFlowReturn flow_return;
   GstClockTime buf_pts;
-  GstAggregatorPadClass *klass = GST_AGGREGATOR_PAD_GET_CLASS (aggpad);
-
-  GST_DEBUG_OBJECT (aggpad, "Start chaining a buffer %" GST_PTR_FORMAT, buffer);
 
   PAD_LOCK (aggpad);
   flow_return = aggpad->priv->flow_return;
   if (flow_return != GST_FLOW_OK)
     goto flushing;
 
-  if (klass->skip_buffer && klass->skip_buffer (aggpad, self, buffer))
-    goto skipped;
-
   PAD_UNLOCK (aggpad);
 
   buf_pts = GST_BUFFER_PTS (buffer);
@@ -2541,14 +2577,6 @@ flushing:
     gst_buffer_unref (buffer);
 
   return flow_return;
-
-skipped:
-  PAD_UNLOCK (aggpad);
-
-  GST_DEBUG_OBJECT (aggpad, "Skipped buffer %" GST_PTR_FORMAT, buffer);
-  gst_buffer_unref (buffer);
-
-  return GST_FLOW_OK;
 }
 
 static GstFlowReturn