urisourcebin: Handle legacy pad replacements from parsebin
authorEdward Hervey <edward@centricular.com>
Fri, 23 Feb 2024 10:15:49 +0000 (11:15 +0100)
committerGStreamer Marge Bot <gitlab-merge-bot@gstreamer-foundation.org>
Fri, 23 Feb 2024 16:05:44 +0000 (16:05 +0000)
When dealing with demuxers which aren't streams-aware, we need to handle the
old-school "stream replacement" dance from `parsebin` and hide that in such a
way that output pads are re-used (if compatible).

By analyzing the collection posted by parsebin, we can:
* Identify whether some output slots are no longer used (because the stream they
  currently handle is not present in the collection)
* Decide if some upcoming streams could re-use the existing slot

This supports both buffering and non-buffering modes.

Fixes #1651

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/6201>

subprojects/gst-plugins-base/gst/playback/gsturisourcebin.c

index 559c5310d74771cc6efdb5688442f8721cb756da..b70a72c3f9c098653a6e323fa3289f7831a6448c 100644 (file)
@@ -132,7 +132,11 @@ struct _OutputSlotInfo
 {
   ChildSrcPadInfo *linked_info; /* source pad info feeding this slot */
 
+  GstStream *stream;            /* The current stream */
+  GstStream *pending_stream;    /* The stream this slot should switch to */
+
   GstPad *originating_pad;      /* Pad that created this OutputSlotInfo (ref held) */
+  GstPad *pending_pad;          /* Pad this slot should use once originating_pad goes away (ref held) */
   GstPad *output_pad;           /* Output ghost pad */
 
   gboolean is_eos;              /* Did EOS get fed into the buffering element */
@@ -143,6 +147,7 @@ struct _OutputSlotInfo
   gulong bitrate_changed_id;    /* queue bitrate changed notification */
 
   guint demuxer_event_probe_id;
+  guint pending_probe_id;       /* demuxer_event_probe_id for pending_pad */
 };
 
 /**
@@ -796,13 +801,28 @@ new_child_src_pad_info (GstURISourceBin * urisrc, GstPad * pad)
   return info;
 }
 
+static OutputSlotInfo *
+find_replacement_slot (ChildSrcPadInfo * info, GstStream * stream)
+{
+  GList *iter;
+
+  for (iter = info->outputs; iter; iter = iter->next) {
+    OutputSlotInfo *slot = iter->data;
+
+    if (slot->pending_stream == stream)
+      return slot;
+  }
+
+  return NULL;
+}
+
 /* Called by the signal handlers when a demuxer has produced a new stream */
 static void
 new_demuxer_pad_added_cb (GstElement * element, GstPad * pad,
     ChildSrcPadInfo * info)
 {
   GstURISourceBin *urisrc = info->urisrc;
-  OutputSlotInfo *slot;
+  OutputSlotInfo *slot = NULL;
   GstPad *output_pad;
 
   GST_DEBUG_OBJECT (element, "New pad %" GST_PTR_FORMAT, pad);
@@ -816,6 +836,31 @@ new_demuxer_pad_added_cb (GstElement * element, GstPad * pad,
         ("Adaptive demuxer is not streams-aware, check your installation"));
 
   }
+
+  /* For parsebin source pads we want to check if this is a replacement pad for
+   * which we want to re-use an existing OutputSlotInfo */
+  if (info->demuxer_is_parsebin) {
+    GstStream *stream = gst_pad_get_stream (pad);
+
+    if (stream) {
+      slot = find_replacement_slot (info, stream);
+      if (slot) {
+        GST_DEBUG_OBJECT (pad, "Can re-use slot %s:%s",
+            GST_DEBUG_PAD_NAME (slot->originating_pad));
+        slot->pending_pad = gst_object_ref (pad);
+        slot->pending_probe_id =
+            gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM |
+            GST_PAD_PROBE_TYPE_EVENT_FLUSH,
+            (GstPadProbeCallback) demux_pad_events, slot, NULL);
+        GST_URI_SOURCE_BIN_UNLOCK (urisrc);
+        return;
+      }
+      GST_DEBUG_OBJECT (pad, "No existing output slot to re-use");
+    } else {
+      GST_WARNING_OBJECT (pad, "No GstStream on pad ??");
+    }
+  }
+
   /* If the demuxer handles buffering and is streams-aware, we can expose it
      as-is directly. We still add an event probe to deal with EOS */
   slot = new_output_slot (info, pad);
@@ -880,6 +925,11 @@ demux_pad_events (GstPad * pad, GstPadProbeInfo * info, OutputSlotInfo * slot)
 
       GST_LOG_OBJECT (urisrc, "EOS on pad %" GST_PTR_FORMAT, pad);
 
+      if (slot->pending_pad && pad != slot->pending_pad) {
+        GST_DEBUG_OBJECT (pad, "A pending pad is present, ignoring");
+        break;
+      }
+
       BUFFERING_LOCK (urisrc);
       /* Mark that we fed an EOS to this slot */
       slot->is_eos = TRUE;
@@ -1266,6 +1316,8 @@ new_output_slot (ChildSrcPadInfo * info, GstPad * originating_pad)
     slot->output_pad = create_output_pad (slot, originating_pad);
   }
   slot->originating_pad = gst_object_ref (originating_pad);
+  /* Store stream if present */
+  slot->stream = gst_pad_get_stream (originating_pad);
 
   /* save output slot so we can remove it later */
   info->outputs = g_list_append (info->outputs, slot);
