urisourcebin: Improve buffering handling
authorEdward Hervey <edward@centricular.com>
Wed, 9 Nov 2022 15:44:18 +0000 (16:44 +0100)
committerGStreamer Marge Bot <gitlab-merge-bot@gstreamer-foundation.org>
Wed, 16 Nov 2022 14:01:46 +0000 (14:01 +0000)
Introduce the option to have the streams be parsed with `parsebin` for
compatible sources (i.e. which are eligible for buffering in the same way as
before this commit).

By parsing the inputs directly, this allows more accurate buffering control:
* Instead of relying on potential bitrate information coming from somewhere
* and *without* being linked downstream

If `parse-streams` is activated and the stream is eligible for buffering, then a
`multiqueue` will be used on the output of `parsebin` in order to handle the
buffering.

API: `parse-streams`
Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/2784>

subprojects/gst-plugins-base/docs/plugins/gst_plugins_cache.json
subprojects/gst-plugins-base/gst/playback/gsturisourcebin.c

index 1a17fda..efddbb8 100644 (file)
                         "type": "gdouble",
                         "writable": true
                     },
+                    "parse-streams": {
+                        "blurb": "Extract the elementary streams of non-raw sources",
+                        "conditionally-available": false,
+                        "construct": false,
+                        "construct-only": false,
+                        "controllable": false,
+                        "default": "false",
+                        "mutable": "null",
+                        "readable": true,
+                        "type": "gboolean",
+                        "writable": true
+                    },
                     "ring-buffer-max-size": {
                         "blurb": "Max. amount of data in the ring buffer (bytes, 0 = ring buffer disabled)",
                         "conditionally-available": false,
index 46a1753..147e504 100644 (file)
@@ -102,12 +102,30 @@ struct _ChildSrcPadInfo
   /* An optional typefind */
   GstElement *typefind;
 
-  /* An optional demuxer */
+  /* Pre-parsebin buffering elements. Only present is parse-streams and
+   * downloading *or* ring-buffer-max-size */
+  GstElement *pre_parse_queue;
+
+  /* Post-parsebin multiqueue. Only present if parse-streams and buffering is
+   * required */
+  GstElement *multiqueue;
+
+  /* An optional demuxer or parsebin */
   GstElement *demuxer;
   gboolean demuxer_handles_buffering;
 
   /* list of output slots */
   GList *outputs;
+
+  /* The following fields specify how this output should be handled */
+
+  /* use_downloadbuffer : TRUE if the content from the source should be
+   * downloaded with a downloadbuffer element */
+  gboolean use_downloadbuffer;
+
+  /* use_queue2: TRUE if the contents should be buffered through a queue2
+   * element */
+  gboolean use_queue2;
 };
 
 /* Output Slot:
@@ -157,6 +175,7 @@ struct _GstURISourceBin
   gboolean use_buffering;
   gdouble low_watermark;
   gdouble high_watermark;
+  gboolean parse_streams;
 
   GstElement *source;
 
@@ -217,6 +236,7 @@ enum
 #define DEFAULT_RING_BUFFER_MAX_SIZE 0
 #define DEFAULT_LOW_WATERMARK       0.01
 #define DEFAULT_HIGH_WATERMARK      0.60
+#define DEFAULT_PARSE_STREAMS       FALSE
 
 #define ACTUAL_DEFAULT_BUFFER_SIZE  10 * 1024 * 1024    /* The value used for byte limits when buffer-size == -1 */
 #define ACTUAL_DEFAULT_BUFFER_DURATION  5 * GST_SECOND  /* The value used for time limits when buffer-duration == -1 */
@@ -239,6 +259,7 @@ enum
   PROP_LOW_WATERMARK,
   PROP_HIGH_WATERMARK,
   PROP_STATISTICS,
+  PROP_PARSE_STREAMS,
 };
 
 #define CUSTOM_EOS_QUARK _custom_eos_quark_get ()
