From 474acca0a833e152bf5be199d4b98643f510af56 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Olivier=20Cr=C3=AAte?= Date: Tue, 23 May 2017 00:53:57 +0200 Subject: [PATCH] aggregator: Process serialized queries through the queue This ensures that they really get processed in order with buffers. Just waiting for the queue to be empty is sometimes not enough as the buffers are dropped from the pad before the result is pushed to the next element, sometimes resulting in surprising re-ordering. --- libs/gst/base/gstaggregator.c | 80 +++++++++++++++++++++++++++++++++++-------- 1 file changed, 66 insertions(+), 14 deletions(-) diff --git a/libs/gst/base/gstaggregator.c b/libs/gst/base/gstaggregator.c index 2afa3c3..f0dd552 100644 --- a/libs/gst/base/gstaggregator.c +++ b/libs/gst/base/gstaggregator.c @@ -751,6 +751,7 @@ static gboolean check_events (GstAggregator * self, GstAggregatorPad * pad, gpointer user_data) { GstEvent *event = NULL; + GstQuery *query = NULL; GstAggregatorClass *klass = NULL; gboolean *processed_event = user_data; @@ -763,11 +764,14 @@ check_events (GstAggregator * self, GstAggregatorPad * pad, gpointer user_data) pad->priv->eos = TRUE; } if (pad->priv->clipped_buffer == NULL && - GST_IS_EVENT (g_queue_peek_tail (&pad->priv->buffers))) { - event = gst_event_ref (g_queue_peek_tail (&pad->priv->buffers)); + !GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->buffers))) { + if (GST_IS_EVENT (g_queue_peek_tail (&pad->priv->buffers))) + event = gst_event_ref (g_queue_peek_tail (&pad->priv->buffers)); + if (GST_IS_QUERY (g_queue_peek_tail (&pad->priv->buffers))) + query = g_queue_peek_tail (&pad->priv->buffers); } PAD_UNLOCK (pad); - if (event) { + if (event || query) { gboolean ret; if (processed_event) @@ -776,18 +780,43 @@ check_events (GstAggregator * self, GstAggregatorPad * pad, gpointer user_data) klass = GST_AGGREGATOR_GET_CLASS (self); GST_LOG_OBJECT (pad, "Processing %" GST_PTR_FORMAT, event); - gst_event_ref (event); - ret = klass->sink_event (self, pad, event); - PAD_LOCK (pad); - if (GST_EVENT_TYPE (event) == GST_EVENT_CAPS) - pad->priv->negotiated = ret; - if (g_queue_peek_tail (&pad->priv->buffers) == event) - gst_event_unref (g_queue_pop_tail (&pad->priv->buffers)); - gst_event_unref (event); + if (event) { + gst_event_ref (event); + ret = klass->sink_event (self, pad, event); + + PAD_LOCK (pad); + if (GST_EVENT_TYPE (event) == GST_EVENT_CAPS) + pad->priv->negotiated = ret; + if (g_queue_peek_tail (&pad->priv->buffers) == event) + gst_event_unref (g_queue_pop_tail (&pad->priv->buffers)); + gst_event_unref (event); + } + + if (query) { + GST_LOG_OBJECT (pad, "Processing %" GST_PTR_FORMAT, event); + ret = klass->sink_query (self, pad, query); + + PAD_LOCK (pad); + if (g_queue_peek_tail (&pad->priv->buffers) == query) { + GstStructure *s; + + s = gst_query_writable_structure (query); + gst_structure_set (s, "gst-aggregator-retval", G_TYPE_BOOLEAN, ret, + NULL); + g_queue_pop_tail (&pad->priv->buffers); + } + } + PAD_BROADCAST_EVENT (pad); PAD_UNLOCK (pad); } + if (query) { + if (processed_event) + *processed_event = TRUE; + if (klass == NULL) + klass = GST_AGGREGATOR_GET_CLASS (self); + } } while (event != NULL); return TRUE; @@ -816,7 +845,8 @@ gst_aggregator_pad_set_flushing (GstAggregatorPad * aggpad, GST_EVENT_TYPE (item->data) == GST_EVENT_EOS || GST_EVENT_TYPE (item->data) == GST_EVENT_SEGMENT || !GST_EVENT_IS_STICKY (item->data)) { - gst_mini_object_unref (item->data); + if (!GST_IS_QUERY (item->data)) + gst_mini_object_unref (item->data); g_queue_delete_link (&aggpad->priv->buffers, item); } item = next; @@ -2603,31 +2633,53 @@ static gboolean gst_aggregator_pad_query_func (GstPad * pad, GstObject * parent, GstQuery * query) { + GstAggregator *self = GST_AGGREGATOR (parent); GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad); GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent); if (GST_QUERY_IS_SERIALIZED (query)) { + GstStructure *s; + gboolean ret = FALSE; + + SRC_LOCK (self); PAD_LOCK (aggpad); + if (aggpad->priv->flow_return != GST_FLOW_OK) { + SRC_UNLOCK (self); + goto flushing; + } + + g_queue_push_head (&aggpad->priv->buffers, query); + SRC_BROADCAST (self); + SRC_UNLOCK (self); + while (!gst_aggregator_pad_queue_is_empty (aggpad) && aggpad->priv->flow_return == GST_FLOW_OK) { GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed"); PAD_WAIT_EVENT (aggpad); } + s = gst_query_writable_structure (query); + if (gst_structure_get_boolean (s, "gst-aggregator-retval", &ret)) + gst_structure_remove_field (s, "gst-aggregator-retval"); + else + g_queue_remove (&aggpad->priv->buffers, query); + if (aggpad->priv->flow_return != GST_FLOW_OK) goto flushing; PAD_UNLOCK (aggpad); + + return ret; } - return klass->sink_query (GST_AGGREGATOR (parent), - GST_AGGREGATOR_PAD (pad), query); + return klass->sink_query (self, aggpad, query); flushing: GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping query", gst_flow_get_name (aggpad->priv->flow_return)); PAD_UNLOCK (aggpad); + return FALSE; } -- 2.7.4