From e73a173224d1ba4795ee1c6bacaee2b17d733e20 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Olivier=20Cr=C3=AAte?= Date: Wed, 1 Apr 2015 22:10:11 -0400 Subject: [PATCH] aggregator: Unify downstream flow return and flushing Also means that having a non-OK downstream flow return wakes up the chain functions. https://bugzilla.gnome.org/show_bug.cgi?id=747220 --- libs/gst/base/gstaggregator.c | 99 +++++++++++++++++++++++++------------------ 1 file changed, 58 insertions(+), 41 deletions(-) diff --git a/libs/gst/base/gstaggregator.c b/libs/gst/base/gstaggregator.c index eb1a110..f97e24a 100644 --- a/libs/gst/base/gstaggregator.c +++ b/libs/gst/base/gstaggregator.c @@ -177,7 +177,7 @@ GST_DEBUG_CATEGORY_STATIC (aggregator_debug); struct _GstAggregatorPadPrivate { /* Following fields are protected by the PAD_LOCK */ - gboolean flushing; + GstFlowReturn flow_return; gboolean pending_flush_start; gboolean pending_flush_stop; gboolean pending_eos; @@ -201,7 +201,7 @@ gst_aggregator_pad_flush (GstAggregatorPad * aggpad, GstAggregator * agg) PAD_LOCK (aggpad); aggpad->priv->pending_eos = FALSE; aggpad->priv->eos = FALSE; - aggpad->priv->flushing = FALSE; + aggpad->priv->flow_return = GST_FLOW_OK; PAD_UNLOCK (aggpad); if (klass->flush) @@ -230,7 +230,6 @@ struct _GstAggregatorPrivate gboolean flush_seeking; gboolean pending_flush_start; gboolean send_eos; /* protected by srcpad stream lock */ - GstFlowReturn flow_return; GstCaps *srccaps; /* protected by the srcpad stream lock */ @@ -399,7 +398,6 @@ static void gst_aggregator_reset_flow_values (GstAggregator * self) { GST_OBJECT_LOCK (self); - self->priv->flow_return = GST_FLOW_FLUSHING; self->priv->send_stream_start = TRUE; self->priv->send_segment = TRUE; gst_segment_init (&self->segment, GST_FORMAT_TIME); @@ -632,6 +630,20 @@ gst_aggregator_wait_and_check (GstAggregator * self, gboolean * timeout) } static void +gst_aggregator_pad_set_flushing (GstAggregatorPad * aggpad, + GstFlowReturn flow_return) +{ + PAD_LOCK (aggpad); + if (flow_return == GST_FLOW_NOT_LINKED) + aggpad->priv->flow_return = MIN (flow_return, aggpad->priv->flow_return); + else + aggpad->priv->flow_return = flow_return; + gst_buffer_replace (&aggpad->priv->buffer, NULL); + PAD_BROADCAST_EVENT (aggpad); + PAD_UNLOCK (aggpad); +} + +static void gst_aggregator_aggregate_func (GstAggregator * self) { GstAggregatorPrivate *priv = self->priv; @@ -655,10 +667,12 @@ gst_aggregator_aggregate_func (GstAggregator * self) flow_return = klass->aggregate (self, timeout); GST_OBJECT_LOCK (self); - if (flow_return == GST_FLOW_FLUSHING && priv->flush_seeking) - priv->flow_return = GST_FLOW_OK; - else - priv->flow_return = flow_return; + if (flow_return == GST_FLOW_FLUSHING && priv->flush_seeking) { + /* We don't want to set the pads to flushing, but we want to + * stop the thread, so just break here */ + GST_OBJECT_UNLOCK (self); + break; + } GST_OBJECT_UNLOCK (self); if (flow_return == GST_FLOW_EOS) { @@ -667,8 +681,18 @@ gst_aggregator_aggregate_func (GstAggregator * self) GST_LOG_OBJECT (self, "flow return is %s", gst_flow_get_name (flow_return)); - if (flow_return != GST_FLOW_OK) + if (flow_return != GST_FLOW_OK) { + GList *item; + + GST_OBJECT_LOCK (self); + for (item = GST_ELEMENT (self)->sinkpads; item; item = item->next) { + GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (item->data); + + gst_aggregator_pad_set_flushing (aggpad, flow_return); + } + GST_OBJECT_UNLOCK (self); break; + } } /* Pause the task here, the only ways to get here are: @@ -692,7 +716,6 @@ gst_aggregator_start (GstAggregator * self) self->priv->send_segment = TRUE; self->priv->send_eos = TRUE; self->priv->srccaps = NULL; - self->priv->flow_return = GST_FLOW_OK; klass = GST_AGGREGATOR_GET_CLASS (self); @@ -790,24 +813,13 @@ gst_aggregator_all_flush_stop_received_locked (GstAggregator * self) } static void -gst_aggregator_pad_set_flushing (GstAggregatorPad * aggpad) -{ - PAD_LOCK (aggpad); - aggpad->priv->flushing = TRUE; - gst_buffer_replace (&aggpad->priv->buffer, NULL); - PAD_BROADCAST_EVENT (aggpad); - PAD_UNLOCK (aggpad); -} - - -static void gst_aggregator_flush_start (GstAggregator * self, GstAggregatorPad * aggpad, GstEvent * event) { GstAggregatorPrivate *priv = self->priv; GstAggregatorPadPrivate *padpriv = aggpad->priv; - gst_aggregator_pad_set_flushing (aggpad); + gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING); PAD_FLUSH_LOCK (aggpad); PAD_LOCK (aggpad); @@ -828,7 +840,6 @@ gst_aggregator_flush_start (GstAggregator * self, GstAggregatorPad * aggpad, GST_INFO_OBJECT (self, "Flushing, pausing srcpad task"); gst_aggregator_stop_srcpad_task (self, event); - priv->flow_return = GST_FLOW_OK; GST_INFO_OBJECT (self, "Getting STREAM_LOCK while seeking"); GST_PAD_STREAM_LOCK (self->srcpad); @@ -1065,7 +1076,7 @@ gst_aggregator_release_pad (GstElement * element, GstPad * pad) GST_INFO_OBJECT (pad, "Removing pad"); SRC_LOCK (self); - gst_aggregator_pad_set_flushing (aggpad); + gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING); gst_element_remove_pad (element, pad); SRC_BROADCAST (self); @@ -1790,7 +1801,6 @@ gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer) { GstBuffer *actual_buf = buffer; GstAggregator *self = GST_AGGREGATOR (object); - GstAggregatorPrivate *priv = self->priv; GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad); GstAggregatorClass *aggclass = GST_AGGREGATOR_GET_CLASS (object); GstFlowReturn flow_return; @@ -1800,13 +1810,18 @@ gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer) PAD_FLUSH_LOCK (aggpad); PAD_LOCK (aggpad); + flow_return = aggpad->priv->flow_return; + if (flow_return != GST_FLOW_OK) + goto flushing; + if (aggpad->priv->pending_eos == TRUE) goto eos; - while (aggpad->priv->buffer && !aggpad->priv->flushing) + while (aggpad->priv->buffer && aggpad->priv->flow_return == GST_FLOW_OK) PAD_WAIT_EVENT (aggpad); - if (aggpad->priv->flushing) + flow_return = aggpad->priv->flow_return; + if (flow_return != GST_FLOW_OK) goto flushing; PAD_UNLOCK (aggpad); @@ -1820,6 +1835,9 @@ gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer) if (aggpad->priv->buffer) gst_buffer_unref (aggpad->priv->buffer); aggpad->priv->buffer = actual_buf; + + flow_return = aggpad->priv->flow_return; + PAD_UNLOCK (aggpad); PAD_FLUSH_UNLOCK (aggpad); @@ -1828,10 +1846,6 @@ gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer) GST_DEBUG_OBJECT (aggpad, "Done chaining"); - GST_OBJECT_LOCK (self); - flow_return = priv->flow_return; - GST_OBJECT_UNLOCK (self); - return flow_return; flushing: @@ -1839,9 +1853,10 @@ flushing: PAD_FLUSH_UNLOCK (aggpad); gst_buffer_unref (buffer); - GST_DEBUG_OBJECT (aggpad, "We are flushing"); + GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping buffer", + gst_flow_get_name (flow_return)); - return GST_FLOW_FLUSHING; + return flow_return; eos: PAD_UNLOCK (aggpad); @@ -1863,10 +1878,10 @@ gst_aggregator_pad_query_func (GstPad * pad, GstObject * parent, if (GST_QUERY_IS_SERIALIZED (query)) { PAD_LOCK (aggpad); - while (aggpad->priv->buffer && !aggpad->priv->flushing) + while (aggpad->priv->buffer && aggpad->priv->flow_return == GST_FLOW_OK) PAD_WAIT_EVENT (aggpad); - if (aggpad->priv->flushing) + if (aggpad->priv->flow_return != GST_FLOW_OK) goto flushing; PAD_UNLOCK (aggpad); @@ -1876,8 +1891,9 @@ gst_aggregator_pad_query_func (GstPad * pad, GstObject * parent, GST_AGGREGATOR_PAD (pad), query); flushing: + GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping query", + gst_flow_get_name (aggpad->priv->flow_return)); PAD_UNLOCK (aggpad); - GST_DEBUG_OBJECT (aggpad, "Pad is flushing, dropping query"); return FALSE; } @@ -1893,10 +1909,10 @@ gst_aggregator_pad_event_func (GstPad * pad, GstObject * parent, PAD_LOCK (aggpad); - while (aggpad->priv->buffer && !aggpad->priv->flushing) + while (aggpad->priv->buffer && aggpad->priv->flow_return == GST_FLOW_OK) PAD_WAIT_EVENT (aggpad); - if (aggpad->priv->flushing + if (aggpad->priv->flow_return != GST_FLOW_OK && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) goto flushing; @@ -1907,8 +1923,9 @@ gst_aggregator_pad_event_func (GstPad * pad, GstObject * parent, GST_AGGREGATOR_PAD (pad), event); flushing: + GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping event", + gst_flow_get_name (aggpad->priv->flow_return)); PAD_UNLOCK (aggpad); - GST_DEBUG_OBJECT (aggpad, "Pad is flushing, dropping event"); if (GST_EVENT_IS_STICKY (event)) gst_pad_store_sticky_event (pad, event); gst_event_unref (event); @@ -1922,10 +1939,10 @@ gst_aggregator_pad_activate_mode_func (GstPad * pad, GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad); if (active == FALSE) { - gst_aggregator_pad_set_flushing (aggpad); + gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING); } else { PAD_LOCK (aggpad); - aggpad->priv->flushing = FALSE; + aggpad->priv->flow_return = GST_FLOW_OK; PAD_BROADCAST_EVENT (aggpad); PAD_UNLOCK (aggpad); } -- 2.7.4