urisourcebin: Refactor ChildSrcPadInfo and OutputSlot usage
authorEdward Hervey <edward@centricular.com>
Tue, 1 Nov 2022 08:49:39 +0000 (09:49 +0100)
committerGStreamer Marge Bot <gitlab-merge-bot@gstreamer-foundation.org>
Wed, 16 Nov 2022 14:01:46 +0000 (14:01 +0000)
Make an explicit topology/tree of structures:
* ChildSrcPadInfo is created for each source element source pad
* ChildSrcPadInfo contains the chain of optional elements specific to that
  pad (ex: typefind)
* A ChildSrcPadInfo links to one or more OutputSlot, which contain what is
  specific to the output (i.e. optional buffering and ghostpad)
* No longer use GObject {set|get}_data() functions to store those structures and
  instead make them explicit
* Pass those structures around explicitely in each function/callback

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/2784>

subprojects/gst-plugins-base/gst/playback/gsturisourcebin.c

index af6ebbe..b1a543e 100644 (file)
@@ -90,40 +90,41 @@ typedef struct _OutputSlotInfo OutputSlotInfo;
     g_mutex_unlock (&GST_URI_SOURCE_BIN_CAST(ubin)->buffering_lock);           \
 } G_STMT_END
 
-/* Track a source pad from a child (source, typefind, demuxer) that is linked or
- * needs linking to an output slot, or source pads that are directly exposed as
- * ghost pads */
+/* Track a source pad from the source element and the chain of (optional)
+ * elements that are linked to it up to the output slots */
 struct _ChildSrcPadInfo
 {
-  /* Source pad this info is attached to (not reffed, since the pad owns the
-   * ChildSrcPadInfo as qdata) */
-  GstPad *src_pad;
+  GstURISourceBin *urisrc;
 
-  /* The output GhostPad if this info is for a directly exposed pad (raw or
-   * not), rather than linked through a slot. */
-  GstPad *output_pad;
+  /* Source pad this info is attached to (reffed) */
+  GstPad *src_pad;
 
-  /* Configured output slot, if any buffering/download is required */
-  OutputSlotInfo *output_slot;
+  /* An optional typefind */
+  GstElement *typefind;
 
-  /* ADAPTIVE DEMUXER SPECIFIC */
-  guint blocking_probe_id;
-  guint event_probe_id;
+  /* list of output slots */
+  GList *outputs;
 };
 
 /* Output Slot:
  *
- * Buffered output
+ * Handles everything related to outputing, including optional buffering.
  */
 struct _OutputSlotInfo
 {
-  ChildSrcPadInfo *linked_info; /* demux source pad info feeding this slot, if any */
-  GstElement *queue;            /* queue2 or downloadbuffer */
-  GstPad *queue_sinkpad;        /* Sink pad of the queue eleemnt */
+  ChildSrcPadInfo *linked_info; /* source pad info feeding this slot */
+
+  GstPad *originating_pad;      /* Pad that created this OutputSlotInfo (ref held) */
   GstPad *output_pad;           /* Output ghost pad */
+
   gboolean is_eos;              /* Did EOS get fed into the buffering element */
 
+  GstElement *queue;            /* queue2 or downloadbuffer */
+  GstPad *queue_sinkpad;        /* Sink pad of the queue eleemnt */
+
   gulong bitrate_changed_id;    /* queue bitrate changed notification */
+
+  guint demuxer_event_probe_id;
 };
 
 /**
@@ -147,7 +148,6 @@ struct _GstURISourceBin
   gboolean is_stream;
   gboolean is_adaptive;
   gboolean demuxer_handles_buffering;   /* If TRUE: Don't use buffering elements */
-  gboolean source_streams_aware;        /* if TRUE: Don't block output pads */
   guint64 buffer_duration;      /* When buffering, buffer duration (ns) */
   guint buffer_size;            /* When buffering, buffer size (bytes) */
   gboolean download;
@@ -156,10 +156,10 @@ struct _GstURISourceBin
   gdouble high_watermark;
 
   GstElement *source;
-  GList *typefinds;             /* list of typefind element */
+
+  GList *src_infos;             /* List of ChildSrcPadInfo for the source */
 
   GstElement *demuxer;          /* Adaptive demuxer if any */
-  GSList *out_slots;
 
   guint numpads;
 