@@ -289,7 +310,6 @@ static void handle_new_pad (ChildSrcPadInfo * info, GstPad * srcpad,
 static gboolean setup_typefind (ChildSrcPadInfo * info);
 static void expose_output_pad (GstURISourceBin * urisrc, GstPad * pad);
 static OutputSlotInfo *new_output_slot (ChildSrcPadInfo * info,
-    gboolean do_download, gboolean is_adaptive, gboolean no_buffering,
     GstPad * originating_pad);
 static void free_output_slot (OutputSlotInfo * slot, GstURISourceBin * urisrc);
 static void free_output_slot_async (GstURISourceBin * urisrc,
@@ -424,6 +444,20 @@ gst_uri_source_bin_class_init (GstURISourceBinClass * klass)
           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
 
   /**
+   * GstURISourceBin:parse-streams:
+   *
+   * A `parsebin` element will be used on all non-raw streams, and urisourcebin
+   * will output the elementary streams. Recommended when buffering is used
+   * since it will provide accurate buffering levels.
+   *
+   * Since: 1.22
+   */
+  g_object_class_install_property (gobject_class, PROP_PARSE_STREAMS,
+      g_param_spec_boolean ("parse-streams", "Parse Streams",
+          "Extract the elementary streams of non-raw sources",
+          DEFAULT_PARSE_STREAMS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+  /**
    * GstURISourceBin::drained:
    *
    * This signal is emitted when the data for the current uri is played.
@@ -560,6 +594,9 @@ gst_uri_source_bin_set_property (GObject * object, guint prop_id,
       urisrc->high_watermark = g_value_get_double (value);
       update_queue_values (urisrc);
       break;
+    case PROP_PARSE_STREAMS:
+      urisrc->parse_streams = g_value_get_boolean (value);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -616,6 +653,9 @@ gst_uri_source_bin_get_property (GObject * object, guint prop_id,
     case PROP_STATISTICS:
       g_value_take_boxed (value, get_queue_statistics (urisrc));
       break;
+    case PROP_PARSE_STREAMS:
+      g_value_set_boolean (value, urisrc->parse_streams);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -659,6 +699,19 @@ free_child_src_pad_info (ChildSrcPadInfo * info, GstURISourceBin * urisrc)
   g_list_foreach (info->outputs, (GFunc) free_output_slot, urisrc);
   g_list_free (info->outputs);
 
+  if (info->multiqueue) {
+    GST_DEBUG_OBJECT (urisrc, "Removing multiqueue");
+    gst_element_set_state (info->multiqueue, GST_STATE_NULL);
+    remove_buffering_msgs (urisrc, GST_OBJECT_CAST (info->multiqueue));
+    gst_bin_remove (GST_BIN_CAST (urisrc), info->multiqueue);
+  }
+
+  if (info->pre_parse_queue) {
+    gst_element_set_state (info->pre_parse_queue, GST_STATE_NULL);
+    remove_buffering_msgs (urisrc, GST_OBJECT_CAST (info->pre_parse_queue));
+    gst_bin_remove (GST_BIN_CAST (urisrc), info->pre_parse_queue);
+  }
+
   g_free (info);
 }
 
@@ -700,17 +753,14 @@ new_demuxer_pad_added_cb (GstElement * element, GstPad * pad,
   OutputSlotInfo *slot;
   GstPad *output_pad;
 
+  GST_DEBUG_OBJECT (element, "New pad %" GST_PTR_FORMAT, pad);
+
   GST_URI_SOURCE_BIN_LOCK (urisrc);
   /* If the demuxer handles buffering and is streams-aware, we can expose it
      as-is directly. We still add an event probe to deal with EOS */
-  slot =
-      new_output_slot (info, FALSE, FALSE, info->demuxer_handles_buffering,
-      pad);
+  slot = new_output_slot (info, pad);
   output_pad = gst_object_ref (slot->output_pad);
 
-  GST_DEBUG_OBJECT (element,
-      "New streams-aware demuxer pad %s:%s , exposing directly",
-      GST_DEBUG_PAD_NAME (pad));
   slot->demuxer_event_probe_id =
       gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM |
       GST_PAD_PROBE_TYPE_EVENT_FLUSH, (GstPadProbeCallback) demux_pad_events,
@@ -968,27 +1018,104 @@ on_queue_bitrate_changed (GstElement * queue, GParamSpec * pspec,
       (GstElementCallAsyncFunc) update_queue_values, NULL, NULL);
 }
 
+static void
+setup_downloadbuffer (GstURISourceBin * urisrc, GstElement * downloadbuffer)
+{
+  gchar *temp_template, *filename;
+  const gchar *tmp_dir, *prgname;
+
+  tmp_dir = g_get_user_cache_dir ();
+  prgname = g_get_prgname ();
+  if (prgname == NULL)
+    prgname = "GStreamer";
+
+  filename = g_strdup_printf ("%s-XXXXXX", prgname);
+
+  /* build our filename */
+  temp_template = g_build_filename (tmp_dir, filename, NULL);
+
+  GST_DEBUG_OBJECT (urisrc, "enable download buffering in %s (%s, %s, %s)",
+      temp_template, tmp_dir, prgname, filename);
+
+  /* configure progressive download for selected media types */
+  g_object_set (downloadbuffer, "temp-template", temp_template, NULL);
+
+  g_free (filename);
+  g_free (temp_template);
+}
+
+static void
+setup_multiqueue (GstURISourceBin * urisrc, ChildSrcPadInfo * info,
+    GstElement * multiqueue)
+{
+  if (info->use_downloadbuffer) {
+    /* If we have a downloadbuffer we will let that one deal with buffering,
+       and we only use multiqueue for dealing with interleave */
+    g_object_set (info->multiqueue, "use-buffering", FALSE, NULL);
+  } else {
+    /* Else we set the minimum interleave time of multiqueue to the required
+     * buffering duration and ask it to report buffering */
+    g_object_set (info->multiqueue, "use-buffering", TRUE,
+        "min-interleave-time", GET_BUFFER_DURATION (urisrc), NULL);
+  }
+  /* Common properties */
+  g_object_set (info->multiqueue,
+      "sync-by-running-time", TRUE,
+      "use-interleave", TRUE,
+      "max-size-bytes", 0,
+      "max-size-buffers", 0,
+      "low-watermark", urisrc->low_watermark,
+      "high-watermark", urisrc->high_watermark, NULL);
+  gst_bin_add (GST_BIN_CAST (urisrc), info->multiqueue);
+  gst_element_sync_state_with_parent (info->multiqueue);
+}
+
 /* Called with lock held */
 static OutputSlotInfo *
-new_output_slot (ChildSrcPadInfo * info, gboolean do_download,
-    gboolean is_adaptive, gboolean no_buffering, GstPad * originating_pad)
+new_output_slot (ChildSrcPadInfo * info, GstPad * originating_pad)
 {
   GstURISourceBin *urisrc = info->urisrc;
   OutputSlotInfo *slot;
   GstPad *srcpad;
   GstElement *queue = NULL;
   const gchar *elem_name;
+  gboolean use_downloadbuffer;
 
   GST_DEBUG_OBJECT (urisrc,
-      "do_download:%d is_adaptive:%d, no_buffering:%d, originating_pad:%"
-      GST_PTR_FORMAT, do_download, is_adaptive, no_buffering, originating_pad);
+      "use_queue2:%d use_downloadbuffer:%d, demuxer:%d, originating_pad:%"
+      GST_PTR_FORMAT, info->use_queue2, info->use_downloadbuffer,
+      info->demuxer != NULL, originating_pad);
 
   slot = g_new0 (OutputSlotInfo, 1);
   slot->linked_info = info;
 
-  /* If buffering is required, create the element */
-  if (!no_buffering) {
-    if (do_download)
+  /* If a demuxer/parsebin is present, then the downloadbuffer will have been handled before that */
+  use_downloadbuffer = info->use_downloadbuffer && !info->demuxer;
+
+  /* If parsebin is used and buffering is required, we go through a multiqueue */
+  if (urisrc->parse_streams && (info->use_queue2 || info->use_downloadbuffer)) {
+    GST_DEBUG_OBJECT (urisrc, "Using multiqueue");
+    if (!info->multiqueue) {
+      GST_DEBUG_OBJECT (urisrc,
+          "Creating multiqueue for buffering elementary streams");
+      elem_name = "multiqueue";
+      info->multiqueue = gst_element_factory_make (elem_name, NULL);
+      if (!info->multiqueue)
+        goto no_buffer_element;
+      setup_multiqueue (urisrc, info, info->multiqueue);
+    }
+
+    slot->queue_sinkpad =
+        gst_element_request_pad_simple (info->multiqueue, "sink_%u");
+    srcpad = gst_pad_get_single_internal_link (slot->queue_sinkpad);
+    slot->output_pad = create_output_pad (slot, srcpad);
+    gst_object_unref (srcpad);
+    gst_pad_link (originating_pad, slot->queue_sinkpad);
+  }
+  /* If buffering is required, create the element. If downloadbuffer is
+   * required, it will take precedence over queue2 */
+  else if (use_downloadbuffer || info->use_queue2) {
+    if (use_downloadbuffer)
       elem_name = "downloadbuffer";
     else
       elem_name = "queue2";
@@ -1003,40 +1130,23 @@ new_output_slot (ChildSrcPadInfo * info, gboolean do_download,
         g_signal_connect (G_OBJECT (queue), "notify::bitrate",
         (GCallback) on_queue_bitrate_changed, urisrc);
 
-    if (do_download) {
-      gchar *temp_template, *filename;
-      const gchar *tmp_dir, *prgname;
-
-      tmp_dir = g_get_user_cache_dir ();
-      prgname = g_get_prgname ();
-      if (prgname == NULL)
-        prgname = "GStreamer";
-
-      filename = g_strdup_printf ("%s-XXXXXX", prgname);
-
-      /* build our filename */
-      temp_template = g_build_filename (tmp_dir, filename, NULL);
-
-      GST_DEBUG_OBJECT (urisrc, "enable download buffering in %s (%s, %s, %s)",
-          temp_template, tmp_dir, prgname, filename);
-
-      /* configure progressive download for selected media types */
-      g_object_set (queue, "temp-template", temp_template, NULL);
-
-      g_free (filename);
-      g_free (temp_template);
+    if (use_downloadbuffer) {
+      setup_downloadbuffer (urisrc, slot->queue);
     } else {
-      if (is_adaptive) {
-        GST_LOG_OBJECT (urisrc, "Adding queue2 for adaptive streaming stream");
-        g_object_set (queue, "use-buffering", urisrc->use_buffering,
-            "use-tags-bitrate", TRUE, "use-rate-estimate", FALSE, NULL);
+      g_object_set (queue, "use-buffering", urisrc->use_buffering, NULL);
+      if (info->demuxer) {
+        /* If a adaptive demuxer or parsebin is used, use more accurate information */
+        g_object_set (queue, "use-tags-bitrate", TRUE, "use-rate-estimate",
+            FALSE, NULL);
       } else {
-        GST_LOG_OBJECT (urisrc, "Adding queue for buffering");
-        g_object_set (queue, "use-buffering", urisrc->use_buffering, NULL);
+        GST_DEBUG_OBJECT (queue,
+            "Setting ring-buffer-max-size %" G_GUINT64_FORMAT,
+            urisrc->ring_buffer_max_size);
+        /* Else allow ring-buffer-max-size setting to be used */
+        g_object_set (queue, "ring-buffer-max-size",
+            urisrc->ring_buffer_max_size, NULL);
       }
 
-      g_object_set (queue, "ring-buffer-max-size",
-          urisrc->ring_buffer_max_size, NULL);
       /* Disable max-size-buffers - queue based on data rate to the default time limit */
       g_object_set (queue, "max-size-buffers", 0, NULL);
 
@@ -1075,8 +1185,9 @@ new_output_slot (ChildSrcPadInfo * info, gboolean do_download,
   /* save output slot so we can remove it later */
   info->outputs = g_list_append (info->outputs, slot);
 
-  GST_DEBUG_OBJECT (urisrc, "New slot for output_pad %" GST_PTR_FORMAT,
-      slot->output_pad);
+  GST_DEBUG_OBJECT (urisrc,
+      "New output_pad %" GST_PTR_FORMAT " for originating pad %" GST_PTR_FORMAT,
+      slot->output_pad, originating_pad);
 
   return slot;
 
@@ -1550,11 +1661,13 @@ analyse_pad_foreach (const GValue * item, AnalyseData * data)
     OutputSlotInfo *slot;
 
     GST_URI_SOURCE_BIN_LOCK (urisrc);
-    /* Only use buffering on raw pads in very specific conditions */
+    /* Only use buffering (via queue2) on raw pads in very specific
+     * conditions */
+    info->use_queue2 = urisrc->use_buffering && IS_QUEUE_URI (urisrc->uri);
+
     GST_DEBUG_OBJECT (urisrc, "use_buffering:%d is_queue:%d",
         urisrc->use_buffering, IS_QUEUE_URI (urisrc->uri));
-    slot = new_output_slot (info, FALSE, FALSE, !urisrc->use_buffering
-        || !IS_QUEUE_URI (urisrc->uri), pad);
+    slot = new_output_slot (info, pad);
 
     if (!slot) {
       res = FALSE;
@@ -1732,6 +1845,99 @@ no_demuxer:
   }
 }
 
+static gboolean
+setup_parsebin_for_slot (ChildSrcPadInfo * info, GstPad * originating_pad)
+{
+  GstURISourceBin *urisrc = info->urisrc;
+  GstPad *sinkpad;
+  GstPadLinkReturn link_res;
+
+  GST_DEBUG_OBJECT (urisrc, "Setting up parsebin for %" GST_PTR_FORMAT,
+      originating_pad);
+
+  GST_STATE_LOCK (urisrc);
+  GST_URI_SOURCE_BIN_LOCK (urisrc);
+
+  /* Set up optional pre-parsebin download/ringbuffer elements */
+  if (info->use_downloadbuffer || urisrc->ring_buffer_max_size) {
+    if (info->use_downloadbuffer) {
+      GST_DEBUG_OBJECT (urisrc, "Setting up pre-parsebin downloadbuffer");
+      info->pre_parse_queue = gst_element_factory_make ("downloadbuffer", NULL);
+      setup_downloadbuffer (urisrc, info->pre_parse_queue);
+      g_object_set (info->pre_parse_queue, "max-size-bytes",
+          GET_BUFFER_SIZE (urisrc), "max-size-time",
+          (guint64) GET_BUFFER_DURATION (urisrc), NULL);
+    } else if (urisrc->ring_buffer_max_size) {
+      /* If a ring-buffer-max-size is specified with parsebin, we set it up on
+       * the queue2 *before* parsebin. We will use its buffering levels instead
+       * of the ones from multiqueue */
+      GST_DEBUG_OBJECT (urisrc,
+          "Setting up pre-parsebin queue2 for ring-buffer-max-size %"
+          G_GUINT64_FORMAT, urisrc->ring_buffer_max_size);
+      info->pre_parse_queue = gst_element_factory_make ("queue2", NULL);
+      /* We do not use this queue2 for buffering levels, but the multiqueue */
+      g_object_set (info->pre_parse_queue, "use-buffering", FALSE,
+          "ring-buffer-max-size", urisrc->ring_buffer_max_size,
+          "max-size-buffers", 0, NULL);
+    }
+    gst_element_set_locked_state (info->pre_parse_queue, TRUE);
+    gst_bin_add (GST_BIN_CAST (urisrc), info->pre_parse_queue);
+    sinkpad = gst_element_get_static_pad (info->pre_parse_queue, "sink");
+    link_res = gst_pad_link (originating_pad, sinkpad);
+
+    gst_object_unref (sinkpad);
+    if (link_res != GST_PAD_LINK_OK)
+      goto could_not_link;
+  }
+
+  info->demuxer = gst_element_factory_make ("parsebin", NULL);
+  if (!info->demuxer) {
+    post_missing_plugin_error (GST_ELEMENT_CAST (urisrc), "parsebin");
+    return FALSE;
+  }
+  gst_element_set_locked_state (info->demuxer, TRUE);
+  gst_bin_add (GST_BIN_CAST (urisrc), info->demuxer);
+
+  if (info->pre_parse_queue) {
+    if (!gst_element_link_pads (info->pre_parse_queue, "src", info->demuxer,
+            "sink"))
+      goto could_not_link;
+  } else {
+    sinkpad = gst_element_get_static_pad (info->demuxer, "sink");
+
+    link_res = gst_pad_link (originating_pad, sinkpad);
+
+    gst_object_unref (sinkpad);
+    if (link_res != GST_PAD_LINK_OK)
+      goto could_not_link;
+  }
+
+  /* set up callbacks to create the links between parsebin and output */
+  g_signal_connect (info->demuxer,
+      "pad-added", G_CALLBACK (new_demuxer_pad_added_cb), info);
+  g_signal_connect (info->demuxer,
+      "pad-removed", G_CALLBACK (demuxer_pad_removed_cb), info);
+
+  if (info->pre_parse_queue) {
+    gst_element_set_locked_state (info->pre_parse_queue, FALSE);
+    gst_element_sync_state_with_parent (info->pre_parse_queue);
+  }
+  gst_element_set_locked_state (info->demuxer, FALSE);
+  gst_element_sync_state_with_parent (info->demuxer);
+  GST_URI_SOURCE_BIN_UNLOCK (urisrc);
+  GST_STATE_UNLOCK (urisrc);
+  return TRUE;
+
+could_not_link:
+  {
+    GST_URI_SOURCE_BIN_UNLOCK (urisrc);
+    GST_STATE_UNLOCK (urisrc);
+    GST_ELEMENT_ERROR (urisrc, CORE, NEGOTIATION,
+        (NULL), ("Can't link to (pre-)parsebin element"));
+    return FALSE;
+  }
+}
+
 /* Called when:
  * * Source element adds a new pad
  * * typefind has found a type
@@ -1743,7 +1949,6 @@ handle_new_pad (ChildSrcPadInfo * info, GstPad * srcpad, GstCaps * caps)
   gboolean is_raw;
   GstStructure *s;
   const gchar *media_type;
-  gboolean do_download = FALSE;
 
   GST_URI_SOURCE_BIN_LOCK (urisrc);
 
@@ -1754,7 +1959,7 @@ handle_new_pad (ChildSrcPadInfo * info, GstPad * srcpad, GstCaps * caps)
 
     GST_DEBUG_OBJECT (urisrc, "Found pad with raw caps %" GST_PTR_FORMAT
         ", exposing", caps);
-    slot = new_output_slot (info, FALSE, FALSE, TRUE, srcpad);
+    slot = new_output_slot (info, srcpad);
     output_pad = gst_object_ref (slot->output_pad);
     GST_URI_SOURCE_BIN_UNLOCK (urisrc);
 
@@ -1781,7 +1986,8 @@ handle_new_pad (ChildSrcPadInfo * info, GstPad * srcpad, GstCaps * caps)
 
     /* Query the demuxer to see if it can handle buffering */
     query = gst_query_new_buffering (GST_FORMAT_TIME);
-    info->demuxer_handles_buffering = gst_element_query (info->demuxer, query);
+    info->use_queue2 = urisrc->use_buffering
+        && !gst_element_query (info->demuxer, query);
     gst_query_unref (query);
     GST_DEBUG_OBJECT (urisrc, "Demuxer handles buffering : %d",
         info->demuxer_handles_buffering);
@@ -1798,45 +2004,55 @@ handle_new_pad (ChildSrcPadInfo * info, GstPad * srcpad, GstCaps * caps)
 
     gst_element_sync_state_with_parent (info->demuxer);
   } else if (!urisrc->is_stream) {
-    OutputSlotInfo *slot;
-    GstPad *output_pad;
+    if (urisrc->parse_streams) {
+      /* GST_URI_SOURCE_BIN_LOCK (urisrc); */
+      setup_parsebin_for_slot (info, srcpad);
+      /* GST_URI_SOURCE_BIN_UNLOCK (urisrc); */
+    } else {
+      /* We don't need buffering here, expose immediately */
+      OutputSlotInfo *slot;
+      GstPad *output_pad;
 
-    /* We don't need buffering here, expose immediately */
-    GST_URI_SOURCE_BIN_LOCK (urisrc);
-    slot = new_output_slot (info, FALSE, FALSE, TRUE, srcpad);
-    output_pad = gst_object_ref (slot->output_pad);
-    GST_URI_SOURCE_BIN_UNLOCK (urisrc);
-    expose_output_pad (urisrc, output_pad);
-    gst_object_unref (output_pad);
+      GST_URI_SOURCE_BIN_LOCK (urisrc);
+      slot = new_output_slot (info, srcpad);
+      output_pad = gst_object_ref (slot->output_pad);
+      GST_URI_SOURCE_BIN_UNLOCK (urisrc);
+      expose_output_pad (urisrc, output_pad);
+      gst_object_unref (output_pad);
+    }
   } else {
-    OutputSlotInfo *slot;
-    GstPad *output_pad;
-
     /* only enable download buffering if the upstream duration is known */
     if (urisrc->download) {
       GstQuery *query = gst_query_new_duration (GST_FORMAT_BYTES);
       if (gst_pad_query (srcpad, query)) {
         gint64 dur;
         gst_query_parse_duration (query, NULL, &dur);
-        do_download = (dur != -1);
+        info->use_downloadbuffer = (dur != -1);
       }
       gst_query_unref (query);
     }
+    info->use_queue2 = urisrc->use_buffering;
 
-    GST_DEBUG_OBJECT (urisrc, "check media-type %s, do_download:%d", media_type,
-        do_download);
+    if (urisrc->parse_streams) {
+      /* GST_URI_SOURCE_BIN_LOCK (urisrc); */
+      setup_parsebin_for_slot (info, srcpad);
+      /* GST_URI_SOURCE_BIN_UNLOCK (urisrc); */
+    } else {
+      OutputSlotInfo *slot;
+      GstPad *output_pad;
 
-    GST_URI_SOURCE_BIN_LOCK (urisrc);
-    slot = new_output_slot (info, do_download, FALSE, FALSE, srcpad);
+      GST_URI_SOURCE_BIN_LOCK (urisrc);
+      slot = new_output_slot (info, srcpad);
 
-    gst_pad_add_probe (srcpad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM,
-        pre_queue_event_probe, urisrc, NULL);
+      gst_pad_add_probe (srcpad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM,
+          pre_queue_event_probe, urisrc, NULL);
 
-    output_pad = gst_object_ref (slot->output_pad);
-    GST_URI_SOURCE_BIN_UNLOCK (urisrc);
+      output_pad = gst_object_ref (slot->output_pad);
+      GST_URI_SOURCE_BIN_UNLOCK (urisrc);
 
-    expose_output_pad (urisrc, output_pad);
-    gst_object_unref (output_pad);
+      expose_output_pad (urisrc, output_pad);
+      gst_object_unref (output_pad);
+    }
   }
 
   return;
@@ -1946,8 +2162,12 @@ free_output_slot (OutputSlotInfo * slot, GstURISourceBin * urisrc)
     gst_element_set_state (slot->queue, GST_STATE_NULL);
     remove_buffering_msgs (urisrc, GST_OBJECT_CAST (slot->queue));
     gst_bin_remove (GST_BIN_CAST (urisrc), slot->queue);
-
-    gst_object_unref (slot->queue_sinkpad);
+  }
+  if (slot->queue_sinkpad) {
+    if (slot->linked_info && slot->linked_info->multiqueue)
+      gst_element_release_request_pad (slot->linked_info->multiqueue,
+          slot->queue_sinkpad);
+    gst_object_replace ((GstObject **) & slot->queue_sinkpad, NULL);
   }
 
   if (slot->demuxer_event_probe_id)
@@ -2237,7 +2457,6 @@ handle_buffering_message (GstURISourceBin * urisrc, GstMessage * msg)
     return;
   }
 
-
   g_mutex_lock (&urisrc->buffering_post_lock);
 
   /*
@@ -2379,6 +2598,20 @@ handle_message (GstBin * bin, GstMessage * msg)
       }
       break;
     }
+    case GST_MESSAGE_STREAM_COLLECTION:
+      /* We only want to forward stream collection from the source element *OR*
+       * from adaptive demuxers. We do not want to forward them from the
+       * potential parsebins since there might be many and require aggregation
+       * to be useful/coherent. */
+      if (GST_MESSAGE_SRC (msg) != (GstObject *) urisrc->source
+          && !urisrc->is_adaptive) {
+        GST_DEBUG_OBJECT (bin,
+            "Dropping stream-collection from non-adaptive-demuxer %"
+            GST_PTR_FORMAT, GST_MESSAGE_SRC (msg));
+        gst_message_unref (msg);
+        msg = NULL;
+      }
+      break;
     case GST_MESSAGE_BUFFERING:
       handle_buffering_message (urisrc, msg);
       msg = NULL;