@@ -1273,6 +1325,8 @@ new_output_slot (ChildSrcPadInfo * info, GstPad * originating_pad)
   GST_DEBUG_OBJECT (urisrc,
       "New output_pad %" GST_PTR_FORMAT " for originating pad %" GST_PTR_FORMAT,
       slot->output_pad, originating_pad);
+  if (slot->stream)
+    GST_DEBUG_OBJECT (urisrc, "  and stream %" GST_PTR_FORMAT, slot->stream);
 
   return slot;
 
@@ -1436,6 +1490,31 @@ demuxer_pad_removed_cb (GstElement * element, GstPad * pad,
   gst_pad_remove_probe (pad, slot->demuxer_event_probe_id);
   slot->demuxer_event_probe_id = 0;
 
+  if (slot->pending_pad) {
+    /* Switch over to pending pad */
+    GST_DEBUG_OBJECT (urisrc, "Switching to pending pad <%s:%s>",
+        GST_DEBUG_PAD_NAME (slot->pending_pad));
+    slot->demuxer_event_probe_id = slot->pending_probe_id;
+    slot->pending_probe_id = 0;
+
+    gst_object_unref (slot->originating_pad);
+    slot->originating_pad = slot->pending_pad;
+    slot->pending_pad = NULL;
+
+    gst_object_unref (slot->stream);
+    slot->stream = slot->pending_stream;
+    slot->pending_stream = NULL;
+
+    if (slot->queue_sinkpad) {
+      gst_pad_link (slot->originating_pad, slot->queue_sinkpad);
+    } else {
+      gst_ghost_pad_set_target ((GstGhostPad *) slot->output_pad,
+          slot->originating_pad);
+    }
+    GST_URI_SOURCE_BIN_UNLOCK (urisrc);
+    return;
+  }
+
   if (slot->queue) {
     gboolean was_eos;
 
@@ -2276,6 +2355,15 @@ free_output_slot (OutputSlotInfo * slot, GstURISourceBin * urisrc)
 
   if (slot->demuxer_event_probe_id)
     gst_pad_remove_probe (slot->originating_pad, slot->demuxer_event_probe_id);
+  if (slot->pending_pad) {
+    if (slot->pending_probe_id)
+      gst_pad_remove_probe (slot->pending_pad, slot->pending_probe_id);
+    gst_object_unref (slot->pending_pad);
+  }
+  if (slot->stream)
+    gst_object_unref (slot->stream);
+  if (slot->pending_stream)
+    gst_object_unref (slot->pending_stream);
 
   gst_object_unref (slot->originating_pad);
   /* deactivate and remove the srcpad */
@@ -2725,6 +2813,65 @@ find_adaptive_demuxer_cspi_for_msg (GstURISourceBin * urisrc,
   return res;
 }
 
+static GstStream *
+find_compatible_stream (GList * streams, GstStream * stream)
+{
+  GList *iter;
+  GstStreamType stream_type = gst_stream_get_stream_type (stream);
+
+  for (iter = streams; iter; iter = iter->next) {
+    GstStream *candidate = iter->data;
+
+    if (gst_stream_get_stream_type (candidate) == stream_type)
+      return candidate;
+  }
+
+  return NULL;
+}
+
+static void
+handle_parsebin_collection (ChildSrcPadInfo * info,
+    GstStreamCollection * collection)
+{
+  GList *unused_slots = NULL, *iter;
+  GList *streams = NULL;
+  guint i, nb_streams;
+
+  nb_streams = gst_stream_collection_get_size (collection);
+  for (i = 0; i < nb_streams; i++)
+    streams =
+        g_list_append (streams, gst_stream_collection_get_stream (collection,
+            i));
+
+  /* Get list of output info slots not present in the collection */
+  for (iter = info->outputs; iter; iter = iter->next) {
+    OutputSlotInfo *output = iter->data;
+
+    if (output->stream && !g_list_find (streams, output->stream)) {
+      GST_DEBUG_OBJECT (output->originating_pad,
+          "No longer used in new collection");
+      unused_slots = g_list_append (unused_slots, output);
+    }
+  }
+
+  /* For each of those slots, check if there is a compatible stream from the
+   * collection that could be assigned to it */
+  for (iter = unused_slots; iter; iter = iter->next) {
+    OutputSlotInfo *output = iter->data;
+    GstStream *replacement = find_compatible_stream (streams, output->stream);
+    if (replacement) {
+      GST_DEBUG_OBJECT (output->originating_pad, "Assigning stream %s",
+          gst_stream_get_stream_id (replacement));
+      output->pending_stream = gst_object_ref (replacement);
+      streams = g_list_remove (streams, replacement);
+    }
+  }
+
+  g_list_free (unused_slots);
+  g_list_free (streams);
+}
+
+
 static void
 handle_message (GstBin * bin, GstMessage * msg)
 {
@@ -2755,7 +2902,14 @@ handle_message (GstBin * bin, GstMessage * msg)
       if (info) {
         info->demuxer_streams_aware = TRUE;
         if (info->demuxer_is_parsebin) {
+          GstStreamCollection *collection = NULL;
+          gst_message_parse_stream_collection (msg, &collection);
           GST_DEBUG_OBJECT (bin, "Dropping stream-collection from parsebin");
+          /* Check if some output slots can/could be re-used with this new collection */
+          if (collection) {
+            handle_parsebin_collection (info, collection);
+            gst_object_unref (collection);
+          }
           gst_message_unref (msg);
           msg = NULL;
         }