validate-scenario: Refactor seek handling
authorEdward Hervey <edward@centricular.com>
Tue, 5 Jun 2018 15:56:36 +0000 (17:56 +0200)
committerThibault Saunier <tsaunier@igalia.com>
Thu, 30 Apr 2020 16:40:12 +0000 (12:40 -0400)
* Store all seek values into a list of pending seeks instead
  of hardcoding some values
* Store all segments that sinks received
* Match segments to seeks when all sinks received segments with
  the same seqnum
* Detect when a seek did *not* result in segments with identical
  matching seqnums

Should allow checking for all types of seek handling, including
flush-less seeks

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-devtools/-/merge_requests/174>

validate/gst/validate/gst-validate-scenario.c

index 2a72c2d..6d62450 100644 (file)
@@ -123,6 +123,34 @@ _fill_action (GstValidateScenario * scenario, GstValidateAction * action,
     GstStructure * structure, gboolean add_to_lists);
 static gboolean _action_set_done (GstValidateAction * action);
 
+/* GstValidateSinkInformation tracks information for all sinks in the pipeline */
+typedef struct
+{
+  GstElement *sink;             /* The sink element tracked */
+  guint32 segment_seqnum;       /* The latest segment seqnum. GST_SEQNUM_INVALID if none */
+  GstSegment segment;           /* The latest segment */
+} GstValidateSinkInformation;
+
+/* GstValidateSeekInformation tracks:
+ * * The values used in the seek
+ * * The seqnum used in the seek event
+ * * The validate action to which it relates
+ */
+typedef struct
+{
+  guint32 seqnum;               /* seqnum of the seek event */
+
+  /* Seek values */
+  gdouble rate;
+  GstFormat format;
+  GstSeekFlags flags;
+  GstSeekType start_type, stop_type;
+  gint64 start, stop;
+
+  /* The action corresponding to this seek */
+  GstValidateAction *action;
+} GstValidateSeekInformation;
+
 /* GstValidateScenario is not really thread safe and
  * everything should be done from the thread GstValidate
  * was inited from, unless stated otherwise.
@@ -141,14 +169,37 @@ struct _GstValidateScenarioPrivate
 
   gboolean needs_playback_parsing;
 
+  GList *sinks;                 /* List of GstValidateSinkInformation */
+  GList *seeks;                 /* List of GstValidateSeekInformation */
+
+  /* Seek currently applied (set when all sinks received segment with
+   * an identical seqnum and there is a matching pending seek).
+   * do not free, should always be present in the seek list above */
+  GstValidateSeekInformation *current_seek;
+  /* Current unified seqnum. Set when all sinks received segment with
+   * an identical seqnum, even if there wasn't a matching pending seek
+   */
+  guint32 current_seqnum;
+
   /*  List of action that need parsing when reaching ASYNC_DONE
    *  most probably to be able to query duration */
 
-  GstEvent *last_seek;
+  /* seek_flags :
+   *  * Only set for seek actions, and only if seek succeeded
+   *  * Only Used in _check_position()
+   * FIXME : Just use the seek information */
   GstSeekFlags seek_flags;
   GstFormat seek_format;
+
+  /* segment_start/segment_stop :
+   *  * Set : from seek values
+   *  * Read : In _check_position()
+   * FIXME : Just use the current seek information */
   GstClockTime segment_start;
   GstClockTime segment_stop;
