aggregator: Unify downstream flow return and flushing
authorOlivier Crête <olivier.crete@collabora.com>
Thu, 2 Apr 2015 02:10:11 +0000 (22:10 -0400)
committerTim-Philipp Müller <tim@centricular.com>
Sat, 2 Dec 2017 15:10:26 +0000 (15:10 +0000)
Also means that having a non-OK downstream flow return
wakes up the chain functions.

https://bugzilla.gnome.org/show_bug.cgi?id=747220

libs/gst/base/gstaggregator.c

index eb1a110..f97e24a 100644 (file)
@@ -177,7 +177,7 @@ GST_DEBUG_CATEGORY_STATIC (aggregator_debug);
 struct _GstAggregatorPadPrivate
 {
   /* Following fields are protected by the PAD_LOCK */
-  gboolean flushing;
+  GstFlowReturn flow_return;
   gboolean pending_flush_start;
   gboolean pending_flush_stop;
   gboolean pending_eos;
@@ -201,7 +201,7 @@ gst_aggregator_pad_flush (GstAggregatorPad * aggpad, GstAggregator * agg)
   PAD_LOCK (aggpad);
   aggpad->priv->pending_eos = FALSE;
   aggpad->priv->eos = FALSE;
-  aggpad->priv->flushing = FALSE;
+  aggpad->priv->flow_return = GST_FLOW_OK;
   PAD_UNLOCK (aggpad);
 
   if (klass->flush)
@@ -230,7 +230,6 @@ struct _GstAggregatorPrivate
   gboolean flush_seeking;
   gboolean pending_flush_start;
   gboolean send_eos;            /* protected by srcpad stream lock */
-  GstFlowReturn flow_return;
 
   GstCaps *srccaps;             /* protected by the srcpad stream lock */
 
@@ -399,7 +398,6 @@ static void
 gst_aggregator_reset_flow_values (GstAggregator * self)
 {
   GST_OBJECT_LOCK (self);
-  self->priv->flow_return = GST_FLOW_FLUSHING;
   self->priv->send_stream_start = TRUE;
   self->priv->send_segment = TRUE;
   gst_segment_init (&self->segment, GST_FORMAT_TIME);
@@ -632,6 +630,20 @@ gst_aggregator_wait_and_check (GstAggregator * self, gboolean * timeout)
 }
 
 static void
+gst_aggregator_pad_set_flushing (GstAggregatorPad * aggpad,
+    GstFlowReturn flow_return)
+{
+  PAD_LOCK (aggpad);
+  if (flow_return == GST_FLOW_NOT_LINKED)
+    aggpad->priv->flow_return = MIN (flow_return, aggpad->priv->flow_return);
+  else
+    aggpad->priv->flow_return = flow_return;
+  gst_buffer_replace (&aggpad->priv->buffer, NULL);
+  PAD_BROADCAST_EVENT (aggpad);
+  PAD_UNLOCK (aggpad);
+}
+
+static void
 gst_aggregator_aggregate_func (GstAggregator * self)
 {
   GstAggregatorPrivate *priv = self->priv;
@@ -655,10 +667,12 @@ gst_aggregator_aggregate_func (GstAggregator * self)
     flow_return = klass->aggregate (self, timeout);
 
     GST_OBJECT_LOCK (self);
-    if (flow_return == GST_FLOW_FLUSHING && priv->flush_seeking)
-      priv->flow_return = GST_FLOW_OK;
-    else
-      priv->flow_return = flow_return;
+    if (flow_return == GST_FLOW_FLUSHING && priv->flush_seeking) {
+      /* We don't want to set the pads to flushing, but we want to
+       * stop the thread, so just break here */
+      GST_OBJECT_UNLOCK (self);
+      break;
+    }
     GST_OBJECT_UNLOCK (self);
 
     if (flow_return == GST_FLOW_EOS) {
@@ -667,8 +681,18 @@ gst_aggregator_aggregate_func (GstAggregator * self)
 
     GST_LOG_OBJECT (self, "flow return is %s", gst_flow_get_name (flow_return));
 
-    if (flow_return != GST_FLOW_OK)
+    if (flow_return != GST_FLOW_OK) {
+      GList *item;
+
+      GST_OBJECT_LOCK (self);
+      for (item = GST_ELEMENT (self)->sinkpads; item; item = item->next) {
+        GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (item->data);
+
+        gst_aggregator_pad_set_flushing (aggpad, flow_return);
+      }
+      GST_OBJECT_UNLOCK (self);
       break;
+    }
   }
 
   /* Pause the task here, the only ways to get here are:
@@ -692,7 +716,6 @@ gst_aggregator_start (GstAggregator * self)
   self->priv->send_segment = TRUE;
   self->priv->send_eos = TRUE;
   self->priv->srccaps = NULL;
-  self->priv->flow_return = GST_FLOW_OK;
 
   klass = GST_AGGREGATOR_GET_CLASS (self);
 
@@ -790,24 +813,13 @@ gst_aggregator_all_flush_stop_received_locked (GstAggregator * self)
 }
 
 static void
-gst_aggregator_pad_set_flushing (GstAggregatorPad * aggpad)
-{
-  PAD_LOCK (aggpad);
-  aggpad->priv->flushing = TRUE;
-  gst_buffer_replace (&aggpad->priv->buffer, NULL);
-  PAD_BROADCAST_EVENT (aggpad);
-  PAD_UNLOCK (aggpad);
-}
-
-
-static void
 gst_aggregator_flush_start (GstAggregator * self, GstAggregatorPad * aggpad,
     GstEvent * event)
 {
   GstAggregatorPrivate *priv = self->priv;
   GstAggregatorPadPrivate *padpriv = aggpad->priv;
 
-  gst_aggregator_pad_set_flushing (aggpad);
+  gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING);
 
   PAD_FLUSH_LOCK (aggpad);
   PAD_LOCK (aggpad);
@@ -828,7 +840,6 @@ gst_aggregator_flush_start (GstAggregator * self, GstAggregatorPad * aggpad,
 
       GST_INFO_OBJECT (self, "Flushing, pausing srcpad task");
       gst_aggregator_stop_srcpad_task (self, event);
-      priv->flow_return = GST_FLOW_OK;
 
       GST_INFO_OBJECT (self, "Getting STREAM_LOCK while seeking");
       GST_PAD_STREAM_LOCK (self->srcpad);
@@ -1065,7 +1076,7 @@ gst_aggregator_release_pad (GstElement * element, GstPad * pad)
   GST_INFO_OBJECT (pad, "Removing pad");
 
   SRC_LOCK (self);
-  gst_aggregator_pad_set_flushing (aggpad);
+  gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING);
   gst_element_remove_pad (element, pad);
 
   SRC_BROADCAST (self);
@@ -1790,7 +1801,6 @@ gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
 {
   GstBuffer *actual_buf = buffer;
   GstAggregator *self = GST_AGGREGATOR (object);
-  GstAggregatorPrivate *priv = self->priv;
   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
   GstAggregatorClass *aggclass = GST_AGGREGATOR_GET_CLASS (object);
   GstFlowReturn flow_return;
@@ -1800,13 +1810,18 @@ gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
   PAD_FLUSH_LOCK (aggpad);
 
   PAD_LOCK (aggpad);
+  flow_return = aggpad->priv->flow_return;
+  if (flow_return != GST_FLOW_OK)
+    goto flushing;
+
   if (aggpad->priv->pending_eos == TRUE)
     goto eos;
 
-  while (aggpad->priv->buffer && !aggpad->priv->flushing)
+  while (aggpad->priv->buffer && aggpad->priv->flow_return == GST_FLOW_OK)
     PAD_WAIT_EVENT (aggpad);
 
-  if (aggpad->priv->flushing)
+  flow_return = aggpad->priv->flow_return;
+  if (flow_return != GST_FLOW_OK)
     goto flushing;
 
   PAD_UNLOCK (aggpad);
@@ -1820,6 +1835,9 @@ gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
   if (aggpad->priv->buffer)
     gst_buffer_unref (aggpad->priv->buffer);
   aggpad->priv->buffer = actual_buf;
+
+  flow_return = aggpad->priv->flow_return;
+
   PAD_UNLOCK (aggpad);
   PAD_FLUSH_UNLOCK (aggpad);
 
@@ -1828,10 +1846,6 @@ gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
 
   GST_DEBUG_OBJECT (aggpad, "Done chaining");
 
-  GST_OBJECT_LOCK (self);
-  flow_return = priv->flow_return;
-  GST_OBJECT_UNLOCK (self);
-
   return flow_return;
 
 flushing:
@@ -1839,9 +1853,10 @@ flushing:
   PAD_FLUSH_UNLOCK (aggpad);
 
   gst_buffer_unref (buffer);
-  GST_DEBUG_OBJECT (aggpad, "We are flushing");
+  GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping buffer",
+      gst_flow_get_name (flow_return));
 
-  return GST_FLOW_FLUSHING;
+  return flow_return;
 
 eos:
   PAD_UNLOCK (aggpad);
@@ -1863,10 +1878,10 @@ gst_aggregator_pad_query_func (GstPad * pad, GstObject * parent,
   if (GST_QUERY_IS_SERIALIZED (query)) {
     PAD_LOCK (aggpad);
 
-    while (aggpad->priv->buffer && !aggpad->priv->flushing)
+    while (aggpad->priv->buffer && aggpad->priv->flow_return == GST_FLOW_OK)
       PAD_WAIT_EVENT (aggpad);
 
-    if (aggpad->priv->flushing)
+    if (aggpad->priv->flow_return != GST_FLOW_OK)
       goto flushing;
 
     PAD_UNLOCK (aggpad);
@@ -1876,8 +1891,9 @@ gst_aggregator_pad_query_func (GstPad * pad, GstObject * parent,
       GST_AGGREGATOR_PAD (pad), query);
 
 flushing:
+  GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping query",
+      gst_flow_get_name (aggpad->priv->flow_return));
   PAD_UNLOCK (aggpad);
-  GST_DEBUG_OBJECT (aggpad, "Pad is flushing, dropping query");
   return FALSE;
 }
 
@@ -1893,10 +1909,10 @@ gst_aggregator_pad_event_func (GstPad * pad, GstObject * parent,
     PAD_LOCK (aggpad);
 
 
-    while (aggpad->priv->buffer && !aggpad->priv->flushing)
+    while (aggpad->priv->buffer && aggpad->priv->flow_return == GST_FLOW_OK)
       PAD_WAIT_EVENT (aggpad);
 
-    if (aggpad->priv->flushing
+    if (aggpad->priv->flow_return != GST_FLOW_OK
         && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP)
       goto flushing;
 
@@ -1907,8 +1923,9 @@ gst_aggregator_pad_event_func (GstPad * pad, GstObject * parent,
       GST_AGGREGATOR_PAD (pad), event);
 
 flushing:
+  GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping event",
+      gst_flow_get_name (aggpad->priv->flow_return));
   PAD_UNLOCK (aggpad);
-  GST_DEBUG_OBJECT (aggpad, "Pad is flushing, dropping event");
   if (GST_EVENT_IS_STICKY (event))
     gst_pad_store_sticky_event (pad, event);
   gst_event_unref (event);
@@ -1922,10 +1939,10 @@ gst_aggregator_pad_activate_mode_func (GstPad * pad,
   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
 
   if (active == FALSE) {
-    gst_aggregator_pad_set_flushing (aggpad);
+    gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING);
   } else {
     PAD_LOCK (aggpad);
-    aggpad->priv->flushing = FALSE;
+    aggpad->priv->flow_return = GST_FLOW_OK;
     PAD_BROADCAST_EVENT (aggpad);
     PAD_UNLOCK (aggpad);
   }