aggregator: refactor flushing logic
authorMathieu Duponchelle <mathieu@centricular.com>
Wed, 22 May 2019 19:37:43 +0000 (21:37 +0200)
committerMathieu Duponchelle <mathieu@centricular.com>
Mon, 10 Jun 2019 21:04:31 +0000 (23:04 +0200)
Instead of tracking "pending_flush_*" on the pads and the
aggregator, we now simply track the last seqnum for flush start
and flush stop events on the pads, and use it to determine whether
we should enter or exit our flushing state.

See https://gitlab.freedesktop.org/gstreamer/gst-plugins-bad/issues/977

libs/gst/base/gstaggregator.c
tests/check/libs/aggregator.c

index 9dba771..a5b2860 100644 (file)
@@ -234,8 +234,9 @@ struct _GstAggregatorPadPrivate
 {
   /* Following fields are protected by the PAD_LOCK */
   GstFlowReturn flow_return;
-  gboolean pending_flush_start;
-  gboolean pending_flush_stop;
+
+  guint32 last_flush_start_seqnum;
+  guint32 last_flush_stop_seqnum;
 
   gboolean first_buffer;
 
@@ -315,13 +316,14 @@ struct _GstAggregatorPrivate
   /* Our state is >= PAUSED */
   gboolean running;             /* protected by src_lock */
 
-  /* seqnum from seek or segment,
-   * to be applied to synthetic segment/eos events */
-  gint seqnum;
+  /* seqnum from last seek or common seqnum to flush start events received
+   * on all pads, for flushing without a seek */
+  guint32 next_seqnum;
+  /* seqnum to apply to synthetic segment/eos events */
+  guint32 seqnum;
   gboolean send_stream_start;   /* protected by srcpad stream lock */
   gboolean send_segment;
-  gboolean flush_seeking;
-  gboolean pending_flush_start;
+  gboolean flushing;
   gboolean send_eos;            /* protected by srcpad stream lock */
 
   GstCaps *srccaps;             /* protected by the srcpad stream lock */
@@ -513,7 +515,7 @@ gst_aggregator_push_mandatory_events (GstAggregator * self)
   }
 
   GST_OBJECT_LOCK (self);
