From 74797e962feb8ec9c3875c3766d107886d2a1b47 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Sebastian=20Dr=C3=B6ge?= Date: Mon, 19 Aug 2019 18:19:50 +0300 Subject: [PATCH] aggregator: Always handle serialized events/queries directly before waiting Otherwise it can happen that we start waiting for another pad, while one pad already has events that can be handled and potentially also a buffer that can be handled. That buffer would then however not be accessible by the subclass from GstAggregator::get_next_time() as there would be the events in front of it, which doesn't allow the subclass then to calculate the next time based on already available buffers. As a side-effect this also allows removing the duplicated event handling code in the aggregate function as we'll always report pads as not ready when there is a serialized event or query at the top of at least one pad's queue. Fixes https://gitlab.freedesktop.org/gstreamer/gstreamer/issues/428 --- libs/gst/base/gstaggregator.c | 103 +++++++++++++++++++++++++++++------------- 1 file changed, 71 insertions(+), 32 deletions(-) diff --git a/libs/gst/base/gstaggregator.c b/libs/gst/base/gstaggregator.c index e8282de..9eaaa90 100644 --- a/libs/gst/base/gstaggregator.c +++ b/libs/gst/base/gstaggregator.c @@ -398,8 +398,15 @@ gst_aggregator_pad_queue_is_empty (GstAggregatorPad * pad) pad->priv->clipped_buffer == NULL); } +/* Will return FALSE if there's no buffer available on every non-EOS pad, or + * if at least one of the pads has an event or query at the top of its queue. + * + * Only returns TRUE if all non-EOS pads have a buffer available at the top of + * their queue or a clipped buffer already. + */ static gboolean -gst_aggregator_check_pads_ready (GstAggregator * self) +gst_aggregator_check_pads_ready (GstAggregator * self, + gboolean * have_event_or_query_ret) { GstAggregatorPad *pad = NULL; GList *l, *sinkpads; @@ -419,18 +426,30 @@ gst_aggregator_check_pads_ready (GstAggregator * self) PAD_LOCK (pad); - if (pad->priv->num_buffers == 0) { - if (!gst_aggregator_pad_queue_is_empty (pad)) - have_event_or_query = TRUE; - if (!pad->priv->eos) { - have_buffer = FALSE; + /* If there's an event or query at the top of the queue and we don't yet + * have taken the top buffer out and stored it as clip_buffer, remember + * that and exit the loop. We first have to handle all events/queries + * before we handle any buffers. */ + if (!pad->priv->clipped_buffer + && (GST_IS_EVENT (g_queue_peek_tail (&pad->priv->data)) + || GST_IS_QUERY (g_queue_peek_tail (&pad->priv->data)))) { + PAD_UNLOCK (pad); + have_event_or_query = TRUE; + break; + } - /* If not live we need data on all pads, so leave the loop */ - if (!self->priv->peer_latency_live) { - PAD_UNLOCK (pad); - goto pad_not_ready; - } - } + /* Otherwise check if we have a clipped buffer or a buffer at the top of + * the queue, and if not then this pad is not ready unless it is also EOS */ + if (!pad->priv->clipped_buffer + && !GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->data))) { + /* We must not have any buffers at all in this pad then as otherwise we + * would've had an event/query at the top of the queue */ + g_assert (pad->priv->num_buffers == 0); + + /* Only consider this pad as worth waiting for if it's not already EOS. + * There's no point in waiting for buffers on EOS pads */ + if (!pad->priv->eos) + have_buffer = FALSE; } else if (self->priv->peer_latency_live) { /* In live mode, having a single pad with buffers is enough to * generate a start time from it. In non-live mode all pads need @@ -442,7 +461,10 @@ gst_aggregator_check_pads_ready (GstAggregator * self) PAD_UNLOCK (pad); } - if (!have_buffer && !have_event_or_query) + if (have_event_or_query) + goto pad_not_ready_but_event_or_query; + + if (!have_buffer) goto pad_not_ready; if (have_buffer) @@ -450,23 +472,42 @@ gst_aggregator_check_pads_ready (GstAggregator * self) GST_OBJECT_UNLOCK (self); GST_LOG_OBJECT (self, "pads are ready"); + + if (have_event_or_query_ret) + *have_event_or_query_ret = have_event_or_query; + return TRUE; no_sinkpads: { GST_LOG_OBJECT (self, "pads not ready: no sink pads"); GST_OBJECT_UNLOCK (self); + + if (have_event_or_query_ret) + *have_event_or_query_ret = have_event_or_query; + return FALSE; } pad_not_ready: { - if (have_event_or_query) - GST_LOG_OBJECT (pad, "pad not ready to be aggregated yet," - " but waking up for serialized event"); - else - GST_LOG_OBJECT (pad, "pad not ready to be aggregated yet"); + GST_LOG_OBJECT (pad, "pad not ready to be aggregated yet"); GST_OBJECT_UNLOCK (self); - return have_event_or_query; + + if (have_event_or_query_ret) + *have_event_or_query_ret = have_event_or_query; + + return FALSE; + } +pad_not_ready_but_event_or_query: + { + GST_LOG_OBJECT (pad, + "pad not ready to be aggregated yet, need to handle serialized event or query first"); + GST_OBJECT_UNLOCK (self); + + if (have_event_or_query_ret) + *have_event_or_query_ret = have_event_or_query; + + return FALSE; } } @@ -630,6 +671,7 @@ gst_aggregator_wait_and_check (GstAggregator * self, gboolean * timeout) GstClockTime latency; GstClockTime start; gboolean res; + gboolean have_event_or_query = FALSE; *timeout = FALSE; @@ -637,13 +679,21 @@ gst_aggregator_wait_and_check (GstAggregator * self, gboolean * timeout) latency = gst_aggregator_get_latency_unlocked (self); - if (gst_aggregator_check_pads_ready (self)) { + if (gst_aggregator_check_pads_ready (self, &have_event_or_query)) { GST_DEBUG_OBJECT (self, "all pads have data"); SRC_UNLOCK (self); return TRUE; } + /* If we have an event or query, immediately return FALSE instead of waiting + * and handle it immediately */ + if (have_event_or_query) { + GST_DEBUG_OBJECT (self, "Have serialized event or query to handle first"); + SRC_UNLOCK (self); + return FALSE; + } + /* Before waiting, check if we're actually still running */ if (!self->priv->running || !self->priv->send_eos) { SRC_UNLOCK (self); @@ -722,7 +772,7 @@ gst_aggregator_wait_and_check (GstAggregator * self, gboolean * timeout) } } - res = gst_aggregator_check_pads_ready (self); + res = gst_aggregator_check_pads_ready (self, NULL); SRC_UNLOCK (self); return res; @@ -1175,17 +1225,6 @@ gst_aggregator_aggregate_func (GstAggregator * self) if (!gst_aggregator_wait_and_check (self, &timeout)) continue; - events_query_data.processed_event = FALSE; - events_query_data.flow_ret = GST_FLOW_OK; - gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self), - gst_aggregator_do_events_and_queries, &events_query_data); - - if ((flow_return = events_query_data.flow_ret) != GST_FLOW_OK) - goto handle_error; - - if (events_query_data.processed_event) - continue; - if (gst_pad_check_reconfigure (GST_AGGREGATOR_SRC_PAD (self))) { if (!gst_aggregator_negotiate_unlocked (self)) { gst_pad_mark_reconfigure (GST_AGGREGATOR_SRC_PAD (self)); -- 2.7.4