urisourcebin: Only expose pads once activation has completed
authorEdward Hervey <edward@centricular.com>
Thu, 27 Oct 2022 09:47:56 +0000 (11:47 +0200)
committerGStreamer Marge Bot <gitlab-merge-bot@gstreamer-foundation.org>
Wed, 16 Nov 2022 14:01:46 +0000 (14:01 +0000)
The following problem could happen:
* Thread 1 : urisourcebin gets activated from READY->PAUSED
* Thread 2 : some element causes a pad to be added to urisourcebin , which gets
linked downstream, which decides to activate upstream to pull-based.
  * That requires "activating" the pads from PUSH to NONE, and then from NONE to PULL
* Thread 1 : the base class state change handlers checks if all pads are
activated

The issue is that since going form PUSH to PULL requires going through NONE,
there is a window during which:
* Thread 1 : The pad was set to NONE (before being set to PULL)
* Thread 2 : The base class activates that pad (to PUSH)
* Thread 1 : The attempt to "activate" to PULL fails (silently or not)

This is very racy, so in order to avoid that, we make sure that we only add pads
once the transition from READY->PAUSED in the parent classes is done.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/2784>

subprojects/gst-plugins-base/gst/playback/gsturisourcebin.c

index cc71a58..91e2706 100644 (file)
@@ -149,6 +149,10 @@ struct _GstURISourceBin
   gchar *uri;
   guint64 connection_speed;
 
+  gboolean activated;           /* TRUE if the switch to PAUSED has been completed */
+  gboolean flushing;            /* TRUE if switching from PAUSED to READY */
+  GCond activation_cond;        /* Uses the urisourcebin lock */
+
   gboolean is_stream;
   gboolean is_adaptive;
   gboolean demuxer_handles_buffering;   /* If TRUE: Don't use buffering elements */
@@ -491,6 +495,8 @@ gst_uri_source_bin_init (GstURISourceBin * urisrc)
   g_mutex_init (&urisrc->buffering_lock);
   g_mutex_init (&urisrc->buffering_post_lock);
 
+  g_cond_init (&urisrc->activation_cond);
+
   urisrc->uri = g_strdup (DEFAULT_PROP_URI);
   urisrc->connection_speed = DEFAULT_CONNECTION_SPEED;
 
@@ -681,8 +687,8 @@ new_demuxer_pad_added_cb (GstElement * element, GstPad * pad,
     GST_DEBUG_OBJECT (element,
         "New streams-aware demuxer pad %s:%s , exposing directly",
         GST_DEBUG_PAD_NAME (pad));
-    expose_output_pad (urisrc, info->output_pad);
     GST_URI_SOURCE_BIN_UNLOCK (urisrc);
+    expose_output_pad (urisrc, info->output_pad);
   } else {
     GST_DEBUG_OBJECT (element, "new demuxer pad, name: <%s>. "
         "Added as pending pad with caps %" GST_PTR_FORMAT,
@@ -1315,6 +1321,32 @@ create_output_pad (GstURISourceBin * urisrc, GstPad * pad)
   return newpad;
 }
 
+static GstPadProbeReturn
+expose_block_probe (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
+{
+  GstURISourceBin *urisrc = (GstURISourceBin *) user_data;
+  gboolean expose = FALSE;
+
+  GST_DEBUG_OBJECT (pad, "blocking");
+
+  GST_URI_SOURCE_BIN_LOCK (urisrc);
+  while (!urisrc->activated && !urisrc->flushing) {
+    GST_DEBUG_OBJECT (urisrc, "activated:%d flushing:%d", urisrc->activated,
+        urisrc->flushing);
+    g_cond_wait (&urisrc->activation_cond, &urisrc->lock);
+  }
+  GST_DEBUG_OBJECT (urisrc, "activated:%d flushing:%d", urisrc->activated,
+      urisrc->flushing);
+
+  if (!urisrc->flushing)
+    expose = TRUE;
+  GST_URI_SOURCE_BIN_UNLOCK (urisrc);
+  if (expose)
+    gst_element_add_pad (GST_ELEMENT_CAST (urisrc), pad);
+  GST_DEBUG_OBJECT (pad, "Done blocking, removing probe");
+  return GST_PAD_PROBE_REMOVE;
+}
+
 static void
 expose_output_pad (GstURISourceBin * urisrc, GstPad * pad)
 {
@@ -1328,10 +1360,20 @@ expose_output_pad (GstURISourceBin * urisrc, GstPad * pad)
   gst_pad_sticky_events_foreach (target, copy_sticky_events, pad);
   gst_object_unref (target);
 
-  GST_DEBUG_OBJECT (urisrc, "Exposing pad %s:%s", GST_DEBUG_PAD_NAME (pad));
-
   gst_pad_set_active (pad, TRUE);
-  gst_element_add_pad (GST_ELEMENT_CAST (urisrc), pad);
+  GST_URI_SOURCE_BIN_LOCK (urisrc);
+  if (!urisrc->activated) {
+    GST_DEBUG_OBJECT (urisrc, "Not fully activated, adding pad once PAUSED !");
+    gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM,
+        expose_block_probe, urisrc, NULL);
+    pad = NULL;
+  }
+  GST_URI_SOURCE_BIN_UNLOCK (urisrc);
+
+  if (pad) {
+    GST_DEBUG_OBJECT (urisrc, "Exposing pad %" GST_PTR_FORMAT, pad);
+    gst_element_add_pad (GST_ELEMENT_CAST (urisrc), pad);
+  }
 }
 
 static void
@@ -1984,8 +2026,8 @@ handle_new_pad (GstURISourceBin * urisrc, GstPad * srcpad, GstCaps * caps)
     /* We don't need slot here, expose immediately */
     GST_URI_SOURCE_BIN_LOCK (urisrc);
     output_pad = create_output_pad (urisrc, srcpad);
-    expose_raw_output_pad (urisrc, srcpad, output_pad);
     GST_URI_SOURCE_BIN_UNLOCK (urisrc);
+    expose_raw_output_pad (urisrc, srcpad, output_pad);
   } else {
     OutputSlotInfo *slot;
     GstPad *output_pad;
@@ -2896,10 +2938,19 @@ gst_uri_source_bin_change_state (GstElement * element,
 
   switch (transition) {
     case GST_STATE_CHANGE_READY_TO_PAUSED:
+      GST_URI_SOURCE_BIN_LOCK (element);
+      urisrc->flushing = FALSE;
+      urisrc->activated = FALSE;
+      GST_URI_SOURCE_BIN_UNLOCK (element);
       GST_DEBUG ("ready to paused");
       if (!setup_source (urisrc))
         goto source_failed;
       break;
+    case GST_STATE_CHANGE_PAUSED_TO_READY:
+      GST_URI_SOURCE_BIN_LOCK (element);
+      urisrc->flushing = TRUE;
+      g_cond_broadcast (&urisrc->activation_cond);
+      GST_URI_SOURCE_BIN_UNLOCK (element);
     default:
       break;
   }
@@ -2910,6 +2961,13 @@ gst_uri_source_bin_change_state (GstElement * element,
 
   switch (transition) {
     case GST_STATE_CHANGE_READY_TO_PAUSED:
+    {
+      GST_URI_SOURCE_BIN_LOCK (element);
+      GST_DEBUG_OBJECT (urisrc, "Potentially exposing pads");
+      urisrc->activated = TRUE;
+      g_cond_broadcast (&urisrc->activation_cond);
+      GST_URI_SOURCE_BIN_UNLOCK (element);
+    }
       break;
     case GST_STATE_CHANGE_PAUSED_TO_READY:
       GST_DEBUG ("paused to ready");