basesink: Post a latency message whenever we're ready to answer the query
authorSebastian Dröge <sebastian@centricular.com>
Thu, 24 Jun 2021 08:28:28 +0000 (11:28 +0300)
committerSebastian Dröge <sebastian@centricular.com>
Mon, 28 Jun 2021 16:59:47 +0000 (19:59 +0300)
Usually the latency message is only posted whenever latency of an
element changes but that might be too early as the sinks might not be
able to query the latency at that point yet.

Similarly adding a new sink should cause latency reconfiguration once
that new sink is able to report its latency.

This fixes latency configuration in pipelines where webrtcbin is the
only "sink", i.e. it is used in a sendonly session. Before, the latency
would always be configured to 0.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/843>

libs/gst/base/gstbasesink.c
tests/check/gst/gstbin.c
tests/check/pipelines/cleanup.c
tests/check/pipelines/simple-launch-lines.c

index c94bbac..36fec5f 100644 (file)
@@ -1769,6 +1769,9 @@ gst_base_sink_commit_state (GstBaseSink * basesink)
             next, pending, GST_STATE_VOID_PENDING));
   }
 
+  gst_element_post_message (GST_ELEMENT_CAST (basesink),
+      gst_message_new_latency (GST_OBJECT_CAST (basesink)));
+
   GST_STATE_BROADCAST (basesink);
 
   return TRUE;
@@ -1797,6 +1800,9 @@ nothing_pending:
     /* we can report latency queries now */
     basesink->priv->have_latency = TRUE;
     GST_OBJECT_UNLOCK (basesink);
+
+    gst_element_post_message (GST_ELEMENT_CAST (basesink),
+        gst_message_new_latency (GST_OBJECT_CAST (basesink)));
     return TRUE;
   }
 stopping_unlocked:
@@ -5693,6 +5699,8 @@ gst_base_sink_change_state (GstElement * element, GstStateChange transition)
             gst_message_new_async_start (GST_OBJECT_CAST (basesink)));
       } else {
         priv->have_latency = TRUE;
+        gst_element_post_message (GST_ELEMENT_CAST (basesink),
+            gst_message_new_latency (GST_OBJECT_CAST (basesink)));
       }
       GST_BASE_SINK_PREROLL_UNLOCK (basesink);
       break;
index 1a88426..e366d5f 100644 (file)
@@ -42,7 +42,22 @@ pop_async_done (GstBus * bus)
 }
 
 static void
-pop_messages (GstBus * bus, int count)
+pop_latency (GstBus * bus)
+{
+  GstMessage *message;
+
+  GST_DEBUG ("popping async-done message");
+  message = gst_bus_poll (bus, GST_MESSAGE_LATENCY, -1);
+
+  fail_unless (message && GST_MESSAGE_TYPE (message)
+      == GST_MESSAGE_LATENCY, "did not get GST_MESSAGE_LATENCY");
+
+  gst_message_unref (message);
+  GST_DEBUG ("popped message");
+}
+
+static void
+pop_state_changed (GstBus * bus, int count)
 {
   GstMessage *message;
 
@@ -561,7 +576,7 @@ GST_START_TEST (test_message_state_changed_children)
   ASSERT_OBJECT_REFCOUNT (sink, "sink", 2);
   ASSERT_OBJECT_REFCOUNT (pipeline, "pipeline", 2);
 
-  pop_messages (bus, 3);
+  pop_state_changed (bus, 3);
   fail_if (gst_bus_have_pending (bus), "unexpected pending messages");
 
   ASSERT_OBJECT_REFCOUNT (bus, "bus", 2);
@@ -599,18 +614,19 @@ GST_START_TEST (test_message_state_changed_children)
   ASSERT_OBJECT_REFCOUNT (src, "src", 4);
   /* refcount can be 4 if the bin is still processing the async_done message of
    * the sink. */
-  ASSERT_OBJECT_REFCOUNT_BETWEEN (sink, "sink", 2, 3);
+  ASSERT_OBJECT_REFCOUNT_BETWEEN (sink, "sink", 2, 4);
   /* 3 or 4 is valid, because the pipeline might still be posting 
    * its state_change message */
   ASSERT_OBJECT_REFCOUNT_BETWEEN (pipeline, "pipeline", 3, 4);
 
-  pop_messages (bus, 3);
+  pop_state_changed (bus, 3);
   pop_async_done (bus);
+  pop_latency (bus);
   fail_if ((gst_bus_pop (bus)) != NULL);
 
-  ASSERT_OBJECT_REFCOUNT (bus, "bus", 2);
+  ASSERT_OBJECT_REFCOUNT_BETWEEN (bus, "bus", 2, 3);
   ASSERT_OBJECT_REFCOUNT (src, "src", 1);
-  ASSERT_OBJECT_REFCOUNT (sink, "sink", 2);
+  ASSERT_OBJECT_REFCOUNT_BETWEEN (sink, "sink", 2, 3);
   ASSERT_OBJECT_REFCOUNT (pipeline, "pipeline", 1);
 
   /* change state to PLAYING, spawning three messages */
@@ -632,7 +648,7 @@ GST_START_TEST (test_message_state_changed_children)
   ASSERT_OBJECT_REFCOUNT_BETWEEN (sink, "sink", 2, 4);
   ASSERT_OBJECT_REFCOUNT (pipeline, "pipeline", 3);
 
-  pop_messages (bus, 3);
+  pop_state_changed (bus, 3);
   fail_if ((gst_bus_pop (bus)) != NULL);
 
   ASSERT_OBJECT_REFCOUNT (bus, "bus", 2);
@@ -650,10 +666,10 @@ GST_START_TEST (test_message_state_changed_children)
   /* each object is referenced by two messages, the source also has the
    * stream-status message referencing it */
   ASSERT_OBJECT_REFCOUNT (src, "src", 4);
-  ASSERT_OBJECT_REFCOUNT (sink, "sink", 3);
+  ASSERT_OBJECT_REFCOUNT_BETWEEN (sink, "sink", 3, 4);
   ASSERT_OBJECT_REFCOUNT (pipeline, "pipeline", 3);
 
-  pop_messages (bus, 6);
+  pop_state_changed (bus, 6);
   fail_if ((gst_bus_pop (bus)) != NULL);
 
   ASSERT_OBJECT_REFCOUNT (src, "src", 1);
@@ -706,8 +722,9 @@ GST_START_TEST (test_watch_for_state_change)
       GST_CLOCK_TIME_NONE);
   fail_unless (ret == GST_STATE_CHANGE_SUCCESS);
 
