bin: Fix race conditions in tests
authorSebastian Dröge <sebastian@centricular.com>
Tue, 21 Jun 2022 08:51:35 +0000 (11:51 +0300)
committerGStreamer Marge Bot <gitlab-merge-bot@gstreamer-foundation.org>
Tue, 21 Jun 2022 17:27:28 +0000 (17:27 +0000)
The latency messages are non-deterministic and can arrive before/after
async-done or during state-changes as they are posted by e.g. sinks from
their streaming thread but bins are finishing asynchronous state changes
from a secondary helper thread.

To solve this, expect latency messages at any time and assert that we
receive one at some point during the test.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/2643>

subprojects/gstreamer/tests/check/gst/gstbin.c

index e366d5f..88ff44d 100644 (file)
 #include <gst/base/gstbasesrc.h>
 
 static void
-pop_async_done (GstBus * bus)
+pop_async_done (GstBus * bus, gboolean * had_latency)
 {
   GstMessage *message;
+  GstMessageType types = GST_MESSAGE_ASYNC_DONE;
+
+  if (!*had_latency)
+    types |= GST_MESSAGE_LATENCY;
 
   GST_DEBUG ("popping async-done message");
-  message = gst_bus_poll (bus, GST_MESSAGE_ASYNC_DONE, -1);
 
-  fail_unless (message && GST_MESSAGE_TYPE (message)
-      == GST_MESSAGE_ASYNC_DONE, "did not get GST_MESSAGE_ASYNC_DONE");
+  do {
+    message = gst_bus_poll (bus, types, -1);
 
-  gst_message_unref (message);
-  GST_DEBUG ("popped message");
+    fail_unless (message);
+    GST_DEBUG ("popped message %s",
+        gst_message_type_get_name (GST_MESSAGE_TYPE (message)));
+
+    if (GST_MESSAGE_TYPE (message) == GST_MESSAGE_LATENCY) {
+      fail_unless (*had_latency == FALSE);
+      *had_latency = TRUE;
+      gst_clear_message (&message);
+      types &= ~GST_MESSAGE_LATENCY;
+      continue;
+    }
+
+    fail_unless (GST_MESSAGE_TYPE (message)
+        == GST_MESSAGE_ASYNC_DONE, "did not get GST_MESSAGE_ASYNC_DONE");
+
+    gst_clear_message (&message);
+    break;
+  } while (TRUE);
 }
 
 static void
-pop_latency (GstBus * bus)
+pop_latency (GstBus * bus, gboolean * had_latency)
 {
   GstMessage *message;
 
-  GST_DEBUG ("popping async-done message");
+  if (*had_latency)
+    return;
+
+  GST_DEBUG ("popping latency message");
   message = gst_bus_poll (bus, GST_MESSAGE_LATENCY, -1);
 
-  fail_unless (message && GST_MESSAGE_TYPE (message)
+  fail_unless (message);
+  fail_unless (GST_MESSAGE_TYPE (message)
       == GST_MESSAGE_LATENCY, "did not get GST_MESSAGE_LATENCY");
 
-  gst_message_unref (message);
-  GST_DEBUG ("popped message");
+  GST_DEBUG ("popped message %s",
+      gst_message_type_get_name (GST_MESSAGE_TYPE (message)));
+  gst_clear_message (&message);
+
+  *had_latency = TRUE;
 }
 
 static void
-pop_state_changed (GstBus * bus, int count)
+pop_state_changed (GstBus * bus, int count, gboolean * had_latency)
 {
   GstMessage *message;
-
+  GstMessageType types = GST_MESSAGE_STATE_CHANGED;
   int i;
 
+  if (!*had_latency)
+    types |= GST_MESSAGE_LATENCY;
+
   GST_DEBUG ("popping %d messages", count);
   for (i = 0; i < count; ++i) {
-    message = gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1);
-
-    fail_unless (message && GST_MESSAGE_TYPE (message)
-        == GST_MESSAGE_STATE_CHANGED, "did not get GST_MESSAGE_STATE_CHANGED");
-
-    gst_message_unref (message);
+    do {
+      message = gst_bus_poll (bus, types, -1);
+
+      fail_unless (message);
+      GST_DEBUG ("popped message %s",
+          gst_message_type_get_name (GST_MESSAGE_TYPE (message)));
+
+      if (GST_MESSAGE_TYPE (message) == GST_MESSAGE_LATENCY) {
+        fail_unless (*had_latency == FALSE);
+        *had_latency = TRUE;
+        gst_clear_message (&message);
+        types &= ~GST_MESSAGE_LATENCY;
+        continue;
+      }
+
+      fail_unless (GST_MESSAGE_TYPE (message)
+          == GST_MESSAGE_STATE_CHANGED,
+          "did not get GST_MESSAGE_STATE_CHANGED");
+
+      gst_message_unref (message);
+      break;
+    } while (TRUE);
   }
   GST_DEBUG ("popped %d messages", count);
 }