+
+  /* Always initialized to a default value
+   * FIXME : Is it still needed with the new seeking validation system ? */
   GstClockTime seek_pos_tol;
 
   /* If we seeked in paused the position should be exactly what
@@ -191,6 +242,7 @@ struct _GstValidateScenarioPrivate
   GWeakRef ref_pipeline;
 
   GstTestClock *clock;
+  guint segments_needed;
 };
 
 typedef struct KeyFileGroupName
@@ -262,6 +314,13 @@ g_error_action (gpointer action, const gchar * format, ...)
   g_free (tmp);
 }
 
+static void
+gst_validate_seek_information_free (GstValidateSeekInformation * info)
+{
+  gst_validate_action_unref (info->action);
+  g_free (info);
+}
+
 static GstValidateInterceptionReturn
 gst_validate_scenario_intercept_report (GstValidateReporter * reporter,
     GstValidateReport * report)
@@ -633,6 +692,13 @@ _check_scenario_is_done (GstValidateScenario * scenario)
   }
 }
 
+static void
+_reset_sink_information (GstValidateSinkInformation * sinkinfo)
+{
+  sinkinfo->segment_seqnum = GST_SEQNUM_INVALID;
+  gst_segment_init (&sinkinfo->segment, GST_FORMAT_UNDEFINED);
+}
+
 /**
  * gst_validate_action_get_clocktime:
  * @scenario: The #GstValidateScenario from which to get a time
@@ -701,6 +767,131 @@ gst_validate_action_get_clocktime (GstValidateScenario * scenario,
   return TRUE;
 }
 
+/* WITH SCENARIO LOCK TAKEN */
+static GstValidateSinkInformation *
+_find_sink_information (GstValidateScenario * scenario, GstElement * sink)
+{
+  GList *tmp;
+
+  for (tmp = scenario->priv->sinks; tmp; tmp = tmp->next) {
+    GstValidateSinkInformation *sink_info =
+        (GstValidateSinkInformation *) tmp->data;
+    if (sink_info->sink == sink)
+      return sink_info;
+  }
+  return NULL;
+}
+
+/* WITH SCENARIO LOCK TAKEN */
+static GstValidateSeekInformation *
+_find_seek_information (GstValidateScenario * scenario, guint32 seqnum)
+{
+  GList *tmp;
+
+  for (tmp = scenario->priv->seeks; tmp; tmp = tmp->next) {
+    GstValidateSeekInformation *seek_info =
+        (GstValidateSeekInformation *) tmp->data;
+    if (seek_info->seqnum == seqnum)
+      return seek_info;
+  }
+
+  return NULL;
+}
+
+/* WITH SCENARIO LOCK TAKEN */
+static void
+_validate_sink_information (GstValidateScenario * scenario)
+{
+  GList *tmp;
+  gboolean all_sinks_ready = TRUE;
+  gboolean identical_seqnum = TRUE;
+  gboolean transitioning = FALSE;
+  guint32 common_seqnum = GST_SEQNUM_INVALID;
+  guint32 next_seqnum = GST_SEQNUM_INVALID;
+  GstValidateSeekInformation *seek_info;
+
+  if (scenario->priv->seeks)
+    /* If we have a pending seek, get the expected seqnum to
+     * figure out whether we are transitioning to a seek */
+    next_seqnum =
+        ((GstValidateSeekInformation *) scenario->priv->seeks->data)->seqnum;
+
+  GST_LOG_OBJECT (scenario, "next_seqnum %" G_GUINT32_FORMAT, next_seqnum);
+
+  for (tmp = scenario->priv->sinks; tmp; tmp = tmp->next) {
+    GstValidateSinkInformation *sink_info =
+        (GstValidateSinkInformation *) tmp->data;
+    GST_DEBUG_OBJECT (sink_info->sink,
+        "seqnum:%" G_GUINT32_FORMAT " segment:%" GST_SEGMENT_FORMAT,
+        sink_info->segment_seqnum, &sink_info->segment);
+    if (sink_info->segment_seqnum == GST_SEQNUM_INVALID)
+      all_sinks_ready = FALSE;
+    else if (sink_info->segment.format == GST_FORMAT_TIME) {
+      /* Are we in the middle of switching segments (from the current
+       * one, or to the next week) ? */
+      if (sink_info->segment_seqnum == scenario->priv->current_seqnum ||
+          sink_info->segment_seqnum == next_seqnum)
+        transitioning = TRUE;
+
+      /* We are only interested in sinks that handle TIME segments */
+      if (common_seqnum == GST_SEQNUM_INVALID)
+        common_seqnum = sink_info->segment_seqnum;
+      else if (common_seqnum != sink_info->segment_seqnum) {
+        identical_seqnum = FALSE;
+      }
+    }
+  }
+
+  /* If not all sinks have received a segment, just return */
+  if (!all_sinks_ready)
+    return;
+
+  GST_FIXME_OBJECT (scenario,
+      "All sinks have valid segment. identical_seqnum:%d transitioning:%d seqnum:%"
+      G_GUINT32_FORMAT " (current:%" G_GUINT32_FORMAT ") seeks:%p",
+      identical_seqnum, transitioning, common_seqnum,
+      scenario->priv->current_seqnum, scenario->priv->seeks);
+
+  if (!identical_seqnum) {
+    /* If all sinks received a segment *and* there is a pending seek *and* there
+     * wasn't one previously, we definitely have a failure */
+    if (!transitioning && scenario->priv->current_seek == NULL
+        && scenario->priv->seeks) {
+      GST_VALIDATE_REPORT (scenario, EVENT_SEEK_INVALID_SEQNUM,
+          "Not all segments from a given seek have the same seqnum");
+      return;
+    }
+    /* Otherwise we're either doing the initial preroll (without seek)
+     * or we are in the middle of switching to another seek */
+    return;
+  }
+
+  /* Now check if we have seek data related to that seqnum */
+  seek_info = _find_seek_information (scenario, common_seqnum);
+
+  if (seek_info && seek_info != scenario->priv->current_seek) {
+    GST_DEBUG_OBJECT (scenario, "Found a corresponding seek !");
+    /* Updating values */
+    /* FIXME : Check segment values if needed ! */
+    /* FIXME : Non-flushing seek, validate here */
+    if (seek_info->start_type == GST_SEEK_TYPE_SET)
+      scenario->priv->segment_start = seek_info->start;
+    if (seek_info->stop_type == GST_SEEK_TYPE_SET)
+      scenario->priv->segment_stop = seek_info->stop;
+    if (scenario->priv->target_state == GST_STATE_PAUSED)
+      scenario->priv->seeked_in_pause = TRUE;
+    SCENARIO_UNLOCK (scenario);
+    /* If it's a non-flushing seek, validate it here
+     * otherwise we will do it when the async_done is received */
+    if (!(seek_info->flags & GST_SEEK_FLAG_FLUSH))
+      gst_validate_action_set_done (seek_info->action);
+    SCENARIO_LOCK (scenario);
+  }
+  /* We always set the current_seek. Can be NULL if no matching  */
+  scenario->priv->current_seek = seek_info;
+  scenario->priv->current_seqnum = common_seqnum;
+}
+
 /**
  * gst_validate_scenario_execute_seek:
  * @scenario: The #GstValidateScenario for which to execute a seek action
@@ -728,6 +919,7 @@ gst_validate_scenario_execute_seek (GstValidateScenario * scenario,
     GstSeekType stop_type, GstClockTime stop)
 {
   GstEvent *seek;
+  GstValidateSeekInformation *seek_info;
 
   GstValidateExecuteActionReturn ret = GST_VALIDATE_EXECUTE_ACTION_ASYNC;
   GstValidateScenarioPrivate *priv = scenario->priv;
@@ -740,12 +932,25 @@ gst_validate_scenario_execute_seek (GstValidateScenario * scenario,
     GST_VALIDATE_REPORT_ACTION (scenario, action,
         SCENARIO_ACTION_EXECUTION_ERROR,
         "Trying to seek in format %d, but not support yet!", format);
-
   }
 
+  seek_info = g_new0 (GstValidateSeekInformation, 1);
+  seek_info->seqnum = GST_EVENT_SEQNUM (seek);
+  seek_info->rate = rate;
+  seek_info->format = format;
+  seek_info->flags = flags;
+  seek_info->start = start;
+  seek_info->stop = stop;
+  seek_info->start_type = start_type;
+  seek_info->stop_type = stop_type;
+  seek_info->action = gst_validate_action_ref (action);
+
+  SCENARIO_LOCK (scenario);
+  priv->seeks = g_list_append (priv->seeks, seek_info);
+  SCENARIO_UNLOCK (scenario);
+
   gst_event_ref (seek);
   if (gst_element_send_event (pipeline, seek)) {
-    gst_event_replace (&priv->last_seek, seek);
     priv->seek_flags = flags;
     priv->seek_format = format;
   } else {
@@ -773,6 +978,11 @@ gst_validate_scenario_execute_seek (GstValidateScenario * scenario,
         break;
       }
     }
+    SCENARIO_LOCK (scenario);
+    priv->seeks = g_list_remove (priv->seeks, seek_info);
+    SCENARIO_UNLOCK (scenario);
+
+    gst_validate_seek_information_free (seek_info);
     ret = GST_VALIDATE_EXECUTE_ACTION_ERROR_REPORTED;
   }
   gst_event_unref (seek);
@@ -3066,30 +3276,6 @@ _execute_disable_plugin (GstValidateScenario * scenario,
   return GST_VALIDATE_EXECUTE_ACTION_OK;
 }
 
-static void
-gst_validate_scenario_update_segment_from_seek (GstValidateScenario * scenario,
-    GstEvent * seek)
-{
-  GstValidateScenarioPrivate *priv = scenario->priv;
-  gint64 start, stop;
-  GstSeekType start_type, stop_type;
-
-  gst_event_parse_seek (seek, NULL, NULL, NULL, &start_type, &start,
-      &stop_type, &stop);
-
-  if (start_type == GST_SEEK_TYPE_SET) {
-    priv->segment_start = start;
-  } else if (start_type == GST_SEEK_TYPE_END) {
-    /* TODO fill me */
-  }
-
-  if (stop_type == GST_SEEK_TYPE_SET) {
-    priv->segment_stop = stop;
-  } else if (stop_type == GST_SEEK_TYPE_END) {
-    /* TODO fill me */
-  }
-}
-
 static GstValidateExecuteActionReturn
 gst_validate_action_default_prepare_func (GstValidateAction * action)
 {
@@ -3234,23 +3420,20 @@ message_cb (GstBus * bus, GstMessage * message, GstValidateScenario * scenario)
     return FALSE;
   }
 
