decodebin3: Make update/posting of collection messages atomic
authorEdward Hervey <edward@centricular.com>
Tue, 1 Oct 2024 15:17:01 +0000 (17:17 +0200)
committerBackport Bot <gitlab-backport-bot@gstreamer-foundation.org>
Wed, 2 Oct 2024 22:06:07 +0000 (23:06 +0100)
The presence (or not) of a collection on an input will determine whether events
will be throttled so that there are only forwarded when that input gets a valid
collection.

Therefore the input lock should be used.

In addition to that, we want to ensure that the application/user has a chance to
reliably (i.e. synchronously) specify what streams it is interested in by
sending a GST_EVENT_SELECT_STREAMS.

But we cannot allow anything to go forward until that message posting has come
back, otherwise we run in various races.

Fixes https://gitlab.freedesktop.org/gstreamer/gstreamer/-/issues/3872

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/7602>

subprojects/gst-plugins-base/gst/playback/gstdecodebin3.c

index 65c2c970e7fc0b689920de8d7f80f9a0db59b05a..d52c7ac744f782b73f4a48cbe600e5696993a1f0 100644 (file)
@@ -289,6 +289,10 @@ struct _GstDecodebin3
   guint32 input_counter;
   /* Current stream group_id (default : GST_GROUP_ID_INVALID) */
   guint32 current_group_id;
+  /* Whether decodebin is currently posting a stream collection on the bus */
+  gboolean posting_collection;
+  /* GCond to block against and know when posting_collection changed */
+  GCond posting_cond;
   /* End of variables protected by input_lock */
 
   GstElement *multiqueue;
@@ -741,6 +745,8 @@ gst_decodebin3_init (GstDecodebin3 * dbin)
   g_mutex_init (&dbin->factories_lock);
   g_mutex_init (&dbin->selection_lock);
   g_mutex_init (&dbin->input_lock);
+  g_cond_init (&dbin->posting_cond);
+  dbin->posting_collection = FALSE;
 
   dbin->caps = gst_static_caps_get (&default_raw_caps);
 
@@ -831,6 +837,7 @@ gst_decodebin3_finalize (GObject * object)
   g_mutex_clear (&dbin->factories_lock);
   g_mutex_clear (&dbin->selection_lock);
   g_mutex_clear (&dbin->input_lock);
+  g_cond_clear (&dbin->posting_cond);
 
   G_OBJECT_CLASS (parent_class)->finalize (object);
 }
@@ -2111,13 +2118,29 @@ sink_event_function (GstPad * sinkpad, GstDecodebin3 * dbin, GstEvent * event)
       gst_event_parse_stream_collection (event, &collection);
       if (collection) {
         GstMessage *collection_msg;
+
+        /* If we post a collection, we need to release the input lock, but we
+         * want to ensure code that needs to check of validity of the input
+         * collection to wait until the message was properly posted so that any
+         * synchronous handler can properly set their requested stream
+         * selection. */
         INPUT_LOCK (dbin);
+        while (dbin->posting_collection)
+          g_cond_wait (&dbin->posting_cond, &dbin->input_lock);
         collection_msg =
             handle_stream_collection_locked (dbin, collection, input);
         gst_object_unref (collection);
-        INPUT_UNLOCK (dbin);
-        if (collection_msg)
+        if (collection_msg) {
+          GST_DEBUG_OBJECT (sinkpad, "Posting collection");
+          dbin->posting_collection = TRUE;
+          INPUT_UNLOCK (dbin);
           gst_element_post_message (GST_ELEMENT_CAST (dbin), collection_msg);
+          INPUT_LOCK (dbin);
+          dbin->posting_collection = FALSE;
+          GST_DEBUG_OBJECT (sinkpad, "Done posting collection");
+          g_cond_broadcast (&dbin->posting_cond);
+        }
+        INPUT_UNLOCK (dbin);
       }
 
       /* If we are waiting to create an identity passthrough, do it now */
@@ -2132,6 +2155,7 @@ sink_event_function (GstPad * sinkpad, GstDecodebin3 * dbin, GstEvent * event)
 
       /* Drain all pending events */
       if (input->events_waiting_for_collection) {
+        GST_DEBUG_OBJECT (sinkpad, "Draining pending events");
         GList *tmp;
         for (tmp = input->events_waiting_for_collection; tmp; tmp = tmp->next)
           gst_pad_event_default (sinkpad, GST_OBJECT (dbin), tmp->data);
@@ -2215,12 +2239,21 @@ sink_event_function (GstPad * sinkpad, GstDecodebin3 * dbin, GstEvent * event)
 
   /* For parsed inputs, if we are waiting for a collection event, store them for
    * now */
-  if (!input->collection && input->input_is_parsed) {
-    GST_DEBUG_OBJECT (sinkpad,
-        "Postponing event until we get a stream collection");
-    input->events_waiting_for_collection =
-        g_list_append (input->events_waiting_for_collection, event);
-    return TRUE;
+  if (input->input_is_parsed) {
+    gboolean have_collection;
+    INPUT_LOCK (dbin);
+    while (dbin->posting_collection) {
+      g_cond_wait (&dbin->posting_cond, &dbin->input_lock);
+    }
+    have_collection = (input->collection != NULL);
+    INPUT_UNLOCK (dbin);
+    if (!have_collection) {
+      GST_DEBUG_OBJECT (sinkpad,
+          "Postponing event until we get a stream collection");
+      input->events_waiting_for_collection =
+          g_list_append (input->events_waiting_for_collection, event);
+      return TRUE;
+    }
   }
 
   /* Chain to parent function */