@@ -538,6 +583,7 @@ GST_START_TEST (test_message_state_changed_children)
   GstBus *bus;
   GstStateChangeReturn ret;
   GstState current, pending;
+  gboolean had_latency = FALSE;
 
   pipeline = GST_PIPELINE (gst_pipeline_new (NULL));
   fail_unless (pipeline != NULL, "Could not create pipeline");
@@ -576,7 +622,7 @@ GST_START_TEST (test_message_state_changed_children)
   ASSERT_OBJECT_REFCOUNT (sink, "sink", 2);
   ASSERT_OBJECT_REFCOUNT (pipeline, "pipeline", 2);
 
-  pop_state_changed (bus, 3);
+  pop_state_changed (bus, 3, &had_latency);
   fail_if (gst_bus_have_pending (bus), "unexpected pending messages");
 
   ASSERT_OBJECT_REFCOUNT (bus, "bus", 2);
@@ -619,9 +665,9 @@ GST_START_TEST (test_message_state_changed_children)
    * its state_change message */
   ASSERT_OBJECT_REFCOUNT_BETWEEN (pipeline, "pipeline", 3, 4);
 
-  pop_state_changed (bus, 3);
-  pop_async_done (bus);
-  pop_latency (bus);
+  pop_state_changed (bus, 3, &had_latency);
+  pop_async_done (bus, &had_latency);
+  pop_latency (bus, &had_latency);
   fail_if ((gst_bus_pop (bus)) != NULL);
 
   ASSERT_OBJECT_REFCOUNT_BETWEEN (bus, "bus", 2, 3);
@@ -648,7 +694,7 @@ GST_START_TEST (test_message_state_changed_children)
   ASSERT_OBJECT_REFCOUNT_BETWEEN (sink, "sink", 2, 4);
   ASSERT_OBJECT_REFCOUNT (pipeline, "pipeline", 3);
 
-  pop_state_changed (bus, 3);
+  pop_state_changed (bus, 3, &had_latency);
   fail_if ((gst_bus_pop (bus)) != NULL);
 
   ASSERT_OBJECT_REFCOUNT (bus, "bus", 2);
@@ -669,7 +715,7 @@ GST_START_TEST (test_message_state_changed_children)
   ASSERT_OBJECT_REFCOUNT_BETWEEN (sink, "sink", 3, 4);
   ASSERT_OBJECT_REFCOUNT (pipeline, "pipeline", 3);
 
-  pop_state_changed (bus, 6);
+  pop_state_changed (bus, 6, &had_latency);
   fail_if ((gst_bus_pop (bus)) != NULL);
 
   ASSERT_OBJECT_REFCOUNT (src, "src", 1);
@@ -696,6 +742,7 @@ GST_START_TEST (test_watch_for_state_change)
   GstElement *src, *sink, *bin;
   GstBus *bus;
   GstStateChangeReturn ret;
+  gboolean had_latency = FALSE;
 
   bin = gst_element_factory_make ("bin", NULL);
   fail_unless (bin != NULL, "Could not create bin");
@@ -722,9 +769,9 @@ GST_START_TEST (test_watch_for_state_change)
       GST_CLOCK_TIME_NONE);
   fail_unless (ret == GST_STATE_CHANGE_SUCCESS);
 
-  pop_state_changed (bus, 6);
-  pop_async_done (bus);
-  pop_latency (bus);
+  pop_state_changed (bus, 6, &had_latency);
+  pop_async_done (bus, &had_latency);
+  pop_latency (bus, &had_latency);
 
   fail_unless (gst_bus_have_pending (bus) == FALSE,
       "Unexpected messages on bus");
@@ -732,16 +779,17 @@ GST_START_TEST (test_watch_for_state_change)
   ret = gst_element_set_state (GST_ELEMENT (bin), GST_STATE_PLAYING);
   fail_unless (ret == GST_STATE_CHANGE_SUCCESS);
 
-  pop_state_changed (bus, 3);
+  pop_state_changed (bus, 3, &had_latency);
 
+  had_latency = FALSE;
   /* this one might return either SUCCESS or ASYNC, likely SUCCESS */
   ret = gst_element_set_state (GST_ELEMENT (bin), GST_STATE_PAUSED);
   gst_element_get_state (GST_ELEMENT (bin), NULL, NULL, GST_CLOCK_TIME_NONE);
 