@@ -283,17 +283,18 @@ static gboolean gst_uri_source_bin_query (GstElement * element,
 static GstStateChangeReturn gst_uri_source_bin_change_state (GstElement *
     element, GstStateChange transition);
 
-static void handle_new_pad (GstURISourceBin * urisrc, GstPad * srcpad,
+static void handle_new_pad (ChildSrcPadInfo * info, GstPad * srcpad,
     GstCaps * caps);
-static gboolean setup_typefind (GstURISourceBin * urisrc, GstPad * srcpad);
+static gboolean setup_typefind (ChildSrcPadInfo * info);
 static void remove_demuxer (GstURISourceBin * bin);
 static void expose_output_pad (GstURISourceBin * urisrc, GstPad * pad);
-static OutputSlotInfo *get_output_slot (GstURISourceBin * urisrc,
-    gboolean do_download, gboolean is_adaptive);
+static OutputSlotInfo *new_output_slot (ChildSrcPadInfo * info,
+    gboolean do_download, gboolean is_adaptive, gboolean no_buffering,
+    GstPad * originating_pad);
 static void free_output_slot (OutputSlotInfo * slot, GstURISourceBin * urisrc);
 static void free_output_slot_async (GstURISourceBin * urisrc,
     OutputSlotInfo * slot);
-static GstPad *create_output_pad (GstURISourceBin * urisrc, GstPad * pad);
+static GstPad *create_output_pad (OutputSlotInfo * slot, GstPad * pad);
 static void remove_buffering_msgs (GstURISourceBin * bin, GstObject * src);
 
 static void update_queue_values (GstURISourceBin * urisrc);
@@ -636,79 +637,129 @@ copy_sticky_events (GstPad * pad, GstEvent ** event, gpointer user_data)
 }
 
 static GstPadProbeReturn
-demux_pad_events (GstPad * pad, GstPadProbeInfo * info, gpointer user_data);
+demux_pad_events (GstPad * pad, GstPadProbeInfo * info, OutputSlotInfo * slot);
 
+/* CALL WITH URISOURCEBIN LOCK */
 static void
-free_child_src_pad_info (ChildSrcPadInfo * info)
+free_child_src_pad_info (ChildSrcPadInfo * info, GstURISourceBin * urisrc)
 {
-  if (info->output_pad)
-    gst_object_unref (info->output_pad);
+  g_assert (info->src_pad);
+
+  GST_DEBUG_OBJECT (urisrc,
+      "Freeing ChildSrcPadInfo for %" GST_PTR_FORMAT, info->src_pad);
+  if (info->typefind) {
+    gst_element_set_state (info->typefind, GST_STATE_NULL);
+    gst_bin_remove (GST_BIN_CAST (urisrc), info->typefind);
+  }
+
+  gst_object_unref (info->src_pad);
+
+  g_list_foreach (info->outputs, (GFunc) free_output_slot, urisrc);
+  g_list_free (info->outputs);
+
   g_free (info);
 }
 
-/* Called by the signal handlers when a demuxer has produced a new stream */
-static void
-new_demuxer_pad_added_cb (GstElement * element, GstPad * pad,
-    GstURISourceBin * urisrc)
+static ChildSrcPadInfo *
+get_cspi_for_pad (GstURISourceBin * urisrc, GstPad * pad)
+{
+  GList *iter;
+
+  for (iter = urisrc->src_infos; iter; iter = iter->next) {
+    ChildSrcPadInfo *info = iter->data;
+    if (info->src_pad == pad)
+      return info;
+  }
+  return NULL;
+}
+
+static ChildSrcPadInfo *
+new_child_src_pad_info (GstURISourceBin * urisrc, GstPad * pad)
 {
   ChildSrcPadInfo *info;
 
+  GST_LOG_OBJECT (urisrc, "New ChildSrcPadInfo for %" GST_PTR_FORMAT, pad);
+
   info = g_new0 (ChildSrcPadInfo, 1);
-  info->src_pad = pad;
+  info->urisrc = urisrc;
+  info->src_pad = gst_object_ref (pad);
 
-  g_object_set_data_full (G_OBJECT (pad), "urisourcebin.srcpadinfo",
-      info, (GDestroyNotify) free_child_src_pad_info);
+  urisrc->src_infos = g_list_append (urisrc->src_infos, info);
+
+  return info;
+}
+
+/* Called by the signal handlers when a demuxer has produced a new stream */
+static void
+new_demuxer_pad_added_cb (GstElement * element, GstPad * pad,
+    ChildSrcPadInfo * info)
+{
+  GstURISourceBin *urisrc = info->urisrc;
+  OutputSlotInfo *slot;
+  GstPad *output_pad;
 
   GST_URI_SOURCE_BIN_LOCK (urisrc);
   /* If the demuxer handles buffering and is streams-aware, we can expose it
      as-is directly. We still add an event probe to deal with EOS */
-  if (urisrc->demuxer_handles_buffering && urisrc->source_streams_aware) {
-    info->output_pad = gst_object_ref (create_output_pad (urisrc, pad));
-    GST_DEBUG_OBJECT (element,
-        "New streams-aware demuxer pad %s:%s , exposing directly",
-        GST_DEBUG_PAD_NAME (pad));
-    GST_URI_SOURCE_BIN_UNLOCK (urisrc);
-    expose_output_pad (urisrc, info->output_pad);
-  } else {
-    g_return_if_reached ();
-  }
-  info->event_probe_id =
+  slot =
+      new_output_slot (info, FALSE, FALSE, urisrc->demuxer_handles_buffering,
+      pad);
+  output_pad = gst_object_ref (slot->output_pad);
+
+  GST_DEBUG_OBJECT (element,
+      "New streams-aware demuxer pad %s:%s , exposing directly",
+      GST_DEBUG_PAD_NAME (pad));
+  slot->demuxer_event_probe_id =
       gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM |
-      GST_PAD_PROBE_TYPE_EVENT_FLUSH, demux_pad_events, urisrc, NULL);
+      GST_PAD_PROBE_TYPE_EVENT_FLUSH, (GstPadProbeCallback) demux_pad_events,
+      slot, NULL);
+
+  GST_URI_SOURCE_BIN_UNLOCK (urisrc);
+  expose_output_pad (urisrc, output_pad);
+  gst_object_unref (output_pad);
 }
 
 /* Called with lock held */
 static gboolean
 all_slots_are_eos (GstURISourceBin * urisrc)
 {
-  GSList *tmp;
+  GList *tmp;
 
-  for (tmp = urisrc->out_slots; tmp; tmp = tmp->next) {
-    OutputSlotInfo *slot = (OutputSlotInfo *) tmp->data;
-    if (slot->is_eos == FALSE)
-      return FALSE;
+  for (tmp = urisrc->src_infos; tmp; tmp = tmp->next) {
+    ChildSrcPadInfo *cspi = tmp->data;
+    GList *iter2;
+    for (iter2 = cspi->outputs; iter2; iter2 = iter2->next) {
+      OutputSlotInfo *slot = (OutputSlotInfo *) iter2->data;
+      if (slot->is_eos == FALSE)
+        return FALSE;
+    }
   }
   return TRUE;
 }
 
+/* CALL WITH URISOURCEBIN LOCK */
+static OutputSlotInfo *
+output_slot_for_originating_pad (ChildSrcPadInfo * info,
+    GstPad * originating_pad)
+{
+  GList *iter;
+  for (iter = info->outputs; iter; iter = iter->next) {
+    OutputSlotInfo *slot = iter->data;
+    if (slot->originating_pad == originating_pad)
+      return slot;
+  }
+
+  return NULL;
+}
+
 static GstPadProbeReturn
-demux_pad_events (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
+demux_pad_events (GstPad * pad, GstPadProbeInfo * info, OutputSlotInfo * slot)
 {
-  GstURISourceBin *urisrc = GST_URI_SOURCE_BIN (user_data);
-  ChildSrcPadInfo *child_info;
+  GstURISourceBin *urisrc = slot->linked_info->urisrc;
   GstPadProbeReturn ret = GST_PAD_PROBE_OK;
   GstEvent *ev = GST_PAD_PROBE_INFO_EVENT (info);
 
-  if (!(child_info =
-          g_object_get_data (G_OBJECT (pad), "urisourcebin.srcpadinfo")))
-    goto done;
-
   GST_URI_SOURCE_BIN_LOCK (urisrc);
-  /* If not linked to a slot, nothing more to do */
-  if (child_info->output_slot == NULL) {
-    GST_URI_SOURCE_BIN_UNLOCK (urisrc);
-    goto done;
-  }
 
   switch (GST_EVENT_TYPE (ev)) {
     case GST_EVENT_EOS:
@@ -719,20 +770,13 @@ demux_pad_events (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
 
       BUFFERING_LOCK (urisrc);
       /* Mark that we fed an EOS to this slot */
-      child_info->output_slot->is_eos = TRUE;
+      slot->is_eos = TRUE;
       all_streams_eos = all_slots_are_eos (urisrc);
       BUFFERING_UNLOCK (urisrc);
 
-      /* EOS means this element is no longer buffering */
-      remove_buffering_msgs (urisrc,
-          GST_OBJECT_CAST (child_info->output_slot->queue));
-
-      /* Mark this custom EOS, replacing the event in the probe data */
-      ev = gst_event_make_writable (ev);
-      GST_PAD_PROBE_INFO_DATA (info) = ev;
-
-      gst_mini_object_set_qdata (GST_MINI_OBJECT_CAST (ev), CUSTOM_EOS_QUARK,
-          (gchar *) CUSTOM_EOS_QUARK_DATA, NULL);
+      if (slot->queue)
+        /* EOS means this element is no longer buffering */
+        remove_buffering_msgs (urisrc, GST_OBJECT_CAST (slot->queue));
 
       if (all_streams_eos) {
         GST_DEBUG_OBJECT (urisrc, "POSTING ABOUT TO FINISH");
@@ -744,7 +788,7 @@ demux_pad_events (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
     case GST_EVENT_STREAM_START:
     case GST_EVENT_FLUSH_STOP:
       BUFFERING_LOCK (urisrc);
-      child_info->output_slot->is_eos = FALSE;
+      slot->is_eos = FALSE;
       BUFFERING_UNLOCK (urisrc);
       break;
     default:
@@ -753,7 +797,6 @@ demux_pad_events (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
 
   GST_URI_SOURCE_BIN_UNLOCK (urisrc);
 
-done:
   return ret;
 }
 
@@ -787,31 +830,37 @@ get_queue_statistics (GstURISourceBin * urisrc)
   guint64 min_time_level = 0, max_time_level = 0;
   gdouble avg_byte_level = 0., avg_time_level = 0.;
   guint i = 0;
-  GSList *cur;
+  GList *iter, *cur;
 
   GST_URI_SOURCE_BIN_LOCK (urisrc);
 
-  for (cur = urisrc->out_slots; cur != NULL; cur = g_slist_next (cur)) {
-    OutputSlotInfo *slot = (OutputSlotInfo *) (cur->data);
-    guint byte_limit = 0;
-    guint64 time_limit = 0;
+  for (iter = urisrc->src_infos; iter; iter = iter->next) {
+    ChildSrcPadInfo *info = iter->data;
+    for (cur = info->outputs; cur; cur = cur->next) {
+      OutputSlotInfo *slot = (OutputSlotInfo *) (cur->data);
+      guint byte_limit = 0;
+      guint64 time_limit = 0;
+
+      if (!slot->queue)
+        continue;
 
-    g_object_get (slot->queue, "current-level-bytes", &byte_limit,
-        "current-level-time", &time_limit, NULL);
+      g_object_get (slot->queue, "current-level-bytes", &byte_limit,
+          "current-level-time", &time_limit, NULL);
 
-    if (byte_limit < min_byte_level)
-      min_byte_level = byte_limit;
-    if (byte_limit > max_byte_level)
-      max_byte_level = byte_limit;
-    avg_byte_level = (avg_byte_level * i + byte_limit) / (gdouble) (i + 1);
+      if (byte_limit < min_byte_level)
+        min_byte_level = byte_limit;
+      if (byte_limit > max_byte_level)
+        max_byte_level = byte_limit;
+      avg_byte_level = (avg_byte_level * i + byte_limit) / (gdouble) (i + 1);
 
-    if (time_limit < min_time_level)
-      min_time_level = time_limit;
-    if (time_limit > max_time_level)
-      max_time_level = time_limit;
-    avg_time_level = (avg_time_level * i + time_limit) / (gdouble) (i + 1);
+      if (time_limit < min_time_level)
+        min_time_level = time_limit;
+      if (time_limit > max_time_level)
+        max_time_level = time_limit;
+      avg_time_level = (avg_time_level * i + time_limit) / (gdouble) (i + 1);
 
-    i++;
+      i++;
+    }
   }
   GST_URI_SOURCE_BIN_UNLOCK (urisrc);
 
@@ -833,7 +882,7 @@ update_queue_values (GstURISourceBin * urisrc)
   guint buffer_size;
   gdouble low_watermark, high_watermark;
   guint64 cumulative_bitrate = 0;
-  GSList *cur;
+  GList *iter, *cur;
 
   GST_URI_SOURCE_BIN_LOCK (urisrc);
   duration = GET_BUFFER_DURATION (urisrc);
@@ -841,22 +890,29 @@ update_queue_values (GstURISourceBin * urisrc)
   low_watermark = urisrc->low_watermark;
   high_watermark = urisrc->high_watermark;
 
-  for (cur = urisrc->out_slots; cur != NULL; cur = g_slist_next (cur)) {
-    OutputSlotInfo *slot = (OutputSlotInfo *) (cur->data);
-    guint64 bitrate = 0;
+  for (iter = urisrc->src_infos; iter; iter = iter->next) {
+    ChildSrcPadInfo *info = iter->data;
+    for (cur = info->outputs; cur; cur = cur->next) {
+      OutputSlotInfo *slot = (OutputSlotInfo *) (cur->data);
+      guint64 bitrate = 0;
 
-    if (g_object_class_find_property (G_OBJECT_GET_CLASS (slot->queue),
-            "bitrate")) {
-      g_object_get (G_OBJECT (slot->queue), "bitrate", &bitrate, NULL);
-    }
+      if (!slot->queue)
+        continue;
 
-    if (bitrate > 0)
-      cumulative_bitrate += bitrate;
-    else {
-      GST_TRACE_OBJECT (urisrc, "Unknown bitrate detected from %" GST_PTR_FORMAT
-          ", resetting all bitrates", slot->queue);
-      cumulative_bitrate = 0;
-      break;
+      if (g_object_class_find_property (G_OBJECT_GET_CLASS (slot->queue),
+              "bitrate")) {
+        g_object_get (G_OBJECT (slot->queue), "bitrate", &bitrate, NULL);
+      }
+
+      if (bitrate > 0)
+        cumulative_bitrate += bitrate;
+      else {
+        GST_TRACE_OBJECT (urisrc,
+            "Unknown bitrate detected from %" GST_PTR_FORMAT
+            ", resetting all bitrates", slot->queue);
+        cumulative_bitrate = 0;
+        break;
+      }
     }
   }
 
@@ -864,32 +920,38 @@ update_queue_values (GstURISourceBin * urisrc)
       "bitrate %" G_GUINT64_FORMAT ", buffer size %u, buffer duration %"
       G_GINT64_FORMAT, cumulative_bitrate, buffer_size, duration);
 
-  for (cur = urisrc->out_slots; cur != NULL; cur = g_slist_next (cur)) {
-    OutputSlotInfo *slot = (OutputSlotInfo *) (cur->data);
-    guint byte_limit;
-
-    if (cumulative_bitrate > 0
-        && g_object_class_find_property (G_OBJECT_GET_CLASS (slot->queue),
-            "bitrate")) {
-      guint64 bitrate;
-      g_object_get (G_OBJECT (slot->queue), "bitrate", &bitrate, NULL);
-      byte_limit =
-          gst_util_uint64_scale (buffer_size, bitrate, cumulative_bitrate);
-    } else {
-      /* if not all queue's have valid bitrates, use the buffer-size as the
-       * limit */
-      byte_limit = buffer_size;
-    }
+  for (iter = urisrc->src_infos; iter; iter = iter->next) {
+    ChildSrcPadInfo *info = iter->data;
+    for (cur = info->outputs; cur; cur = cur->next) {
+      OutputSlotInfo *slot = (OutputSlotInfo *) (cur->data);
+      guint byte_limit;
+
+      if (!slot->queue)
+        continue;
+
+      if (cumulative_bitrate > 0
+          && g_object_class_find_property (G_OBJECT_GET_CLASS (slot->queue),
+              "bitrate")) {
+        guint64 bitrate;
+        g_object_get (G_OBJECT (slot->queue), "bitrate", &bitrate, NULL);
+        byte_limit =
+            gst_util_uint64_scale (buffer_size, bitrate, cumulative_bitrate);
+      } else {
+        /* if not all queue's have valid bitrates, use the buffer-size as the
+         * limit */
+        byte_limit = buffer_size;
+      }
 
-    GST_DEBUG_OBJECT (urisrc,
-        "calculated new limits for queue-like element %" GST_PTR_FORMAT
-        ", bytes:%u, time:%" G_GUINT64_FORMAT
-        ", low-watermark:%f, high-watermark:%f",
-        slot->queue, byte_limit, (guint64) duration, low_watermark,
-        high_watermark);
-    g_object_set (G_OBJECT (slot->queue), "max-size-bytes", byte_limit,
-        "max-size-time", (guint64) duration, "low-watermark", low_watermark,
-        "high-watermark", high_watermark, NULL);
+      GST_DEBUG_OBJECT (urisrc,
+          "calculated new limits for queue-like element %" GST_PTR_FORMAT
+          ", bytes:%u, time:%" G_GUINT64_FORMAT
+          ", low-watermark:%f, high-watermark:%f",
+          slot->queue, byte_limit, (guint64) duration, low_watermark,
+          high_watermark);
+      g_object_set (G_OBJECT (slot->queue), "max-size-bytes", byte_limit,
+          "max-size-time", (guint64) duration, "low-watermark", low_watermark,
+          "high-watermark", high_watermark, NULL);
+    }
   }
   GST_URI_SOURCE_BIN_UNLOCK (urisrc);
 }
@@ -906,112 +968,119 @@ on_queue_bitrate_changed (GstElement * queue, GParamSpec * pspec,
 
 /* Called with lock held */
 static OutputSlotInfo *
-get_output_slot (GstURISourceBin * urisrc, gboolean do_download,
-    gboolean is_adaptive)
+new_output_slot (ChildSrcPadInfo * info, gboolean do_download,
+    gboolean is_adaptive, gboolean no_buffering, GstPad * originating_pad)
 {
+  GstURISourceBin *urisrc = info->urisrc;
   OutputSlotInfo *slot;
   GstPad *srcpad;
-  GstElement *queue;
+  GstElement *queue = NULL;
   const gchar *elem_name;
 
-  /* Otherwise create the new slot */
-  if (do_download)
-    elem_name = "downloadbuffer";
-  else
-    elem_name = "queue2";
-
-  queue = gst_element_factory_make (elem_name, NULL);
-  if (!queue)
-    goto no_buffer_element;
+  GST_DEBUG_OBJECT (urisrc,
+      "do_download:%d is_adaptive:%d, no_buffering:%d, originating_pad:%"
+      GST_PTR_FORMAT, do_download, is_adaptive, no_buffering, originating_pad);
 
   slot = g_new0 (OutputSlotInfo, 1);
-  slot->queue = queue;
+  slot->linked_info = info;
 
-  /* Set the slot onto the queue (needed in buffering msg handling) */
-  g_object_set_data (G_OBJECT (queue), "urisourcebin.slotinfo", slot);
+  /* If buffering is required, create the element */
+  if (!no_buffering) {
+    if (do_download)
+      elem_name = "downloadbuffer";
+    else
+      elem_name = "queue2";
 
-  slot->bitrate_changed_id =
-      g_signal_connect (G_OBJECT (queue), "notify::bitrate",
-      (GCallback) on_queue_bitrate_changed, urisrc);
+    queue = gst_element_factory_make (elem_name, NULL);
+    if (!queue)
+      goto no_buffer_element;
 
-  if (do_download) {
-    gchar *temp_template, *filename;
-    const gchar *tmp_dir, *prgname;
+    slot->queue = queue;
 
-    tmp_dir = g_get_user_cache_dir ();
-    prgname = g_get_prgname ();
-    if (prgname == NULL)
-      prgname = "GStreamer";
+    slot->bitrate_changed_id =
+        g_signal_connect (G_OBJECT (queue), "notify::bitrate",
+        (GCallback) on_queue_bitrate_changed, urisrc);
 
-    filename = g_strdup_printf ("%s-XXXXXX", prgname);
+    if (do_download) {
+      gchar *temp_template, *filename;
+      const gchar *tmp_dir, *prgname;
 
-    /* build our filename */
-    temp_template = g_build_filename (tmp_dir, filename, NULL);
+      tmp_dir = g_get_user_cache_dir ();
+      prgname = g_get_prgname ();
+      if (prgname == NULL)
+        prgname = "GStreamer";
 
-    GST_DEBUG_OBJECT (urisrc, "enable download buffering in %s (%s, %s, %s)",
-        temp_template, tmp_dir, prgname, filename);
+      filename = g_strdup_printf ("%s-XXXXXX", prgname);
 
-    /* configure progressive download for selected media types */
-    g_object_set (queue, "temp-template", temp_template, NULL);
+      /* build our filename */
+      temp_template = g_build_filename (tmp_dir, filename, NULL);
 
-    g_free (filename);
-    g_free (temp_template);
-  } else {
-    if (is_adaptive) {
-      GST_LOG_OBJECT (urisrc, "Adding queue for adaptive streaming stream");
-      g_object_set (queue, "use-buffering", urisrc->use_buffering,
-          "use-tags-bitrate", TRUE, "use-rate-estimate", FALSE, NULL);
+      GST_DEBUG_OBJECT (urisrc, "enable download buffering in %s (%s, %s, %s)",
+          temp_template, tmp_dir, prgname, filename);
+
+      /* configure progressive download for selected media types */
+      g_object_set (queue, "temp-template", temp_template, NULL);
+
+      g_free (filename);
+      g_free (temp_template);
     } else {
-      GST_LOG_OBJECT (urisrc, "Adding queue for buffering");
-      g_object_set (queue, "use-buffering", urisrc->use_buffering, NULL);
-    }
+      if (is_adaptive) {
+        GST_LOG_OBJECT (urisrc, "Adding queue2 for adaptive streaming stream");
+        g_object_set (queue, "use-buffering", urisrc->use_buffering,
+            "use-tags-bitrate", TRUE, "use-rate-estimate", FALSE, NULL);
+      } else {
+        GST_LOG_OBJECT (urisrc, "Adding queue for buffering");
+        g_object_set (queue, "use-buffering", urisrc->use_buffering, NULL);
+      }
 
-    g_object_set (queue, "ring-buffer-max-size",
-        urisrc->ring_buffer_max_size, NULL);
-    /* Disable max-size-buffers - queue based on data rate to the default time limit */
-    g_object_set (queue, "max-size-buffers", 0, NULL);
+      g_object_set (queue, "ring-buffer-max-size",
+          urisrc->ring_buffer_max_size, NULL);
+      /* Disable max-size-buffers - queue based on data rate to the default time limit */
+      g_object_set (queue, "max-size-buffers", 0, NULL);
 
-    /* Don't start buffering until the queue is empty (< 1%).
-     * Start playback when the queue is 60% full, leaving a bit more room
-     * for upstream to push more without getting bursty */
-    g_object_set (queue, "low-percent", 1, "high-percent", 60, NULL);
+      /* Don't start buffering until the queue is empty (< 1%).
+       * Start playback when the queue is 60% full, leaving a bit more room
+       * for upstream to push more without getting bursty */
+      g_object_set (queue, "low-percent", 1, "high-percent", 60, NULL);
 
-    g_object_set (queue, "low-watermark", urisrc->low_watermark,
-        "high-watermark", urisrc->high_watermark, NULL);
-  }
+      g_object_set (queue, "low-watermark", urisrc->low_watermark,
+          "high-watermark", urisrc->high_watermark, NULL);
+    }
 
-  /* set the necessary limits on the queue-like elements */
-  g_object_set (queue, "max-size-bytes", GET_BUFFER_SIZE (urisrc),
-      "max-size-time", (guint64) GET_BUFFER_DURATION (urisrc), NULL);
-#if 0
-  /* Disabled because this makes initial startup slower for radio streams */
-  else {
-    /* Buffer 4 seconds by default - some extra headroom over the
-     * core default, because we trigger playback sooner */
-    //g_object_set (queue, "max-size-time", 4 * GST_SECOND, NULL);
-  }
-#endif
+    /* set the necessary limits on the queue-like elements */
+    g_object_set (queue, "max-size-bytes", GET_BUFFER_SIZE (urisrc),
+        "max-size-time", (guint64) GET_BUFFER_DURATION (urisrc), NULL);
+
+    gst_bin_add (GST_BIN_CAST (urisrc), queue);
+    gst_element_sync_state_with_parent (queue);
+
+    slot->queue_sinkpad = gst_element_get_static_pad (queue, "sink");
 
-  /* save queue pointer so we can remove it later */
-  urisrc->out_slots = g_slist_prepend (urisrc->out_slots, slot);
+    /* get the new raw srcpad */
+    srcpad = gst_element_get_static_pad (queue, "src");
 
-  gst_bin_add (GST_BIN_CAST (urisrc), queue);
-  gst_element_sync_state_with_parent (queue);
+    slot->output_pad = create_output_pad (slot, srcpad);
 
-  slot->queue_sinkpad = gst_element_get_static_pad (queue, "sink");
+    gst_object_unref (srcpad);
 
-  /* get the new raw srcpad */
-  srcpad = gst_element_get_static_pad (queue, "src");
-  g_object_set_data (G_OBJECT (srcpad), "urisourcebin.slotinfo", slot);
+    gst_pad_link (originating_pad, slot->queue_sinkpad);
+  } else {
+    /* Expose pad directly */
+    slot->output_pad = create_output_pad (slot, originating_pad);
+  }
+  slot->originating_pad = gst_object_ref (originating_pad);
 
-  slot->output_pad = create_output_pad (urisrc, srcpad);
+  /* save output slot so we can remove it later */
+  info->outputs = g_list_append (info->outputs, slot);
 
-  gst_object_unref (srcpad);
+  GST_DEBUG_OBJECT (urisrc, "New slot for output_pad %" GST_PTR_FORMAT,
+      slot->output_pad);
 
   return slot;
 
 no_buffer_element:
   {
+    g_free (slot);
     post_missing_plugin_error (GST_ELEMENT_CAST (urisrc), elem_name);
     return NULL;
   }
@@ -1022,15 +1091,19 @@ source_pad_event_probe (GstPad * pad, GstPadProbeInfo * info,
     gpointer user_data)
 {
   GstEvent *event = GST_PAD_PROBE_INFO_EVENT (info);
-  GstURISourceBin *urisrc = user_data;
+  OutputSlotInfo *slot = user_data;
+  GstURISourceBin *urisrc = slot->linked_info->urisrc;
 
-  GST_LOG_OBJECT (pad, "%s, urisrc %p", GST_EVENT_TYPE_NAME (event), event);
+  GST_LOG_OBJECT (pad, "%" GST_PTR_FORMAT, event);
 
+  /* A custom EOS will be received if an adaptive demuxer source pad removed a
+   * pad and buffering was present on that slot */
   if (GST_EVENT_TYPE (event) == GST_EVENT_EOS &&
       gst_mini_object_get_qdata (GST_MINI_OBJECT_CAST (event),
           CUSTOM_EOS_QUARK)) {
-    OutputSlotInfo *slot;
-    GST_DEBUG_OBJECT (pad, "we received EOS");
+    GstPadProbeReturn probe_ret = GST_PAD_PROBE_DROP;
+
+    GST_DEBUG_OBJECT (pad, "we received custom EOS");
 
     /* remove custom-eos */
     gst_mini_object_set_qdata (GST_MINI_OBJECT_CAST (event), CUSTOM_EOS_QUARK,
@@ -1038,41 +1111,19 @@ source_pad_event_probe (GstPad * pad, GstPadProbeInfo * info,
 
     GST_URI_SOURCE_BIN_LOCK (urisrc);
 
-    slot = g_object_get_data (G_OBJECT (pad), "urisourcebin.slotinfo");
-
-    if (slot) {
-      GstEvent *eos;
-      guint32 seqnum;
-
-      if (slot->linked_info) {
-        if (slot->is_eos) {
-          /* linked_info is old input which is still linked without removal */
-          GST_DEBUG_OBJECT (pad, "push actual EOS");
-          seqnum = gst_event_get_seqnum (event);
-          eos = gst_event_new_eos ();
-          gst_event_set_seqnum (eos, seqnum);
-          gst_pad_push_event (slot->output_pad, eos);
-        } else {
-          /* Do not clear output slot yet. A new input was
-           * connected. We should just drop this EOS */
-        }
-        GST_URI_SOURCE_BIN_UNLOCK (urisrc);
-        return GST_PAD_PROBE_DROP;
-      }
-
-      seqnum = gst_event_get_seqnum (event);
-      eos = gst_event_new_eos ();
-      gst_event_set_seqnum (eos, seqnum);
-      gst_pad_push_event (slot->output_pad, eos);
-      free_output_slot_async (urisrc, slot);
+    if (slot->is_eos) {
+      /* linked_info is old input which is still linked without removal */
+      GST_DEBUG_OBJECT (pad, "push actual EOS");
+      gst_pad_push_event (slot->output_pad, event);
+      probe_ret = GST_PAD_PROBE_HANDLED;
     }
 
-    /* FIXME: Only emit drained if all output pads are done and there's no
-     * pending pads */
-    g_signal_emit (urisrc, gst_uri_source_bin_signals[SIGNAL_DRAINED], 0, NULL);
+    /* And finally remove the output. This is done asynchronously since we can't
+     * do it from the streaming thread */
+    free_output_slot_async (urisrc, slot);
 
     GST_URI_SOURCE_BIN_UNLOCK (urisrc);
-    return GST_PAD_PROBE_DROP;
+    return probe_ret;
   }
   /* never drop events */
   return GST_PAD_PROBE_OK;
@@ -1082,14 +1133,17 @@ source_pad_event_probe (GstPad * pad, GstPadProbeInfo * info,
  * padprobe to detect EOS before exposing the pad.
  * Called with LOCK held. */
 static GstPad *
-create_output_pad (GstURISourceBin * urisrc, GstPad * pad)
+create_output_pad (OutputSlotInfo * slot, GstPad * pad)
 {
+  GstURISourceBin *urisrc = slot->linked_info->urisrc;
   GstPad *newpad;
   GstPadTemplate *pad_tmpl;
   gchar *padname;
 
-  gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM,
-      source_pad_event_probe, urisrc, NULL);
+  /* If the output slot does buffering, add a probe to detect drainage */
+  if (slot->queue)
+    gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM,
+        source_pad_event_probe, slot, NULL);
 
   pad_tmpl = gst_static_pad_template_get (&srctemplate);
 
@@ -1162,97 +1216,61 @@ expose_output_pad (GstURISourceBin * urisrc, GstPad * pad)
 }
 
 static void
-expose_raw_output_pad (GstURISourceBin * urisrc, GstPad * srcpad,
-    GstPad * output_pad)
+demuxer_pad_removed_cb (GstElement * element, GstPad * pad,
+    ChildSrcPadInfo * info)
 {
-  ChildSrcPadInfo *info = g_new0 (ChildSrcPadInfo, 1);
-  info->src_pad = srcpad;
-  info->output_pad = gst_object_ref (output_pad);
-
-  g_assert (g_object_get_data (G_OBJECT (srcpad),
-          "urisourcebin.srcpadinfo") == NULL);
-
-  g_object_set_data_full (G_OBJECT (srcpad), "urisourcebin.srcpadinfo",
-      info, (GDestroyNotify) free_child_src_pad_info);
-
-  expose_output_pad (urisrc, output_pad);
-}
-
-static void
-remove_output_pad (GstURISourceBin * urisrc, GstPad * pad)
-{
-  if (!gst_object_has_as_parent (GST_OBJECT (pad), GST_OBJECT (urisrc)))
-    return;                     /* Pad is not exposed */
-
-  GST_DEBUG_OBJECT (urisrc, "Removing pad %s:%s", GST_DEBUG_PAD_NAME (pad));
-
-  gst_pad_set_active (pad, FALSE);
-  gst_element_remove_pad (GST_ELEMENT_CAST (urisrc), pad);
-}
-
-static void
-pad_removed_cb (GstElement * element, GstPad * pad, GstURISourceBin * urisrc)
-{
-  ChildSrcPadInfo *info;
-
-  GST_DEBUG_OBJECT (element, "pad removed name: <%s:%s>",
-      GST_DEBUG_PAD_NAME (pad));
+  GstURISourceBin *urisrc;
+  OutputSlotInfo *slot;
 
   /* we only care about srcpads */
   if (!GST_PAD_IS_SRC (pad))
     return;
 
-  if (!(info = g_object_get_data (G_OBJECT (pad), "urisourcebin.srcpadinfo")))
-    goto no_info;
+  urisrc = info->urisrc;
+
+  GST_DEBUG_OBJECT (urisrc, "pad removed name: <%s:%s>",
+      GST_DEBUG_PAD_NAME (pad));
 
   GST_URI_SOURCE_BIN_LOCK (urisrc);
+  slot = output_slot_for_originating_pad (info, pad);
+  g_assert (slot);
 
-  /* Send EOS to the output slot if the demuxer didn't already */
-  if (info->output_slot) {
-    GstStructure *s;
-    GstEvent *event;
-    OutputSlotInfo *slot;
+  gst_pad_remove_probe (pad, slot->demuxer_event_probe_id);
+  slot->demuxer_event_probe_id = 0;
 
-    slot = info->output_slot;
+  if (slot->queue) {
+    gboolean was_eos;
+
+    /* Propagate custom EOS to buffering elements. The slot will be removed when
+     * it is received on the output of the buffering elements */
 
     BUFFERING_LOCK (urisrc);
     /* Unlink this pad from its output slot and send a fake EOS event
      * to drain the queue */
+    was_eos = slot->is_eos;
     slot->is_eos = TRUE;
     BUFFERING_UNLOCK (urisrc);
 
     remove_buffering_msgs (urisrc, GST_OBJECT_CAST (slot->queue));
-
-    slot->linked_info = NULL;
-
-    info->output_slot = NULL;
-
-    GST_LOG_OBJECT (element,
-        "Pad %" GST_PTR_FORMAT " was removed without EOS. Sending.", pad);
-
-    event = gst_event_new_eos ();
-    s = gst_event_writable_structure (event);
-    gst_structure_set (s, "urisourcebin-custom-eos", G_TYPE_BOOLEAN, TRUE,
-        NULL);
-    gst_pad_send_event (slot->queue_sinkpad, event);
-  } else if (info->output_pad != NULL) {
-    GST_LOG_OBJECT (element,
-        "Pad %" GST_PTR_FORMAT " was removed. Unexposing %" GST_PTR_FORMAT,
-        pad, info->output_pad);
-    remove_output_pad (urisrc, info->output_pad);
+    if (!was_eos) {
+      GstStructure *s;
+      GstEvent *event;
+      event = gst_event_new_eos ();
+      s = gst_event_writable_structure (event);
+      gst_structure_set (s, "urisourcebin-custom-eos", G_TYPE_BOOLEAN, TRUE,
+          NULL);
+      gst_pad_send_event (slot->queue_sinkpad, event);
+    }
   } else {
-    GST_WARNING_OBJECT (urisrc, "Removed pad has no output slot or pad");
+    GST_LOG_OBJECT (urisrc,
+        "No buffering involved, removing output slot immediately");
+    /* Remove output slot immediately */
+    info->outputs = g_list_remove (info->outputs, slot);
+    free_output_slot (slot, urisrc);
   }
   GST_URI_SOURCE_BIN_UNLOCK (urisrc);
 
   return;
-
-  /* ERRORS */
-no_info:
-  {
-    GST_WARNING_OBJECT (element, "no info found for pad");
-    return;
-  }
 }
 
 /* helper function to lookup stuff in lists */
@@ -1488,6 +1506,80 @@ post_missing_plugin_error (GstElement * urisrc, const gchar * element_name)
           element_name), (NULL));
 }
 
+typedef struct
+{
+  GstURISourceBin *urisrc;
+  gboolean have_out;
+  gboolean res;
+} AnalyseData;
+
+static void
+analyse_pad_foreach (const GValue * item, AnalyseData * data)
+{
+  GstURISourceBin *urisrc = data->urisrc;
+  GstPad *pad = g_value_dup_object (item);
+  ChildSrcPadInfo *info;
+  GstCaps *padcaps = NULL;
+  gboolean pad_is_raw;
+  gboolean res = TRUE;
+
+  GST_LOG_OBJECT (urisrc, "pad %" GST_PTR_FORMAT, pad);
+
+  data->have_out = TRUE;
+
+  /* The info might already exist if there was an iterator resync */
+  if (get_cspi_for_pad (urisrc, pad)) {
+    GST_LOG_OBJECT (urisrc, "Already analysed");
+    goto out;
+  }
+
+  info = new_child_src_pad_info (urisrc, pad);
+  padcaps = gst_pad_query_caps (pad, NULL);
+
+  if (!is_all_raw_caps (padcaps, DEFAULT_CAPS, &pad_is_raw) || !pad_is_raw) {
+    /* if FALSE, this pad has no caps, we setup typefinding on it */
+    if (!setup_typefind (info)) {
+      res = FALSE;
+      goto out;
+    }
+  } else if (pad_is_raw) {
+    /* caps on source pad are all raw, we can add the pad */
+    GstPad *output_pad;
+    OutputSlotInfo *slot;
+
+    GST_URI_SOURCE_BIN_LOCK (urisrc);
+    /* Only use buffering on raw pads in very specific conditions */
+    GST_DEBUG_OBJECT (urisrc, "use_buffering:%d is_queue:%d",
+        urisrc->use_buffering, IS_QUEUE_URI (urisrc->uri));
+    slot = new_output_slot (info, FALSE, FALSE, !urisrc->use_buffering
+        || !IS_QUEUE_URI (urisrc->uri), pad);
+
+    if (!slot) {
+      res = FALSE;
+      GST_URI_SOURCE_BIN_UNLOCK (urisrc);
+      goto out;
+    }
+
+    /* get the new raw srcpad */
+    output_pad = gst_object_ref (slot->output_pad);
+
+    GST_URI_SOURCE_BIN_UNLOCK (urisrc);
+
+    expose_output_pad (urisrc, output_pad);
+    gst_object_unref (output_pad);
+  } else {
+    GST_DEBUG_OBJECT (urisrc, "Handling non-raw pad");
+    /* The caps are non-raw, we handle it directly */
+    handle_new_pad (info, pad, padcaps);
+  }
+
+out:
+  if (padcaps)
+    gst_caps_unref (padcaps);
+  gst_object_unref (pad);
+  data->res &= res;
+}
+
 /**
  * analyse_source_and_expose_raw_pads:
  * @urisrc: a #GstURISourceBin
@@ -1514,108 +1606,35 @@ post_missing_plugin_error (GstElement * urisrc, const gchar * element_name)
  */
 static gboolean
 analyse_source_and_expose_raw_pads (GstURISourceBin * urisrc,
-    gboolean * all_pads_raw, gboolean * have_out, gboolean * is_dynamic)
+    gboolean * have_out, gboolean * is_dynamic)
 {
   GstElementClass *elemclass;
+  AnalyseData data = { 0, };
+  GstIteratorResult iterres;
   GList *walk;
   GstIterator *pads_iter;
-  gboolean done = FALSE;
   gboolean res = TRUE;
-  GstPad *pad;
-  GValue item = { 0, };
-  guint nb_raw = 0;
-  guint nb_pads = 0;
-  GstCaps *rawcaps = DEFAULT_CAPS;
-  gboolean pad_is_raw;
-  gboolean use_queue;
-
-  *have_out = FALSE;
-  *all_pads_raw = FALSE;
-  *is_dynamic = FALSE;
-
-  /* Add buffering elements on raw pads only is very specific conditions */
-  use_queue = urisrc->use_buffering && IS_QUEUE_URI (urisrc->uri);
 
   pads_iter = gst_element_iterate_src_pads (urisrc->source);
-  while (!done) {
-    switch (gst_iterator_next (pads_iter, &item)) {
-      case GST_ITERATOR_ERROR:
-        res = FALSE;
-        /* FALLTHROUGH */
-      case GST_ITERATOR_DONE:
-        done = TRUE;
-        break;
-      case GST_ITERATOR_RESYNC:
-        /* reset results and resync */
-        *have_out = FALSE;
-        *all_pads_raw = FALSE;
-        *is_dynamic = FALSE;
-        nb_pads = nb_raw = 0;
-        gst_iterator_resync (pads_iter);
-        break;
-      case GST_ITERATOR_OK:
-      {
-        GstCaps *padcaps;
-
-        pad = g_value_dup_object (&item);
-        /* we now officially have an output pad */
-        *have_out = TRUE;
-        nb_pads++;
-        padcaps = gst_pad_query_caps (pad, NULL);
-
-        if (!is_all_raw_caps (padcaps, rawcaps, &pad_is_raw)) {
-          /* if FALSE, this pad has no caps, we setup typefinding on it */
-          if (!setup_typefind (urisrc, pad)) {
-            res = FALSE;
-            done = TRUE;
-          }
-        } else if (pad_is_raw) {
-          /* caps on source pad are all raw, we can add the pad */
-          GstPad *output_pad;
-
-          nb_raw++;
-          GST_URI_SOURCE_BIN_LOCK (urisrc);
-          if (use_queue) {
-            OutputSlotInfo *slot = get_output_slot (urisrc, FALSE, FALSE);
-            if (!slot) {
-              gst_caps_unref (padcaps);
-              gst_object_unref (pad);
-              goto no_slot;
-            }
-
-            gst_pad_link (pad, slot->queue_sinkpad);
-
-            /* get the new raw srcpad */
-            output_pad = gst_object_ref (slot->output_pad);
-
-            GST_URI_SOURCE_BIN_UNLOCK (urisrc);
-
-            expose_output_pad (urisrc, output_pad);
-            gst_object_unref (output_pad);
-          } else {
-            output_pad = create_output_pad (urisrc, pad);
-
-            GST_URI_SOURCE_BIN_UNLOCK (urisrc);
-
-            expose_raw_output_pad (urisrc, pad, output_pad);
-          }
-        } else {
-          /* The caps are non-raw, we handle it directly */
-          handle_new_pad (urisrc, pad, padcaps);
-        }
-        gst_caps_unref (padcaps);
-        gst_object_unref (pad);
-        g_value_reset (&item);
-        break;
-      }
-    }
-  }
-  g_value_unset (&item);
+
+restart:
+  data.res = TRUE;
+  data.have_out = FALSE;
+  data.urisrc = urisrc;
+  iterres =
+      gst_iterator_foreach (pads_iter,
+      (GstIteratorForeachFunction) analyse_pad_foreach, &data);
+  if (iterres == GST_ITERATOR_RESYNC)
+    goto restart;
+  if (iterres == GST_ITERATOR_ERROR)
+    res = FALSE;
+  else
+    res = data.res;
   gst_iterator_free (pads_iter);
-  gst_caps_unref (rawcaps);
 
   /* check for padtemplates that list SOMETIMES pads to
    * determine if the element is dynamic. */
+  *is_dynamic = FALSE;
   elemclass = GST_ELEMENT_GET_CLASS (urisrc->source);
   walk = gst_element_class_get_pad_template_list (elemclass);
   while (walk != NULL) {
@@ -1630,20 +1649,9 @@ analyse_source_and_expose_raw_pads (GstURISourceBin * urisrc,
     walk = g_list_next (walk);
   }
 
-  if (nb_pads && nb_pads == nb_raw)
-    *all_pads_raw = TRUE;
+  *have_out = data.have_out;
 
   return res;
-no_slot:
-  {
-    GST_URI_SOURCE_BIN_UNLOCK (urisrc);
-    gst_object_unref (pad);
-    g_value_unset (&item);
-    gst_iterator_free (pads_iter);
-    gst_caps_unref (rawcaps);
-
-    return FALSE;
-  }
 }
 
 /* Remove any adaptive demuxer element */
@@ -1661,7 +1669,7 @@ remove_demuxer (GstURISourceBin * bin)
 
 /* make a demuxer and connect to all the signals */
 static GstElement *
-make_demuxer (GstURISourceBin * urisrc, GstCaps * caps)
+make_demuxer (GstURISourceBin * urisrc, ChildSrcPadInfo * info, GstCaps * caps)
 {
   GList *factories, *eligible, *cur;
   GstElement *demuxer = NULL;
@@ -1712,9 +1720,9 @@ make_demuxer (GstURISourceBin * urisrc, GstCaps * caps)
   /* set up callbacks to create the links between
    * demuxer streams and output */
   g_signal_connect (demuxer,
-      "pad-added", G_CALLBACK (new_demuxer_pad_added_cb), urisrc);
+      "pad-added", G_CALLBACK (new_demuxer_pad_added_cb), info);
   g_signal_connect (demuxer,
-      "pad-removed", G_CALLBACK (pad_removed_cb), urisrc);
+      "pad-removed", G_CALLBACK (demuxer_pad_removed_cb), info);
 
   /* Propagate connection-speed property */
   pspec = g_object_class_find_property (G_OBJECT_GET_CLASS (demuxer),
@@ -1740,8 +1748,9 @@ no_demuxer:
  * * typefind has found a type
  */
 static void
-handle_new_pad (GstURISourceBin * urisrc, GstPad * srcpad, GstCaps * caps)
+handle_new_pad (ChildSrcPadInfo * info, GstPad * srcpad, GstCaps * caps)
 {
+  GstURISourceBin *urisrc = info->urisrc;
   gboolean is_raw;
   GstStructure *s;
   const gchar *media_type;
@@ -1751,14 +1760,17 @@ handle_new_pad (GstURISourceBin * urisrc, GstPad * srcpad, GstCaps * caps)
 
   /* if this is a pad with all raw caps, we can expose it */
   if (is_all_raw_caps (caps, DEFAULT_CAPS, &is_raw) && is_raw) {
+    OutputSlotInfo *slot;
     GstPad *output_pad;
 
     GST_DEBUG_OBJECT (urisrc, "Found pad with raw caps %" GST_PTR_FORMAT
         ", exposing", caps);
-    output_pad = create_output_pad (urisrc, srcpad);
+    slot = new_output_slot (info, FALSE, FALSE, TRUE, srcpad);
+    output_pad = gst_object_ref (slot->output_pad);
     GST_URI_SOURCE_BIN_UNLOCK (urisrc);
 
-    expose_raw_output_pad (urisrc, srcpad, output_pad);
+    expose_output_pad (urisrc, slot->output_pad);
+    gst_object_unref (output_pad);
     return;
   }
   GST_URI_SOURCE_BIN_UNLOCK (urisrc);
@@ -1773,7 +1785,7 @@ handle_new_pad (GstURISourceBin * urisrc, GstPad * srcpad, GstCaps * caps)
     GstPadLinkReturn link_res;
     GstQuery *query;
 
-    urisrc->demuxer = make_demuxer (urisrc, caps);
+    urisrc->demuxer = make_demuxer (urisrc, info, caps);
     if (!urisrc->demuxer)
       goto no_demuxer;
     gst_bin_add (GST_BIN_CAST (urisrc), urisrc->demuxer);
@@ -1798,12 +1810,16 @@ handle_new_pad (GstURISourceBin * urisrc, GstPad * srcpad, GstCaps * caps)
 
     gst_element_sync_state_with_parent (urisrc->demuxer);
   } else if (!urisrc->is_stream) {
+    OutputSlotInfo *slot;
     GstPad *output_pad;
-    /* We don't need slot here, expose immediately */
+
+    /* We don't need buffering here, expose immediately */
     GST_URI_SOURCE_BIN_LOCK (urisrc);
-    output_pad = create_output_pad (urisrc, srcpad);
+    slot = new_output_slot (info, FALSE, FALSE, TRUE, srcpad);
+    output_pad = gst_object_ref (slot->output_pad);
     GST_URI_SOURCE_BIN_UNLOCK (urisrc);
-    expose_raw_output_pad (urisrc, srcpad, output_pad);
+    expose_output_pad (urisrc, output_pad);
+    gst_object_unref (output_pad);
   } else {
     OutputSlotInfo *slot;
     GstPad *output_pad;
@@ -1823,11 +1839,7 @@ handle_new_pad (GstURISourceBin * urisrc, GstPad * srcpad, GstCaps * caps)
         do_download);
 
     GST_URI_SOURCE_BIN_LOCK (urisrc);
-    slot = get_output_slot (urisrc, do_download, FALSE);
-
-    if (slot == NULL
-        || gst_pad_link (srcpad, slot->queue_sinkpad) != GST_PAD_LINK_OK)
-      goto could_not_link;
+    slot = new_output_slot (info, do_download, FALSE, FALSE, srcpad);
 
     gst_pad_add_probe (srcpad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM,
         pre_queue_event_probe, urisrc, NULL);
@@ -1866,13 +1878,14 @@ could_not_link:
  * buffering or regular buffering */
 static void
 type_found (GstElement * typefind, guint probability,
-    GstCaps * caps, GstURISourceBin * urisrc)
+    GstCaps * caps, ChildSrcPadInfo * info)
 {
+  GstURISourceBin *urisrc = info->urisrc;
   GstPad *srcpad = gst_element_get_static_pad (typefind, "src");
 
   GST_DEBUG_OBJECT (urisrc, "typefind found caps %" GST_PTR_FORMAT
       " on pad %" GST_PTR_FORMAT, caps, srcpad);
-  handle_new_pad (urisrc, srcpad, caps);
+  handle_new_pad (info, srcpad, caps);
 
   gst_object_unref (GST_OBJECT (srcpad));
 }
@@ -1881,42 +1894,33 @@ type_found (GstElement * typefind, guint probability,
  * source. After we find the type, we decide to whether to plug an adaptive
  * demuxer, or just link through queue2 (if needed) and expose the data */
 static gboolean
-setup_typefind (GstURISourceBin * urisrc, GstPad * srcpad)
+setup_typefind (ChildSrcPadInfo * info)
 {
-  GstElement *typefind;
+  GstURISourceBin *urisrc = info->urisrc;
+  GstPad *sinkpad;
 
   /* now create the typefind element */
-  typefind = gst_element_factory_make ("typefind", NULL);
-  if (!typefind)
+  info->typefind = gst_element_factory_make ("typefind", NULL);
+  if (!info->typefind)
     goto no_typefind;
 
   /* Make sure the bin doesn't set the typefind running yet */
-  gst_element_set_locked_state (typefind, TRUE);
-
-  gst_bin_add (GST_BIN_CAST (urisrc), typefind);
-
-  if (!srcpad) {
-    if (!gst_element_link_pads (urisrc->source, NULL, typefind, "sink"))
-      goto could_not_link;
-  } else {
-    GstPad *sinkpad = gst_element_get_static_pad (typefind, "sink");
-    GstPadLinkReturn ret;
+  gst_element_set_locked_state (info->typefind, TRUE);
 
-    ret = gst_pad_link (srcpad, sinkpad);
-    gst_object_unref (sinkpad);
-    if (ret != GST_PAD_LINK_OK)
-      goto could_not_link;
-  }
+  gst_bin_add (GST_BIN_CAST (urisrc), info->typefind);
 
-  urisrc->typefinds = g_list_append (urisrc->typefinds, typefind);
+  sinkpad = gst_element_get_static_pad (info->typefind, "sink");
+  if (gst_pad_link (info->src_pad, sinkpad) != GST_PAD_LINK_OK)
+    goto could_not_link;
+  gst_object_unref (sinkpad);
 
   /* connect a signal to find out when the typefind element found
    * a type */
-  g_signal_connect (typefind, "have-type", G_CALLBACK (type_found), urisrc);
+  g_signal_connect (info->typefind, "have-type", G_CALLBACK (type_found), info);
 
   /* Now it can start */
-  gst_element_set_locked_state (typefind, FALSE);
-  gst_element_sync_state_with_parent (typefind);
+  gst_element_set_locked_state (info->typefind, FALSE);
+  gst_element_sync_state_with_parent (info->typefind);
 
   return TRUE;
 
@@ -1930,29 +1934,38 @@ no_typefind:
   }
 could_not_link:
   {
+    gst_object_unref (sinkpad);
     GST_ELEMENT_ERROR (urisrc, CORE, NEGOTIATION,
         (NULL), ("Can't link source to typefind element"));
-    gst_bin_remove (GST_BIN_CAST (urisrc), typefind);
     return FALSE;
   }
 }
 
+/* CALL WITH URISOURCEBIN LOCK */
 static void
 free_output_slot (OutputSlotInfo * slot, GstURISourceBin * urisrc)
 {
-  GST_DEBUG_OBJECT (urisrc, "removing old queue element and freeing slot %p",
-      slot);
-  if (slot->bitrate_changed_id > 0)
-    g_signal_handler_disconnect (slot->queue, slot->bitrate_changed_id);
-  slot->bitrate_changed_id = 0;
+  GST_DEBUG_OBJECT (urisrc,
+      "removing output slot %" GST_PTR_FORMAT " -> %" GST_PTR_FORMAT,
+      slot->originating_pad, slot->output_pad);
+
+  if (slot->queue) {
+    if (slot->bitrate_changed_id > 0)
+      g_signal_handler_disconnect (slot->queue, slot->bitrate_changed_id);
+    slot->bitrate_changed_id = 0;
+
+    gst_element_set_locked_state (slot->queue, TRUE);
+    gst_element_set_state (slot->queue, GST_STATE_NULL);
+    remove_buffering_msgs (urisrc, GST_OBJECT_CAST (slot->queue));
+    gst_bin_remove (GST_BIN_CAST (urisrc), slot->queue);
 
-  gst_element_set_locked_state (slot->queue, TRUE);
-  gst_element_set_state (slot->queue, GST_STATE_NULL);
-  remove_buffering_msgs (urisrc, GST_OBJECT_CAST (slot->queue));
-  gst_bin_remove (GST_BIN_CAST (urisrc), slot->queue);
+    gst_object_unref (slot->queue_sinkpad);
+  }
 
-  gst_object_unref (slot->queue_sinkpad);
+  if (slot->demuxer_event_probe_id)
+    gst_pad_remove_probe (slot->originating_pad, slot->demuxer_event_probe_id);
 
+  gst_object_unref (slot->originating_pad);
   /* deactivate and remove the srcpad */
   gst_pad_set_active (slot->output_pad, FALSE);
   gst_element_remove_pad (GST_ELEMENT_CAST (urisrc), slot->output_pad);
@@ -1972,41 +1985,11 @@ static void
 free_output_slot_async (GstURISourceBin * urisrc, OutputSlotInfo * slot)
 {
   GST_LOG_OBJECT (urisrc, "pushing output slot on thread pool to free");
-  urisrc->out_slots = g_slist_remove (urisrc->out_slots, slot);
+  slot->linked_info->outputs = g_list_remove (slot->linked_info->outputs, slot);
   gst_element_call_async (GST_ELEMENT_CAST (urisrc),
       (GstElementCallAsyncFunc) call_free_output_slot, slot, NULL);
 }
 
-static void
-unexpose_raw_pad_func (const GValue * item, GstURISourceBin * urisrc)
-{
-  GstPad *pad = g_value_get_object (item);
-  ChildSrcPadInfo *info =
-      g_object_get_data (G_OBJECT (pad), "urisourcebin.srcpadinfo");
-
-  if (info && info->output_pad != NULL)
-    remove_output_pad (urisrc, info->output_pad);
-}
-
-static void
-unexpose_src_pads (GstURISourceBin * urisrc, GstElement * element)
-{
-  GstIterator *pads_iter;
-
-  pads_iter = gst_element_iterate_src_pads (element);
-  gst_iterator_foreach (pads_iter,
-      (GstIteratorForeachFunction) unexpose_raw_pad_func, urisrc);
-  gst_iterator_free (pads_iter);
-}
-
-static void
-remove_typefind (GstElement * typefind, GstURISourceBin * urisrc)
-{
-  unexpose_src_pads (urisrc, typefind);
-  gst_element_set_state (typefind, GST_STATE_NULL);
-  gst_bin_remove (GST_BIN_CAST (urisrc), typefind);
-}
-
 /* remove source and all related elements */
 static void
 remove_source (GstURISourceBin * urisrc)
@@ -2015,7 +1998,6 @@ remove_source (GstURISourceBin * urisrc)
     GstElement *source = urisrc->source;
 
     GST_DEBUG_OBJECT (urisrc, "removing old src element");
-    unexpose_src_pads (urisrc, source);
     gst_element_set_state (source, GST_STATE_NULL);
 
     if (urisrc->src_np_sig_id) {
@@ -2026,17 +2008,12 @@ remove_source (GstURISourceBin * urisrc)
     urisrc->source = NULL;
   }
 
-  if (urisrc->typefinds) {
-    GST_DEBUG_OBJECT (urisrc, "removing old typefind elements");
-    g_list_foreach (urisrc->typefinds, (GFunc) remove_typefind, urisrc);
-    g_list_free (urisrc->typefinds);
-    urisrc->typefinds = NULL;
-  }
-
   GST_URI_SOURCE_BIN_LOCK (urisrc);
-  g_slist_foreach (urisrc->out_slots, (GFunc) free_output_slot, urisrc);
-  g_slist_free (urisrc->out_slots);
-  urisrc->out_slots = NULL;
+  if (urisrc->src_infos) {
+    g_list_foreach (urisrc->src_infos, (GFunc) free_child_src_pad_info, urisrc);
+    g_list_free (urisrc->src_infos);
+    urisrc->src_infos = NULL;
+  }
   GST_URI_SOURCE_BIN_UNLOCK (urisrc);
 
   if (urisrc->demuxer)
@@ -2048,14 +2025,17 @@ static void
 source_new_pad (GstElement * element, GstPad * pad, GstURISourceBin * urisrc)
 {
   GstCaps *caps;
+  ChildSrcPadInfo *info = new_child_src_pad_info (urisrc, pad);
 
   GST_DEBUG_OBJECT (urisrc, "Found new pad %s.%s in source element %s",
       GST_DEBUG_PAD_NAME (pad), GST_ELEMENT_NAME (element));
+
   caps = gst_pad_get_current_caps (pad);
+  GST_DEBUG_OBJECT (urisrc, "caps %" GST_PTR_FORMAT, caps);
   if (caps == NULL)
-    setup_typefind (urisrc, pad);
+    setup_typefind (info);
   else {
-    handle_new_pad (urisrc, pad, caps);
+    handle_new_pad (info, pad, caps);
     gst_caps_unref (caps);
   }
 }
@@ -2066,7 +2046,7 @@ source_new_pad (GstElement * element, GstPad * pad, GstURISourceBin * urisrc)
 static gboolean
 setup_source (GstURISourceBin * urisrc)
 {
-  gboolean all_pads_raw, have_out, is_dynamic;
+  gboolean have_out, is_dynamic;
 
   GST_DEBUG_OBJECT (urisrc, "setup source");
 
@@ -2088,18 +2068,10 @@ setup_source (GstURISourceBin * urisrc)
    * if so, we can create streams for the pads and be done with it.
    * Also check that is has source pads, if not, we assume it will
    * do everything itself.  */
-  if (!analyse_source_and_expose_raw_pads (urisrc, &all_pads_raw, &have_out,
-          &is_dynamic))
+  if (!analyse_source_and_expose_raw_pads (urisrc, &have_out, &is_dynamic))
     goto invalid_source;
 
   if (!is_dynamic) {
-    if (all_pads_raw) {
-      GST_DEBUG_OBJECT (urisrc, "Source provides all raw data");
-      /* source provides raw data, we added the pads and we can now signal a
-       * no_more pads because we are done. */
-      gst_element_no_more_pads (GST_ELEMENT_CAST (urisrc));
-      return TRUE;
-    }
     if (!have_out)
       goto no_pads;
   } else {
@@ -2221,6 +2193,24 @@ handle_redirect_message (GstURISourceBin * urisrc, GstMessage * msg)
   return new_msg;
 }
 
+/* CALL WITH URISOURCEBIN LOCK */
+static OutputSlotInfo *
+output_slot_for_buffering_element (GstURISourceBin * urisrc,
+    GstElement * element)
+{
+  GList *top, *iter;
+  for (top = urisrc->src_infos; top; top = top->next) {
+    ChildSrcPadInfo *info = top->data;
+    for (iter = info->outputs; iter; iter = iter->next) {
+      OutputSlotInfo *slot = iter->data;
+      if (slot->queue == element)
+        return slot;
+    }
+  }
+
+  return NULL;
+}
+
 static void
 handle_buffering_message (GstURISourceBin * urisrc, GstMessage * msg)
 {
@@ -2231,9 +2221,9 @@ handle_buffering_message (GstURISourceBin * urisrc, GstMessage * msg)
   GList *iter;
   OutputSlotInfo *slot;
 
-  /* buffering messages must be aggregated as there might be multiple
-   * multiqueue in the pipeline and their independent buffering messages
-   * will confuse the application
+  /* buffering messages must be aggregated as there might be multiple buffering
+   * elements in the pipeline and their independent buffering messages will
+   * confuse the application
    *
    * urisourcebin keeps a list of messages received from elements that are
    * buffering.
@@ -2249,10 +2239,10 @@ handle_buffering_message (GstURISourceBin * urisrc, GstMessage * msg)
   GST_LOG_OBJECT (urisrc, "Got buffering msg from %" GST_PTR_FORMAT
       " with %d%%", GST_MESSAGE_SRC (msg), msg_perc);
 
-  slot = g_object_get_data (G_OBJECT (GST_MESSAGE_SRC (msg)),
-      "urisourcebin.slotinfo");
-
   BUFFERING_LOCK (urisrc);
+  slot =
+      output_slot_for_buffering_element (urisrc,
+      (GstElement *) GST_MESSAGE_SRC (msg));
   if (slot && slot->is_eos) {
     /* Ignore buffering messages from queues we marked as EOS,
      * we already removed those from the list of buffering
@@ -2275,8 +2265,9 @@ handle_buffering_message (GstURISourceBin * urisrc, GstMessage * msg)
     GstMessage *bufstats = iter->data;
     gboolean is_eos = FALSE;
 
-    slot = g_object_get_data (G_OBJECT (GST_MESSAGE_SRC (bufstats)),
-        "urisourcebin.slotinfo");
+    slot =
+        output_slot_for_buffering_element (urisrc,
+        (GstElement *) GST_MESSAGE_SRC (msg));
     if (slot)
       is_eos = slot->is_eos;
 
@@ -2403,12 +2394,6 @@ handle_message (GstBin * bin, GstMessage * msg)
       }
       break;
     }
-    case GST_MESSAGE_STREAM_COLLECTION:
-    {
-      GST_DEBUG_OBJECT (urisrc, "Source is streams-aware");
-      urisrc->source_streams_aware = TRUE;
-      break;
-    }
     case GST_MESSAGE_BUFFERING:
       handle_buffering_message (urisrc, msg);
       msg = NULL;
@@ -2749,7 +2734,6 @@ gst_uri_source_bin_change_state (GstElement * element,
           (GDestroyNotify) gst_message_unref);
       urisrc->buffering_status = NULL;
       urisrc->last_buffering_pct = -1;
-      urisrc->source_streams_aware = FALSE;
       break;
     case GST_STATE_CHANGE_READY_TO_NULL:
       GST_DEBUG ("ready to null");