-  pop_messages (bus, 6);
+  pop_state_changed (bus, 6);
   pop_async_done (bus);
+  pop_latency (bus);
 
   fail_unless (gst_bus_have_pending (bus) == FALSE,
       "Unexpected messages on bus");
@@ -715,15 +732,17 @@ GST_START_TEST (test_watch_for_state_change)
   ret = gst_element_set_state (GST_ELEMENT (bin), GST_STATE_PLAYING);
   fail_unless (ret == GST_STATE_CHANGE_SUCCESS);
 
-  pop_messages (bus, 3);
+  pop_state_changed (bus, 3);
 
   /* this one might return either SUCCESS or ASYNC, likely SUCCESS */
   ret = gst_element_set_state (GST_ELEMENT (bin), GST_STATE_PAUSED);
   gst_element_get_state (GST_ELEMENT (bin), NULL, NULL, GST_CLOCK_TIME_NONE);
 
-  pop_messages (bus, 3);
-  if (ret == GST_STATE_CHANGE_ASYNC)
+  pop_state_changed (bus, 3);
+  if (ret == GST_STATE_CHANGE_ASYNC) {
     pop_async_done (bus);
+    pop_latency (bus);
+  }
 
   fail_unless (gst_bus_have_pending (bus) == FALSE,
       "Unexpected messages on bus");
@@ -932,7 +951,7 @@ GST_START_TEST (test_children_state_change_order_flagged_sink)
   ASSERT_STATE_CHANGE_MSG (bus, sink, GST_STATE_READY, GST_STATE_PAUSED, 107);
 #else
 
-  pop_messages (bus, 2);        /* pop remaining ready => paused messages off the bus */
+  pop_state_changed (bus, 2);   /* pop remaining ready => paused messages off the bus */
   ASSERT_STATE_CHANGE_MSG (bus, pipeline, GST_STATE_READY, GST_STATE_PAUSED,
       108);
   pop_async_done (bus);
@@ -953,8 +972,8 @@ GST_START_TEST (test_children_state_change_order_flagged_sink)
   fail_if (ret != GST_STATE_CHANGE_SUCCESS, "State change to READY failed");
 
   /* TODO: do we need to check downwards state change order as well? */
-  pop_messages (bus, 4);        /* pop playing => paused messages off the bus */
-  pop_messages (bus, 4);        /* pop paused => ready messages off the bus */
+  pop_state_changed (bus, 4);   /* pop playing => paused messages off the bus */
+  pop_state_changed (bus, 4);   /* pop paused => ready messages off the bus */
 
   while (GST_OBJECT_REFCOUNT_VALUE (pipeline) > 1)
     THREAD_SWITCH ();
@@ -1037,7 +1056,7 @@ GST_START_TEST (test_children_state_change_order_semi_sink)
   ASSERT_STATE_CHANGE_MSG (bus, src, GST_STATE_READY, GST_STATE_PAUSED, 206);
   ASSERT_STATE_CHANGE_MSG (bus, sink, GST_STATE_READY, GST_STATE_PAUSED, 207);
 #else