-  pop_state_changed (bus, 3);
+  pop_state_changed (bus, 3, &had_latency);
   if (ret == GST_STATE_CHANGE_ASYNC) {
-    pop_async_done (bus);
-    pop_latency (bus);
+    pop_async_done (bus, &had_latency);
+    pop_latency (bus, &had_latency);
   }
 
   fail_unless (gst_bus_have_pending (bus) == FALSE,
@@ -898,6 +946,7 @@ GST_START_TEST (test_children_state_change_order_flagged_sink)
   GstStateChangeReturn ret;
   GstState current, pending;
   GstBus *bus;
+  gboolean had_latency = FALSE;
 
   pipeline = gst_pipeline_new (NULL);
   fail_unless (pipeline != NULL, "Could not create pipeline");
@@ -951,10 +1000,11 @@ GST_START_TEST (test_children_state_change_order_flagged_sink)
   ASSERT_STATE_CHANGE_MSG (bus, sink, GST_STATE_READY, GST_STATE_PAUSED, 107);
 #else
 
-  pop_state_changed (bus, 2);   /* pop remaining ready => paused messages off the bus */
+  pop_state_changed (bus, 2, &had_latency);     /* pop remaining ready => paused messages off the bus */
   ASSERT_STATE_CHANGE_MSG (bus, pipeline, GST_STATE_READY, GST_STATE_PAUSED,
       108);
-  pop_async_done (bus);
+  pop_async_done (bus, &had_latency);
+  pop_latency (bus, &had_latency);
 #endif
   /* PAUSED => PLAYING */
   GST_DEBUG ("popping PAUSED -> PLAYING messages");
@@ -972,8 +1022,8 @@ GST_START_TEST (test_children_state_change_order_flagged_sink)
   fail_if (ret != GST_STATE_CHANGE_SUCCESS, "State change to READY failed");
 
   /* TODO: do we need to check downwards state change order as well? */
-  pop_state_changed (bus, 4);   /* pop playing => paused messages off the bus */
-  pop_state_changed (bus, 4);   /* pop paused => ready messages off the bus */
+  pop_state_changed (bus, 4, &had_latency);     /* pop playing => paused messages off the bus */
+  pop_state_changed (bus, 4, &had_latency);     /* pop paused => ready messages off the bus */
 
   while (GST_OBJECT_REFCOUNT_VALUE (pipeline) > 1)
     THREAD_SWITCH ();
@@ -1002,6 +1052,7 @@ GST_START_TEST (test_children_state_change_order_semi_sink)
   GstStateChangeReturn ret;
   GstState current, pending;
   GstBus *bus;
+  gboolean had_latency = FALSE;
 
   /* (2) Now again, but check other code path where we don't have
    *     a proper sink correctly flagged as such, but a 'semi-sink' */
@@ -1056,10 +1107,11 @@ GST_START_TEST (test_children_state_change_order_semi_sink)
   ASSERT_STATE_CHANGE_MSG (bus, src, GST_STATE_READY, GST_STATE_PAUSED, 206);
   ASSERT_STATE_CHANGE_MSG (bus, sink, GST_STATE_READY, GST_STATE_PAUSED, 207);
 #else
-  pop_state_changed (bus, 2);   /* pop remaining ready => paused messages off the bus */
+  pop_state_changed (bus, 2, &had_latency);     /* pop remaining ready => paused messages off the bus */
   ASSERT_STATE_CHANGE_MSG (bus, pipeline, GST_STATE_READY, GST_STATE_PAUSED,
       208);
-  pop_async_done (bus);
+  pop_async_done (bus, &had_latency);
+  pop_latency (bus, &had_latency);
 
   /* PAUSED => PLAYING */
   GST_DEBUG ("popping PAUSED -> PLAYING messages");
@@ -1076,8 +1128,8 @@ GST_START_TEST (test_children_state_change_order_semi_sink)
   fail_if (ret != GST_STATE_CHANGE_SUCCESS, "State change to READY failed");
 
   /* TODO: do we need to check downwards state change order as well? */
-  pop_state_changed (bus, 4);   /* pop playing => paused messages off the bus */
-  pop_state_changed (bus, 4);   /* pop paused => ready messages off the bus */
+  pop_state_changed (bus, 4, &had_latency);     /* pop playing => paused messages off the bus */
+  pop_state_changed (bus, 4, &had_latency);     /* pop paused => ready messages off the bus */
 
   GST_DEBUG ("waiting for pipeline to reach refcount 1");
   while (GST_OBJECT_REFCOUNT_VALUE (pipeline) > 1)