-  if (self->priv->send_segment && !self->priv->flush_seeking) {
+  if (self->priv->send_segment && !self->priv->flushing) {
     segment =
         gst_event_new_segment (&GST_AGGREGATOR_PAD (self->srcpad)->segment);
 
@@ -528,7 +530,7 @@ gst_aggregator_push_mandatory_events (GstAggregator * self)
     GST_DEBUG_OBJECT (self, "pushing segment %" GST_PTR_FORMAT, segment);
   }
 
-  if (priv->tags && priv->tags_changed && !self->priv->flush_seeking) {
+  if (priv->tags && priv->tags_changed && !self->priv->flushing) {
     tags = gst_event_new_tag (gst_tag_list_ref (priv->tags));
     priv->tags_changed = FALSE;
   }
@@ -563,13 +565,13 @@ gst_aggregator_default_finish_buffer (GstAggregator * self, GstBuffer * buffer)
   gst_aggregator_push_mandatory_events (self);
 
   GST_OBJECT_LOCK (self);
-  if (!self->priv->flush_seeking && gst_pad_is_active (self->srcpad)) {
+  if (!self->priv->flushing && gst_pad_is_active (self->srcpad)) {
     GST_TRACE_OBJECT (self, "pushing buffer %" GST_PTR_FORMAT, buffer);
     GST_OBJECT_UNLOCK (self);
     return gst_pad_push (self->srcpad, buffer);
   } else {
     GST_INFO_OBJECT (self, "Not pushing (active: %i, flushing: %i)",
-        self->priv->flush_seeking, gst_pad_is_active (self->srcpad));
+        self->priv->flushing, gst_pad_is_active (self->srcpad));
     GST_OBJECT_UNLOCK (self);
     gst_buffer_unref (buffer);
     return GST_FLOW_OK;
@@ -1156,7 +1158,7 @@ gst_aggregator_aggregate_func (GstAggregator * self)
       continue;
 
     GST_OBJECT_LOCK (self);
-    if (flow_return == GST_FLOW_FLUSHING && priv->flush_seeking) {
+    if (flow_return == GST_FLOW_FLUSHING && priv->flushing) {
       /* We don't want to set the pads to flushing, but we want to
        * stop the thread, so just break here */
       GST_OBJECT_UNLOCK (self);
@@ -1219,18 +1221,6 @@ gst_aggregator_start (GstAggregator * self)
 }
 
 static gboolean
-_check_pending_flush_stop (GstAggregatorPad * pad)
-{
-  gboolean res;
-
-  PAD_LOCK (pad);
-  res = (!pad->priv->pending_flush_stop && !pad->priv->pending_flush_start);
-  PAD_UNLOCK (pad);
-
-  return res;
-}
-
-static gboolean
 gst_aggregator_stop_srcpad_task (GstAggregator * self, GstEvent * flush_start)
 {
   gboolean res = TRUE;
@@ -1272,7 +1262,7 @@ gst_aggregator_flush (GstAggregator * self)
   GST_DEBUG_OBJECT (self, "Flushing everything");
   GST_OBJECT_LOCK (self);
   priv->send_segment = TRUE;
-  priv->flush_seeking = FALSE;
+  priv->flushing = FALSE;
   priv->tags_changed = FALSE;
   GST_OBJECT_UNLOCK (self);
   if (klass->flush)
@@ -1285,7 +1275,25 @@ gst_aggregator_flush (GstAggregator * self)
 /* Called with GstAggregator's object lock held */
 
 static gboolean
-gst_aggregator_all_flush_stop_received_locked (GstAggregator * self)
+gst_aggregator_all_flush_stop_received (GstAggregator * self, guint32 seqnum)
+{
+  GList *tmp;
+  GstAggregatorPad *tmppad;
+
+  for (tmp = GST_ELEMENT (self)->sinkpads; tmp; tmp = tmp->next) {
+    tmppad = (GstAggregatorPad *) tmp->data;
+
+    if (tmppad->priv->last_flush_stop_seqnum != seqnum)
+      return FALSE;
+  }
+
+  return TRUE;
+}
+
+/* Called with GstAggregator's object lock held */
+
+static gboolean
+gst_aggregator_all_flush_start_received (GstAggregator * self, guint32 seqnum)
 {
   GList *tmp;
   GstAggregatorPad *tmppad;
@@ -1293,9 +1301,7 @@ gst_aggregator_all_flush_stop_received_locked (GstAggregator * self)
   for (tmp = GST_ELEMENT (self)->sinkpads; tmp; tmp = tmp->next) {
     tmppad = (GstAggregatorPad *) tmp->data;
 
-    if (_check_pending_flush_stop (tmppad) == FALSE) {
-      GST_DEBUG_OBJECT (tmppad, "Is not last %i -- %i",
-          tmppad->priv->pending_flush_start, tmppad->priv->pending_flush_stop);
+    if (tmppad->priv->last_flush_start_seqnum != seqnum) {
       return FALSE;
     }
   }
@@ -1309,41 +1315,36 @@ gst_aggregator_flush_start (GstAggregator * self, GstAggregatorPad * aggpad,
 {
   GstAggregatorPrivate *priv = self->priv;
   GstAggregatorPadPrivate *padpriv = aggpad->priv;
+  guint32 seqnum = gst_event_get_seqnum (event);
 
   gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING, FALSE);
 
   PAD_FLUSH_LOCK (aggpad);
   PAD_LOCK (aggpad);
-  if (padpriv->pending_flush_start) {
-    GST_DEBUG_OBJECT (aggpad, "Expecting FLUSH_STOP now");
-
-    padpriv->pending_flush_start = FALSE;
-    padpriv->pending_flush_stop = TRUE;
-  }
+  padpriv->last_flush_start_seqnum = seqnum;
   PAD_UNLOCK (aggpad);
 
   GST_OBJECT_LOCK (self);
-  if (priv->flush_seeking) {
-    /* If flush_seeking we forward the first FLUSH_START */
-    if (priv->pending_flush_start) {
-      priv->pending_flush_start = FALSE;
-      GST_OBJECT_UNLOCK (self);
 
-      GST_INFO_OBJECT (self, "Flushing, pausing srcpad task");
-      gst_aggregator_stop_srcpad_task (self, event);
+  if (!priv->flushing && gst_aggregator_all_flush_start_received (self, seqnum)) {
+    /* Make sure we don't forward more than one FLUSH_START */
+    priv->flushing = TRUE;
+    priv->next_seqnum = seqnum;
+    GST_OBJECT_UNLOCK (self);
 
-      GST_INFO_OBJECT (self, "Getting STREAM_LOCK while seeking");
-      GST_PAD_STREAM_LOCK (self->srcpad);
-      GST_LOG_OBJECT (self, "GOT STREAM_LOCK");
-      event = NULL;
-    } else {
-      GST_OBJECT_UNLOCK (self);
-      gst_event_unref (event);
-    }
+    GST_INFO_OBJECT (self, "Flushing, pausing srcpad task");
+    gst_aggregator_stop_srcpad_task (self, event);
+
+    GST_INFO_OBJECT (self, "Getting STREAM_LOCK while flushing");
+    GST_PAD_STREAM_LOCK (self->srcpad);
+    GST_LOG_OBJECT (self, "GOT STREAM_LOCK");
+
+    event = NULL;
   } else {
-    GST_OBJECT_UNLOCK (self);
     gst_event_unref (event);
+    GST_OBJECT_UNLOCK (self);
   }
+
   PAD_FLUSH_UNLOCK (aggpad);
 }
 
@@ -1400,38 +1401,52 @@ gst_aggregator_default_sink_event (GstAggregator * self,
     case GST_EVENT_FLUSH_START:
     {
       gst_aggregator_flush_start (self, aggpad, event);
-      /* We forward only in one case: right after flush_seeking */
+      /* We forward only in one case: right after flushing */
       event = NULL;
       goto eat;
     }
     case GST_EVENT_FLUSH_STOP:
     {
+      guint32 seqnum = gst_event_get_seqnum (event);
+
+      PAD_FLUSH_LOCK (aggpad);
+      PAD_LOCK (aggpad);
+      aggpad->priv->last_flush_stop_seqnum = seqnum;
+      PAD_UNLOCK (aggpad);
+
+      /* aggregate might be running if this FLUSH_STOP was not
+       * sent following a flushing seek, let's make sure we don't
+       * flush the pad's current buffer before aggregate has returned
+       */
+      GST_PAD_STREAM_LOCK (self->srcpad);
       gst_aggregator_pad_flush (aggpad, self);
+      GST_PAD_STREAM_UNLOCK (self->srcpad);
+
       GST_OBJECT_LOCK (self);
-      if (priv->flush_seeking) {
-        g_atomic_int_set (&aggpad->priv->pending_flush_stop, FALSE);
-        if (gst_aggregator_all_flush_stop_received_locked (self)) {
-          GST_OBJECT_UNLOCK (self);
-          /* That means we received FLUSH_STOP/FLUSH_STOP on
-           * all sinkpads -- Seeking is Done... sending FLUSH_STOP */
-          gst_aggregator_flush (self);
-          gst_pad_push_event (self->srcpad, event);
-          event = NULL;
-          SRC_LOCK (self);
-          priv->send_eos = TRUE;
-          SRC_BROADCAST (self);
-          SRC_UNLOCK (self);
-
-          GST_INFO_OBJECT (self, "Releasing source pad STREAM_LOCK");
-          GST_PAD_STREAM_UNLOCK (self->srcpad);
-          gst_aggregator_start_srcpad_task (self);
-        } else {
-          GST_OBJECT_UNLOCK (self);
-        }
+      if (priv->flushing
+          && gst_aggregator_all_flush_stop_received (self, seqnum)) {
+        GST_OBJECT_UNLOCK (self);
+        /* That means we received FLUSH_STOP/FLUSH_STOP on
+         * all sinkpads -- Seeking is Done... sending FLUSH_STOP */
+        gst_aggregator_flush (self);
+        gst_pad_push_event (self->srcpad, event);
+        event = NULL;
+        SRC_LOCK (self);
+        priv->send_eos = TRUE;
+        SRC_BROADCAST (self);
+        SRC_UNLOCK (self);
+
+        GST_INFO_OBJECT (self,
+            "Flush stopped, releasing source pad STREAM_LOCK");
+        GST_PAD_STREAM_UNLOCK (self->srcpad);
+
+        gst_aggregator_start_srcpad_task (self);
       } else {
         GST_OBJECT_UNLOCK (self);
       }
 
+      PAD_FLUSH_UNLOCK (aggpad);
+
       /* We never forward the event */
       goto eat;
     }
@@ -1581,6 +1596,14 @@ gst_aggregator_stop (GstAggregator * agg)
 
   gst_aggregator_set_allocation (agg, NULL, NULL, NULL, NULL);
 
+  if (agg->priv->running) {
+    /* As sinkpads get deactivated after the src pad, we
+     * may have restarted the source pad task after receiving
+     * flush events on one of our sinkpads. Stop our src pad
+     * task again if that is the case */
+    gst_aggregator_stop_srcpad_task (agg, NULL);
+  }
+
   return result;
 }
 
@@ -1875,7 +1898,7 @@ gst_aggregator_send_event (GstElement * element, GstEvent * event)
     GST_OBJECT_LOCK (self);
     gst_segment_do_seek (&GST_AGGREGATOR_PAD (self->srcpad)->segment, rate, fmt,
         flags, start_type, start, stop_type, stop, NULL);
-    self->priv->seqnum = gst_event_get_seqnum (event);
+    self->priv->next_seqnum = gst_event_get_seqnum (event);
     self->priv->first_buffer = FALSE;
     GST_OBJECT_UNLOCK (self);
 
@@ -1883,7 +1906,6 @@ gst_aggregator_send_event (GstElement * element, GstEvent * event)
   }
   GST_STATE_UNLOCK (element);
 
-
   return GST_ELEMENT_CLASS (aggregator_parent_class)->send_event (element,
       event);
 }
@@ -1959,13 +1981,6 @@ gst_aggregator_event_forward_func (GstPad * pad, gpointer user_data)
 
       gst_query_unref (seeking);
     }
-
-    if (evdata->flush) {
-      PAD_LOCK (aggpad);
-      aggpad->priv->pending_flush_start = FALSE;
-      aggpad->priv->pending_flush_stop = FALSE;
-      PAD_UNLOCK (aggpad);
-    }
   } else {
     evdata->one_actually_seeked = TRUE;
   }
@@ -1986,24 +2001,6 @@ gst_aggregator_forward_event_to_all_sinkpads (GstAggregator * self,
   evdata->result = TRUE;
   evdata->one_actually_seeked = FALSE;
 
-  /* We first need to set all pads as flushing in a first pass
-   * as flush_start flush_stop is sometimes sent synchronously
-   * while we send the seek event */
-  if (evdata->flush) {
-    GList *l;
-
-    GST_OBJECT_LOCK (self);
-    for (l = GST_ELEMENT_CAST (self)->sinkpads; l != NULL; l = l->next) {
-      GstAggregatorPad *pad = l->data;
-
-      PAD_LOCK (pad);
-      pad->priv->pending_flush_start = TRUE;
-      pad->priv->pending_flush_stop = FALSE;
-      PAD_UNLOCK (pad);
-    }
-    GST_OBJECT_UNLOCK (self);
-  }
-
   gst_pad_forward (self->srcpad, gst_aggregator_event_forward_func, evdata);
 
   gst_event_unref (evdata->event);
@@ -2029,18 +2026,26 @@ gst_aggregator_do_seek (GstAggregator * self, GstEvent * event)
   flush = flags & GST_SEEK_FLAG_FLUSH;
 
   GST_OBJECT_LOCK (self);
-  if (flush) {
-    priv->pending_flush_start = TRUE;
-    priv->flush_seeking = TRUE;
-  }
+  self->priv->next_seqnum = gst_event_get_seqnum (event);
 
   gst_segment_do_seek (&GST_AGGREGATOR_PAD (self->srcpad)->segment, rate, fmt,
       flags, start_type, start, stop_type, stop, NULL);
 
   /* Seeking sets a position */
   self->priv->first_buffer = FALSE;
+
+  if (flush)
+    priv->flushing = TRUE;
+
   GST_OBJECT_UNLOCK (self);
 
+  if (flush) {
+    GstEvent *event = gst_event_new_flush_start ();
+
+    gst_event_set_seqnum (event, self->priv->next_seqnum);
+    gst_aggregator_stop_srcpad_task (self, event);
+  }
+
   /* forward the seek upstream */
   evdata.event = event;
   evdata.flush = flush;
@@ -2050,8 +2055,7 @@ gst_aggregator_do_seek (GstAggregator * self, GstEvent * event)
 
   if (!evdata.result || !evdata.one_actually_seeked) {
     GST_OBJECT_LOCK (self);
-    priv->flush_seeking = FALSE;
-    priv->pending_flush_start = FALSE;
+    priv->flushing = FALSE;
     GST_OBJECT_UNLOCK (self);
   }
 
@@ -2136,6 +2140,7 @@ gst_aggregator_src_pad_activate_mode_func (GstPad * pad,
 
   /* deactivating */
   GST_INFO_OBJECT (self, "Deactivating srcpad");
+
   gst_aggregator_stop_srcpad_task (self, FALSE);
 
   return TRUE;
index 631f982..3424024 100644 (file)
@@ -797,6 +797,7 @@ GST_START_TEST (test_flushing_seek)
   ChainData data2 = { 0, };
   TestData test = { 0, };
   GstBuffer *buf;
+  guint32 seqnum;
 
   _test_data_init (&test, TRUE);
 
@@ -817,16 +818,20 @@ GST_START_TEST (test_flushing_seek)
   /* now do a successful flushing seek */
   event = gst_event_new_seek (1, GST_FORMAT_TIME, GST_SEEK_FLAG_FLUSH,
       GST_SEEK_TYPE_SET, 0, GST_SEEK_TYPE_SET, 10 * GST_SECOND);
+  seqnum = gst_event_get_seqnum (event);
   fail_unless (gst_pad_send_event (test.srcpad, event));
 
-  /* flushing starts once one of the upstream elements sends the first
-   * FLUSH_START */
-  fail_unless_equals_int (test.flush_start_events, 0);
+  /* flushing starts when a flushing seek is received, and stops
+   * when all sink pads have received FLUSH_STOP */
+  fail_unless_equals_int (test.flush_start_events, 1);
   fail_unless_equals_int (test.flush_stop_events, 0);
 
-  /* send a first FLUSH_START on agg:sink_0, will be sent downstream */
+  /* send a first FLUSH_START on agg:sink_0, nothing will be sent
+   * downstream */
   GST_DEBUG_OBJECT (data2.sinkpad, "send flush_start");
-  fail_unless (gst_pad_push_event (data2.srcpad, gst_event_new_flush_start ()));
+  event = gst_event_new_flush_start ();
+  gst_event_set_seqnum (event, seqnum);
+  fail_unless (gst_pad_push_event (data2.srcpad, event));
   fail_unless_equals_int (test.flush_start_events, 1);
   fail_unless_equals_int (test.flush_stop_events, 0);
 
@@ -834,16 +839,19 @@ GST_START_TEST (test_flushing_seek)
   data2.expected_result = GST_FLOW_FLUSHING;
   thread2 = g_thread_try_new ("gst-check", push_data, &data2, NULL);
 
-  /* this should send not additional flush_start */
+  /* this should send no additional flush_start */
   GST_DEBUG_OBJECT (data1.sinkpad, "send flush_start");
-  fail_unless (gst_pad_push_event (data1.srcpad, gst_event_new_flush_start ()));
+  event = gst_event_new_flush_start ();
+  gst_event_set_seqnum (event, seqnum);
+  fail_unless (gst_pad_push_event (data1.srcpad, event));
   fail_unless_equals_int (test.flush_start_events, 1);
   fail_unless_equals_int (test.flush_stop_events, 0);
 
   /* the first FLUSH_STOP is not forwarded downstream */
   GST_DEBUG_OBJECT (data1.srcpad, "send flush_stop");
-  fail_unless (gst_pad_push_event (data1.srcpad,
-          gst_event_new_flush_stop (TRUE)));
+  event = gst_event_new_flush_stop (TRUE);
+  gst_event_set_seqnum (event, seqnum);
+  fail_unless (gst_pad_push_event (data1.srcpad, event));
   fail_unless_equals_int (test.flush_start_events, 1);
   fail_unless_equals_int (test.flush_stop_events, 0);
 
@@ -858,7 +866,9 @@ GST_START_TEST (test_flushing_seek)
   /* flush agg:sink_1 as well. This completes the flushing seek so a FLUSH_STOP is
    * sent downstream */
   GST_DEBUG_OBJECT (data2.srcpad, "send flush_stop");
-  gst_pad_push_event (data2.srcpad, gst_event_new_flush_stop (TRUE));
+  event = gst_event_new_flush_stop (TRUE);
+  gst_event_set_seqnum (event, seqnum);
+  gst_pad_push_event (data2.srcpad, event);
 
   /* and the last FLUSH_STOP is forwarded downstream */
   fail_unless_equals_int (test.flush_stop_events, 1);