From 117200faebd922c6b994fac96f0bf6ba1d9a1881 Mon Sep 17 00:00:00 2001
From: Mathieu Duponchelle
Date: Tue, 23 Jan 2018 22:49:52 +0100
Subject: [PATCH] aggregator: delegate buffer skipping to the aggregate thread

We already process serialized events from the aggregate thread, and the
subclass will most likely need to access pad->segment to make its
decision, so skipping buffers from the sinkpads' streaming threads was
racy.
---
 libs/gst/base/gstaggregator.c | 56 ++++++++++++++++++++++++++++++++----------
 1 file changed, 42 insertions(+), 14 deletions(-)

diff --git a/libs/gst/base/gstaggregator.c b/libs/gst/base/gstaggregator.c
index 794657a..9d6bb7a 100644
--- a/libs/gst/base/gstaggregator.c
+++ b/libs/gst/base/gstaggregator.c
@@ -126,6 +126,8 @@ static GstClockTime gst_aggregator_get_latency_property (GstAggregator * agg);
 static GstClockTime gst_aggregator_get_latency_unlocked (GstAggregator * self);
 
+static void gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad);
+
 GST_DEBUG_CATEGORY_STATIC (aggregator_debug);
 #define GST_CAT_DEFAULT aggregator_debug
 
@@ -774,6 +776,42 @@ gst_aggregator_do_events_and_queries (GstElement * self, GstPad * epad,
   return TRUE;
 }
 
+static gboolean
+gst_aggregator_pad_skip_buffers (GstElement * self, GstPad * epad,
+    gpointer user_data)
+{
+  GList *item;
+  GstAggregatorPad *aggpad = (GstAggregatorPad *) epad;
+  GstAggregator *agg = (GstAggregator *) self;
+  GstAggregatorPadClass *klass = GST_AGGREGATOR_PAD_GET_CLASS (aggpad);
+
+  if (!klass->skip_buffer)
+    return FALSE;
+
+  PAD_LOCK (aggpad);
+
+  item = g_queue_peek_head_link (&aggpad->priv->data);
+  while (item) {
+    GList *next = item->next;
+
+    if (GST_IS_BUFFER (item->data)
+        && klass->skip_buffer (aggpad, agg, item->data)) {
+      GST_LOG_OBJECT (aggpad, "Skipping %" GST_PTR_FORMAT, item->data);
+      gst_aggregator_pad_buffer_consumed (aggpad);
+      gst_buffer_unref (item->data);
+      g_queue_delete_link (&aggpad->priv->data, item);
+    } else {
+      break;
+    }
+
+    item = next;
+  }
+
+  PAD_UNLOCK (aggpad);
+
+  return TRUE;
+}
+
 static void
 gst_aggregator_pad_set_flushing (GstAggregatorPad * aggpad,
     GstFlowReturn flow_return, gboolean full)
@@ -1056,6 +1094,10 @@ gst_aggregator_aggregate_func (GstAggregator * self)
     gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self),
         gst_aggregator_do_events_and_queries, NULL);
 
+    if (self->priv->peer_latency_live)
+      gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self),
+          gst_aggregator_pad_skip_buffers, NULL);
+
     /* Ensure we have buffers ready (either in clipped_buffer or at the head of
      * the queue */
     if (!gst_aggregator_wait_and_check (self, &timeout))
@@ -2426,18 +2468,12 @@ gst_aggregator_pad_chain_internal (GstAggregator * self,
 {
   GstFlowReturn flow_return;
   GstClockTime buf_pts;
-  GstAggregatorPadClass *klass = GST_AGGREGATOR_PAD_GET_CLASS (aggpad);
-
-  GST_DEBUG_OBJECT (aggpad, "Start chaining a buffer %" GST_PTR_FORMAT, buffer);
 
   PAD_LOCK (aggpad);
   flow_return = aggpad->priv->flow_return;
   if (flow_return != GST_FLOW_OK)
     goto flushing;
 
-  if (klass->skip_buffer && klass->skip_buffer (aggpad, self, buffer))
-    goto skipped;
-
   PAD_UNLOCK (aggpad);
 
   buf_pts = GST_BUFFER_PTS (buffer);
@@ -2541,14 +2577,6 @@ flushing:
   gst_buffer_unref (buffer);
 
   return flow_return;
-
-skipped:
-  PAD_UNLOCK (aggpad);
-
-  GST_DEBUG_OBJECT (aggpad, "Skipped buffer %" GST_PTR_FORMAT, buffer);
-  gst_buffer_unref (buffer);
-
-  return GST_FLOW_OK;
 }
 
 static GstFlowReturn
-- 
2.7.4
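
Usage sketch (not part of the patch): with skip_buffer now invoked from
the aggregate thread, a subclass can safely consult the pad's public
segment field when deciding whether a queued buffer should be dropped.
In the snippet below, the my_agg_ prefix and the drop policy
(time-format buffers timestamped before the segment start) are
hypothetical illustrations; only the skip_buffer signature, the
pad->segment access and the live-pipeline restriction come from the
patch above.

#include <gst/base/gstaggregator.h>

/* Hypothetical skip_buffer implementation for a GstAggregatorPad
 * subclass.  The base class calls this from the aggregate thread (see
 * gst_aggregator_pad_skip_buffers in the patch), so reading
 * aggpad->segment no longer races with the sinkpads' streaming
 * threads. */
static gboolean
my_agg_pad_skip_buffer (GstAggregatorPad * aggpad, GstAggregator * agg,
    GstBuffer * buffer)
{
  GstClockTime pts = GST_BUFFER_PTS (buffer);

  (void) agg;                   /* unused in this example policy */

  /* Without a valid timestamp there is nothing to decide on */
  if (!GST_CLOCK_TIME_IS_VALID (pts))
    return FALSE;

  /* Example policy: skip buffers that start before the segment start.
   * Returning TRUE makes the base class unref the buffer and delete it
   * from the pad's queue. */
  return aggpad->segment.format == GST_FORMAT_TIME
      && GST_CLOCK_TIME_IS_VALID (aggpad->segment.start)
      && pts < aggpad->segment.start;
}

A subclass would install the vfunc in its pad class_init, e.g.
aggpad_class->skip_buffer = my_agg_pad_skip_buffer;. Note that, per the
gst_aggregator_aggregate_func hunk, the base class only attempts
skipping on live pipelines (self->priv->peer_latency_live is checked
first).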