+  GST_DEBUG_OBJECT (scenario, "message %" GST_PTR_FORMAT, message);
+
   switch (GST_MESSAGE_TYPE (message)) {
     case GST_MESSAGE_ASYNC_DONE:
-      if (priv->last_seek) {
-        gst_validate_scenario_update_segment_from_seek (scenario,
-            priv->last_seek);
-
-        if (priv->target_state == GST_STATE_PAUSED)
-          priv->seeked_in_pause = TRUE;
-
-        gst_event_replace (&priv->last_seek, NULL);
-        gst_validate_action_set_done (priv->actions->data);
+      if (priv->current_seek
+          && ((priv->current_seek->flags & GST_SEEK_FLAG_FLUSH) &&
+              (priv->current_seek->action->priv->state ==
+                  GST_VALIDATE_EXECUTE_ACTION_ASYNC))) {
+        gst_validate_action_set_done (priv->current_seek->action);
       } else if (scenario->priv->needs_async_done) {
         scenario->priv->needs_async_done = FALSE;
         if (priv->actions && _action_sets_state (priv->actions->data)
             && !priv->changing_state)
           gst_validate_action_set_done (priv->actions->data);
-
       }
 
       if (scenario->priv->needs_playback_parsing) {
@@ -3267,6 +3450,17 @@ message_cb (GstBus * bus, GstMessage * message, GstValidateScenario * scenario)
 
         gst_message_parse_state_changed (message, &pstate, &nstate, NULL);
 
+        if (pstate == GST_STATE_PAUSED && nstate == GST_STATE_READY) {
+          /* Reset sink information */
+          SCENARIO_LOCK (scenario);
+          g_list_foreach (scenario->priv->sinks,
+              (GFunc) _reset_sink_information, NULL);
+          /* Reset current seek */
+          scenario->priv->current_seek = NULL;
+          scenario->priv->current_seqnum = GST_SEQNUM_INVALID;
+          SCENARIO_UNLOCK (scenario);
+        }
+
         if (scenario->priv->changing_state &&
             scenario->priv->target_state == nstate) {
           scenario->priv->changing_state = FALSE;
@@ -3394,7 +3588,9 @@ message_cb (GstBus * bus, GstMessage * message, GstValidateScenario * scenario)
       }
       /* Make sure that if there is an ASYNC_DONE in the message queue, we do not
          take it into account */
-      gst_event_replace (&priv->last_seek, NULL);
+      g_list_free_full (priv->seeks,
+          (GDestroyNotify) gst_validate_seek_information_free);
+      priv->seeks = NULL;
       SCENARIO_UNLOCK (scenario);
 
       GST_DEBUG_OBJECT (scenario, "Got EOS; generate 'stop' action");
@@ -3488,7 +3684,34 @@ message_cb (GstBus * bus, GstMessage * message, GstValidateScenario * scenario)
         priv->dropped = dropped;
       break;
     }
-
+    case GST_MESSAGE_APPLICATION:
+    {
+      const GstStructure *s;
+      s = gst_message_get_structure (message);
+      if (gst_structure_has_name (s, "validate-segment")) {
+        GstValidateSinkInformation *sink_info;
+
+        SCENARIO_LOCK (scenario);
+        sink_info =
+            _find_sink_information (scenario,
+            (GstElement *) GST_MESSAGE_SRC (message));
+
+        if (sink_info) {
+          const GValue *segment_value;
+          const GstSegment *segment;
+
+          GST_DEBUG_OBJECT (scenario, "Got segment update for %s",
+              GST_ELEMENT_NAME (sink_info->sink));
+          sink_info->segment_seqnum = GST_MESSAGE_SEQNUM (message);
+          segment_value = gst_structure_get_value (s, "segment");
+          g_assert (segment_value != NULL);
+          segment = (const GstSegment *) g_value_get_boxed (segment_value);
+          gst_segment_copy_into (segment, &sink_info->segment);
+          _validate_sink_information (scenario);
+        }
+        SCENARIO_UNLOCK (scenario);
+      }
+    }
     default:
       break;
   }
@@ -3912,6 +4135,8 @@ gst_validate_scenario_init (GstValidateScenario * scenario)
   priv->seek_pos_tol = DEFAULT_SEEK_TOLERANCE;
   priv->segment_start = 0;
   priv->segment_stop = GST_CLOCK_TIME_NONE;
+  priv->current_seek = NULL;
+  priv->current_seqnum = GST_SEQNUM_INVALID;
   priv->action_execution_interval = 10;
   priv->vars = gst_structure_new_empty ("vars");
   priv->needs_playback_parsing = TRUE;
@@ -3928,8 +4153,6 @@ gst_validate_scenario_dispose (GObject * object)
 {
   GstValidateScenarioPrivate *priv = GST_VALIDATE_SCENARIO (object)->priv;
 
-  if (priv->last_seek)
-    gst_event_unref (priv->last_seek);
   g_weak_ref_clear (&priv->ref_pipeline);
 
   if (priv->bus) {
@@ -3970,6 +4193,8 @@ gst_validate_scenario_finalize (GObject * object)
 
 static void _element_added_cb (GstBin * bin, GstElement * element,
     GstValidateScenario * scenario);
+static void _element_removed_cb (GstBin * bin, GstElement * element,
+    GstValidateScenario * scenario);
 
 static void
 iterate_children (GstValidateScenario * scenario, GstBin * bin)
@@ -4013,6 +4238,42 @@ should_execute_action (GstElement * element, GstValidateAction * action)
   return gst_validate_element_matches_target (element, action->structure);
 }
 
+/* Returns TRUE if:
+ * * The element has no parent (pipeline)
+ * * Or it's a sink*/
+static gboolean
+_all_parents_are_sink (GstElement * element)
+{
+  if (GST_OBJECT_PARENT (element) == NULL)
+    return TRUE;
+
+  if (!GST_OBJECT_FLAG_IS_SET (element, GST_ELEMENT_FLAG_SINK))
+    return FALSE;
+
+  return _all_parents_are_sink ((GstElement *) GST_OBJECT_PARENT (element));
+}
+
+static void
+_element_removed_cb (GstBin * bin, GstElement * element,
+    GstValidateScenario * scenario)
+{
+  GstValidateScenarioPrivate *priv = scenario->priv;
+
+  if (GST_IS_BASE_SINK (element)) {
+    GstValidateSinkInformation *sink_info;
+    SCENARIO_LOCK (scenario);
+    sink_info = _find_sink_information (scenario, element);
+    if (sink_info) {
+      GST_DEBUG_OBJECT (scenario, "Removing sink information for %s",
+          GST_ELEMENT_NAME (element));
+      priv->sinks = g_list_remove (priv->sinks, sink_info);
+      gst_object_unref (sink_info->sink);
+      g_free (sink_info);
+    }
+    SCENARIO_UNLOCK (scenario);
+  }
+}
+
 static void
 _element_added_cb (GstBin * bin, GstElement * element,
     GstValidateScenario * scenario)
@@ -4049,6 +4310,17 @@ _element_added_cb (GstBin * bin, GstElement * element,
     } else
       tmp = tmp->next;
   }
+
+  /* If it's a new GstBaseSink, add to list of sink information */
+  if (GST_IS_BASE_SINK (element) && _all_parents_are_sink (element)) {
+    GstValidateSinkInformation *sink_info =
+        g_new0 (GstValidateSinkInformation, 1);
+    GST_DEBUG_OBJECT (scenario, "Adding %s to list of tracked sinks",
+        GST_ELEMENT_NAME (element));
+    sink_info->sink = gst_object_ref (element);
+    priv->sinks = g_list_append (priv->sinks, sink_info);
+  }
+
   SCENARIO_UNLOCK (scenario);
 
   _check_scenario_is_done (scenario);
@@ -4057,6 +4329,8 @@ _element_added_cb (GstBin * bin, GstElement * element,
   if (GST_IS_BIN (element)) {
     g_signal_connect (element, "element-added", (GCallback) _element_added_cb,
         scenario);
+    g_signal_connect (element, "element-removed",
+        (GCallback) _element_removed_cb, scenario);
     iterate_children (scenario, GST_BIN (element));
   }
 }
@@ -4111,6 +4385,8 @@ gst_validate_scenario_new (GstValidateRunner *
 
   g_signal_connect (pipeline, "element-added", (GCallback) _element_added_cb,
       scenario);
+  g_signal_connect (pipeline, "element-removed",
+      (GCallback) _element_removed_cb, scenario);
 
   iterate_children (scenario, GST_BIN (pipeline));