-  pop_messages (bus, 2);        /* pop remaining ready => paused messages off the bus */
+  pop_state_changed (bus, 2);   /* pop remaining ready => paused messages off the bus */
   ASSERT_STATE_CHANGE_MSG (bus, pipeline, GST_STATE_READY, GST_STATE_PAUSED,
       208);
   pop_async_done (bus);
@@ -1057,8 +1076,8 @@ GST_START_TEST (test_children_state_change_order_semi_sink)
   fail_if (ret != GST_STATE_CHANGE_SUCCESS, "State change to READY failed");
 
   /* TODO: do we need to check downwards state change order as well? */
-  pop_messages (bus, 4);        /* pop playing => paused messages off the bus */
-  pop_messages (bus, 4);        /* pop paused => ready messages off the bus */
+  pop_state_changed (bus, 4);   /* pop playing => paused messages off the bus */
+  pop_state_changed (bus, 4);   /* pop paused => ready messages off the bus */
 
   GST_DEBUG ("waiting for pipeline to reach refcount 1");
   while (GST_OBJECT_REFCOUNT_VALUE (pipeline) > 1)
index 5ccb50b..4bb5e43 100644 (file)
@@ -92,7 +92,7 @@ GST_START_TEST (test_pipeline_unref)
   run_pipeline (pipeline, s,
       GST_MESSAGE_NEW_CLOCK | GST_MESSAGE_STATE_CHANGED |
       GST_MESSAGE_STREAM_STATUS | GST_MESSAGE_ASYNC_DONE |
-      GST_MESSAGE_STREAM_START, GST_MESSAGE_EOS);
+      GST_MESSAGE_STREAM_START | GST_MESSAGE_LATENCY, GST_MESSAGE_EOS);
   while (GST_OBJECT_REFCOUNT_VALUE (src) > 1)
     THREAD_SWITCH ();
   ASSERT_OBJECT_REFCOUNT (src, "src", 1);
index 62385b2..cf03db3 100644 (file)
@@ -103,31 +103,31 @@ GST_START_TEST (test_2_elements)
   run_pipeline (setup_pipeline (s), s,
       GST_MESSAGE_NEW_CLOCK | GST_MESSAGE_STATE_CHANGED |
       GST_MESSAGE_STREAM_STATUS | GST_MESSAGE_ASYNC_DONE |
-      GST_MESSAGE_STREAM_START, GST_MESSAGE_UNKNOWN);
+      GST_MESSAGE_STREAM_START | GST_MESSAGE_LATENCY, GST_MESSAGE_UNKNOWN);
 
   s = "fakesrc can-activate-push=true ! fakesink can-activate-pull=false";
   run_pipeline (setup_pipeline (s), s,
       GST_MESSAGE_NEW_CLOCK | GST_MESSAGE_STATE_CHANGED |
       GST_MESSAGE_STREAM_STATUS | GST_MESSAGE_ASYNC_DONE |
-      GST_MESSAGE_STREAM_START, GST_MESSAGE_UNKNOWN);
+      GST_MESSAGE_STREAM_START | GST_MESSAGE_LATENCY, GST_MESSAGE_UNKNOWN);
 
   s = "fakesrc can-activate-push=false num-buffers=10 ! fakesink can-activate-pull=true";
   run_pipeline (setup_pipeline (s), s,
       GST_MESSAGE_NEW_CLOCK | GST_MESSAGE_STATE_CHANGED |
       GST_MESSAGE_STREAM_STATUS | GST_MESSAGE_ASYNC_DONE |
-      GST_MESSAGE_STREAM_START, GST_MESSAGE_EOS);
+      GST_MESSAGE_STREAM_START | GST_MESSAGE_LATENCY, GST_MESSAGE_EOS);
 
   s = "fakesrc can-activate-push=true num-buffers=10 ! fakesink can-activate-pull=false";
   run_pipeline (setup_pipeline (s), s,
       GST_MESSAGE_NEW_CLOCK | GST_MESSAGE_STATE_CHANGED |
       GST_MESSAGE_STREAM_STATUS | GST_MESSAGE_ASYNC_DONE |
-      GST_MESSAGE_STREAM_START, GST_MESSAGE_EOS);
+      GST_MESSAGE_STREAM_START | GST_MESSAGE_LATENCY, GST_MESSAGE_EOS);
 
   s = "fakesrc can-activate-push=false ! fakesink can-activate-pull=false";
   ASSERT_CRITICAL (run_pipeline (setup_pipeline (s), s,
           GST_MESSAGE_NEW_CLOCK | GST_MESSAGE_STATE_CHANGED |
           GST_MESSAGE_STREAM_STATUS | GST_MESSAGE_ASYNC_DONE |
-          GST_MESSAGE_STREAM_START, GST_MESSAGE_UNKNOWN));
+          GST_MESSAGE_STREAM_START | GST_MESSAGE_LATENCY, GST_MESSAGE_UNKNOWN));
 }
 
 GST_END_TEST;