adaptivedemux2: Don't leak caps in debug statements
[platform/upstream/gstreamer.git] / subprojects / gst-plugins-good / ext / adaptivedemux2 / gstadaptivedemux-stream.c
index 38f36db..a7949be 100644 (file)
@@ -27,7 +27,7 @@
 #include "config.h"
 #endif
 
-#include "gstadaptivedemux.h"
+#include "gstadaptivedemux-stream.h"
 #include "gstadaptivedemux-private.h"
 
 #include <glib/gi18n-lib.h>
@@ -38,6 +38,18 @@ GST_DEBUG_CATEGORY_EXTERN (adaptivedemux2_debug);
 
 static void gst_adaptive_demux2_stream_finalize (GObject * object);
 static void gst_adaptive_demux2_stream_error (GstAdaptiveDemux2Stream * stream);
+static GstFlowReturn
+gst_adaptive_demux2_stream_data_received_default (GstAdaptiveDemux2Stream *
+    stream, GstBuffer * buffer);
+static GstFlowReturn
+gst_adaptive_demux2_stream_finish_fragment_default (GstAdaptiveDemux2Stream *
+    stream);
+
+guint64
+gst_adaptive_demux2_stream_update_current_bitrate (GstAdaptiveDemux2Stream *
+    stream);
+static void gst_adaptive_demux2_stream_update_track_ids (GstAdaptiveDemux2Stream
+    * stream);
 
 #define gst_adaptive_demux2_stream_parent_class parent_class
 G_DEFINE_ABSTRACT_TYPE (GstAdaptiveDemux2Stream, gst_adaptive_demux2_stream,
@@ -49,6 +61,9 @@ gst_adaptive_demux2_stream_class_init (GstAdaptiveDemux2StreamClass * klass)
   GObjectClass *gobject_class = (GObjectClass *) klass;
 
   gobject_class->finalize = gst_adaptive_demux2_stream_finalize;
+
+  klass->data_received = gst_adaptive_demux2_stream_data_received_default;
+  klass->finish_fragment = gst_adaptive_demux2_stream_finish_fragment_default;
 }
 
 static GType tsdemux_type = 0;
@@ -59,10 +74,13 @@ gst_adaptive_demux2_stream_init (GstAdaptiveDemux2Stream * stream)
   stream->download_request = download_request_new ();
   stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED;
   stream->last_ret = GST_FLOW_OK;
+  stream->next_input_wakeup_time = GST_CLOCK_STIME_NONE;
 
   stream->fragment_bitrates =
       g_malloc0 (sizeof (guint64) * NUM_LOOKBACK_FRAGMENTS);
 
+  stream->start_position = stream->current_position = GST_CLOCK_TIME_NONE;
+
   gst_segment_init (&stream->parse_segment, GST_FORMAT_TIME);
 }
 
@@ -82,7 +100,6 @@ gst_adaptive_demux2_stream_finalize (GObject * object)
   if (stream->download_request)
     download_request_unref (stream->download_request);
 
-  stream->cancelled = TRUE;
   g_clear_error (&stream->last_error);
 
   gst_adaptive_demux2_stream_fragment_clear (&stream->fragment);
@@ -102,6 +119,14 @@ gst_adaptive_demux2_stream_finalize (GObject * object)
   if (stream->pad_removed_id)
     g_signal_handler_disconnect (stream->parsebin, stream->pad_removed_id);
 
+  if (stream->parsebin != NULL) {
+    GST_LOG_OBJECT (stream, "Removing parsebin");
+    gst_bin_remove (GST_BIN_CAST (stream->demux), stream->parsebin);
+    gst_element_set_state (stream->parsebin, GST_STATE_NULL);
+    gst_object_unref (stream->parsebin);
+    stream->parsebin = NULL;
+  }
+
   g_free (stream->fragment_bitrates);
 
   g_list_free_full (stream->tracks,
@@ -110,7 +135,7 @@ gst_adaptive_demux2_stream_finalize (GObject * object)
   if (stream->pending_caps)
     gst_caps_unref (stream->pending_caps);
 
-  g_clear_pointer (&stream->pending_tags, gst_tag_list_unref);
+  gst_clear_tag_list (&stream->pending_tags);
   g_clear_pointer (&stream->stream_collection, gst_object_unref);
 
   G_OBJECT_CLASS (parent_class)->finalize (object);
@@ -138,6 +163,19 @@ gst_adaptive_demux2_stream_add_track (GstAdaptiveDemux2Stream * stream,
     return FALSE;
   }
 
+  if (stream->demux->buffering_low_watermark_time)
+    track->buffering_threshold = stream->demux->buffering_low_watermark_time;
+  else if (GST_CLOCK_TIME_IS_VALID (stream->recommended_buffering_threshold))
+    track->buffering_threshold =
+        MIN (10 * GST_SECOND, stream->recommended_buffering_threshold);
+  else {
+    /* Using a starting default, can be overriden later in
+     * ::update_stream_info() */
+    GST_DEBUG_OBJECT (stream,
+        "Setting default 10s buffering threshold on new track");
+    track->buffering_threshold = 10 * GST_SECOND;
+  }
+
   stream->tracks =
       g_list_append (stream->tracks, gst_adaptive_demux_track_ref (track));
   if (stream->demux) {
@@ -232,10 +270,10 @@ schedule_another_chunk (GstAdaptiveDemux2Stream * stream)
 }
 
 static void
-drain_inactive_tracks (GstAdaptiveDemux * demux,
-    GstAdaptiveDemux2Stream * stream)
+drain_inactive_tracks (GstAdaptiveDemux2Stream * stream)
 {
   GList *iter;
+  GstAdaptiveDemux *demux = stream->demux;
 
   TRACKS_LOCK (demux);
   for (iter = stream->tracks; iter; iter = iter->next) {
@@ -255,8 +293,8 @@ static void
 gst_adaptive_demux2_stream_finish_download (GstAdaptiveDemux2Stream *
     stream, GstFlowReturn ret, GError * err)
 {
-  GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (stream->demux);
-  GstAdaptiveDemux *demux = stream->demux;
+  GstAdaptiveDemux2StreamClass *klass =
+      GST_ADAPTIVE_DEMUX2_STREAM_GET_CLASS (stream);
 
   GST_DEBUG_OBJECT (stream,
       "%s download finish: %d %s - err: %p", uritype (stream), ret,
@@ -287,10 +325,15 @@ gst_adaptive_demux2_stream_finish_download (GstAdaptiveDemux2Stream *
   }
 
   /* Handle all the possible flow returns here: */
-  if (ret == GST_ADAPTIVE_DEMUX_FLOW_END_OF_FRAGMENT) {
+  if (ret == GST_ADAPTIVE_DEMUX_FLOW_LOST_SYNC) {
+    /* We lost sync, seek back to live and return */
+    GST_WARNING_OBJECT (stream, "Lost sync when downloading");
+    gst_adaptive_demux_handle_lost_sync (stream->demux);
+    return;
+  } else if (ret == GST_ADAPTIVE_DEMUX_FLOW_END_OF_FRAGMENT) {
     /* The sub-class wants to stop the fragment immediately */
     stream->fragment.finished = TRUE;
-    ret = klass->finish_fragment (demux, stream);
+    ret = klass->finish_fragment (stream);
 
     GST_DEBUG_OBJECT (stream, "finish_fragment ret %d %s", ret,
         gst_flow_get_name (ret));
@@ -303,7 +346,7 @@ gst_adaptive_demux2_stream_finish_download (GstAdaptiveDemux2Stream *
       || !klass->need_another_chunk (stream)
       || stream->fragment.chunk_size == 0) {
     stream->fragment.finished = TRUE;
-    ret = klass->finish_fragment (stream->demux, stream);
+    ret = klass->finish_fragment (stream);
 
     GST_DEBUG_OBJECT (stream, "finish_fragment ret %d %s", ret,
         gst_flow_get_name (ret));
@@ -315,7 +358,7 @@ gst_adaptive_demux2_stream_finish_download (GstAdaptiveDemux2Stream *
 
   /* For HLS, we might be enqueueing data into tracks that aren't
    * selected. Drain those ones out */
-  drain_inactive_tracks (stream->demux, stream);
+  drain_inactive_tracks (stream);
 
   /* Now that we've called finish_fragment we can clear these flags the
    * sub-class might have checked */
@@ -359,7 +402,7 @@ gst_adaptive_demux2_stream_finish_download (GstAdaptiveDemux2Stream *
 
   GST_LOG_OBJECT (stream, "Scheduling next_download() call");
   stream->pending_cb_id =
-      gst_adaptive_demux_loop_call (demux->priv->scheduler_task,
+      gst_adaptive_demux_loop_call (stream->demux->priv->scheduler_task,
       (GSourceFunc) gst_adaptive_demux2_stream_next_download,
       gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
 }
@@ -386,13 +429,16 @@ gst_adaptive_demux2_stream_parse_error (GstAdaptiveDemux2Stream * stream,
 }
 
 static void
-gst_adaptive_demux2_stream_prepare_segment (GstAdaptiveDemux * demux,
-    GstAdaptiveDemux2Stream * stream, gboolean first_and_live)
+gst_adaptive_demux2_stream_prepare_segment (GstAdaptiveDemux2Stream * stream,
+    gboolean first_and_live)
 {
+  GstAdaptiveDemux *demux = stream->demux;
   GstClockTime period_start = gst_adaptive_demux_get_period_start_time (demux);
   GstClockTime offset =
-      gst_adaptive_demux2_stream_get_presentation_offset (demux, stream);
+      gst_adaptive_demux2_stream_get_presentation_offset (stream);
 
+  /* FIXME: Add a helper function to retrieve the demuxer segment
+   * using the SEGMENT_LOCK */
   stream->parse_segment = demux->segment;
 
   /* The demuxer segment is just built from seek events, but for each stream
@@ -450,8 +496,9 @@ gst_adaptive_demux2_stream_prepare_segment (GstAdaptiveDemux * demux,
    * the segment time and base as calculated by the second case would be
    * equivalent.
    */
-  GST_DEBUG_OBJECT (demux, "Using demux segment %" GST_SEGMENT_FORMAT,
-      &demux->segment);
+  GST_DEBUG_OBJECT (stream, "Using demux segment %" GST_SEGMENT_FORMAT,
+      &stream->parse_segment);
+
   GST_DEBUG_OBJECT (demux,
       "period_start: %" GST_TIME_FORMAT " offset: %" GST_TIME_FORMAT,
       GST_TIME_ARGS (period_start), GST_TIME_ARGS (offset));
@@ -514,7 +561,7 @@ update_buffer_pts_and_demux_position_locked (GstAdaptiveDemux * demux,
 
   if (GST_CLOCK_STIME_IS_VALID (pos)) {
     GstClockTime offset =
-        gst_adaptive_demux2_stream_get_presentation_offset (demux, stream);
+        gst_adaptive_demux2_stream_get_presentation_offset (stream);
 
     pos += offset;
 
@@ -546,8 +593,7 @@ gst_adaptive_demux2_stream_push_buffer (GstAdaptiveDemux2Stream * stream,
   GList *pending_events = NULL;
 
   if (stream->compute_segment) {
-    gst_adaptive_demux2_stream_prepare_segment (demux, stream,
-        stream->first_and_live);
+    gst_adaptive_demux2_stream_prepare_segment (stream, stream->first_and_live);
     stream->compute_segment = FALSE;
     stream->first_and_live = FALSE;
   }
@@ -668,7 +714,7 @@ gst_adaptive_demux2_stream_push_buffer (GstAdaptiveDemux2Stream * stream,
     gst_pad_send_event (stream->parsebin_sink, buffer_gap);
   }
 
-  if (G_UNLIKELY (stream->cancelled)) {
+  if (G_UNLIKELY (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED)) {
     GST_LOG_OBJECT (demux, "Stream was cancelled");
     return GST_FLOW_FLUSHING;
   }
@@ -683,11 +729,13 @@ gst_adaptive_demux2_stream_parse_buffer (GstAdaptiveDemux2Stream * stream,
     GstBuffer * buffer)
 {
   GstAdaptiveDemux *demux = stream->demux;
-  GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
+  GstAdaptiveDemux2StreamClass *klass =
+      GST_ADAPTIVE_DEMUX2_STREAM_GET_CLASS (stream);
   GstFlowReturn ret = GST_FLOW_OK;
 
   /* do not make any changes if the stream is cancelled */
-  if (G_UNLIKELY (stream->cancelled)) {
+  if (G_UNLIKELY (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED)) {
+    GST_DEBUG_OBJECT (stream, "Stream was stopped. Aborting");
     gst_buffer_unref (buffer);
     return GST_FLOW_FLUSHING;
   }
@@ -699,7 +747,7 @@ gst_adaptive_demux2_stream_parse_buffer (GstAdaptiveDemux2Stream * stream,
    * including the *actual* fragment ! */
   if (stream->starting_fragment) {
     stream->starting_fragment = FALSE;
-    if (klass->start_fragment != NULL && !klass->start_fragment (demux, stream))
+    if (klass->start_fragment != NULL && !klass->start_fragment (stream))
       return GST_FLOW_ERROR;
   }
 
@@ -709,7 +757,7 @@ gst_adaptive_demux2_stream_parse_buffer (GstAdaptiveDemux2Stream * stream,
       "Received %s buffer of size %" G_GSIZE_FORMAT, uritype (stream),
       gst_buffer_get_size (buffer));
 
-  ret = klass->data_received (demux, stream, buffer);
+  ret = klass->data_received (stream, buffer);
 
   if (ret != GST_FLOW_OK) {
     GST_DEBUG_OBJECT (stream, "data_received returned %s",
@@ -717,7 +765,8 @@ gst_adaptive_demux2_stream_parse_buffer (GstAdaptiveDemux2Stream * stream,
 
     if (ret == GST_FLOW_FLUSHING) {
       /* do not make any changes if the stream is cancelled */
-      if (G_UNLIKELY (stream->cancelled)) {
+      if (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED) {
+        GST_DEBUG_OBJECT (stream, "Stream was stopped. Aborting");
         return ret;
       }
     }
@@ -729,7 +778,7 @@ gst_adaptive_demux2_stream_parse_buffer (GstAdaptiveDemux2Stream * stream,
       GST_DEBUG_OBJECT (stream, "Pushing EOS to parser");
 
       /* TODO push this on all pads */
-      gst_event_set_seqnum (eos, stream->demux->priv->segment_seqnum);
+      gst_event_set_seqnum (eos, demux->priv->segment_seqnum);
       gst_pad_send_event (stream->parsebin_sink, eos);
       ret = GST_FLOW_ERROR;
 
@@ -746,6 +795,7 @@ gst_adaptive_demux2_stream_parse_buffer (GstAdaptiveDemux2Stream * stream,
  */
 static void
 calculate_track_thresholds (GstAdaptiveDemux * demux,
+    GstAdaptiveDemux2Stream * stream,
     GstClockTime fragment_duration, GstClockTime * low_threshold,
     GstClockTime * high_threshold)
 {
@@ -757,6 +807,15 @@ calculate_track_thresholds (GstAdaptiveDemux * demux,
     *low_threshold = demux->buffering_low_watermark_time;
   }
 
+  if (*low_threshold == 0) {
+    /* This implies both low level properties were 0, the default is 10s unless
+     * the subclass has specified a recommended buffering threshold */
+    *low_threshold = 10 * GST_SECOND;
+    if (GST_CLOCK_TIME_IS_VALID (stream->recommended_buffering_threshold))
+      *low_threshold =
+          MIN (stream->recommended_buffering_threshold, *low_threshold);
+  }
+
   *high_threshold =
       demux->buffering_high_watermark_fragments * fragment_duration;
   if (*high_threshold == 0 || (demux->buffering_high_watermark_time != 0
@@ -784,9 +843,11 @@ calculate_track_thresholds (GstAdaptiveDemux * demux,
       (*low_threshold != 0 && *low_threshold > *high_threshold)) {
     *high_threshold = *low_threshold;
   }
+
   GST_OBJECT_UNLOCK (demux);
 }
 
+#define ABSDIFF(a,b) ((a) < (b) ? (b) - (a) : (a) - (b))
 static gboolean
 gst_adaptive_demux2_stream_wait_for_output_space (GstAdaptiveDemux * demux,
     GstAdaptiveDemux2Stream * stream, GstClockTime fragment_duration)
@@ -800,8 +861,13 @@ gst_adaptive_demux2_stream_wait_for_output_space (GstAdaptiveDemux * demux,
   GstClockTime low_threshold = 0, high_threshold = 0;
   GList *iter;
 
-  calculate_track_thresholds (demux, fragment_duration,
+  calculate_track_thresholds (demux, stream, fragment_duration,
       &low_threshold, &high_threshold);
+  GST_DEBUG_OBJECT (stream,
+      "Thresholds low:%" GST_TIME_FORMAT " high:%" GST_TIME_FORMAT
+      " recommended:%" GST_TIME_FORMAT, GST_TIME_ARGS (low_threshold),
+      GST_TIME_ARGS (high_threshold),
+      GST_TIME_ARGS (stream->recommended_buffering_threshold));
 
   /* If there are no tracks at all, don't wait. If there are no active
    * tracks, keep filling until at least one track is full. If there
@@ -810,8 +876,9 @@ gst_adaptive_demux2_stream_wait_for_output_space (GstAdaptiveDemux * demux,
   for (iter = stream->tracks; iter; iter = iter->next) {
     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) iter->data;
 
-    /* Update the buffering threshold */
-    if (low_threshold != track->buffering_threshold) {
+    /* Update the buffering threshold if it changed by more than a second */
+    if (ABSDIFF (low_threshold, track->buffering_threshold) > GST_SECOND) {
+      GST_DEBUG_OBJECT (stream, "Updating threshold");
       /* The buffering threshold for this track changed, make sure to
        * re-check buffering status */
       update_buffering = TRUE;
@@ -825,10 +892,10 @@ gst_adaptive_demux2_stream_wait_for_output_space (GstAdaptiveDemux * demux,
     if (track->level_time < high_threshold) {
       if (track->active) {
         need_to_wait = FALSE;
-        GST_DEBUG_OBJECT (demux,
-            "stream %p track %s has level %" GST_TIME_FORMAT
+        GST_DEBUG_OBJECT (stream,
+            "track %s has level %" GST_TIME_FORMAT
             " - needs more data (target %" GST_TIME_FORMAT
-            ") (fragment duration %" GST_TIME_FORMAT ")", stream,
+            ") (fragment duration %" GST_TIME_FORMAT ")",
             track->stream_id, GST_TIME_ARGS (track->level_time),
             GST_TIME_ARGS (high_threshold), GST_TIME_ARGS (fragment_duration));
         continue;
@@ -837,34 +904,67 @@ gst_adaptive_demux2_stream_wait_for_output_space (GstAdaptiveDemux * demux,
       have_filled_inactive = TRUE;
     }
 
-    GST_DEBUG_OBJECT (demux,
-        "stream %p track %s active (%d) has level %" GST_TIME_FORMAT,
-        stream, track->stream_id, track->active,
-        GST_TIME_ARGS (track->level_time));
+    GST_DEBUG_OBJECT (stream,
+        "track %s active (%d) has level %" GST_TIME_FORMAT,
+        track->stream_id, track->active, GST_TIME_ARGS (track->level_time));
   }
 
   /* If there are no tracks, don't wait (we might need data to create them),
    * or if there are active tracks that need more data to hit the threshold,
    * don't wait. Otherwise it means all active tracks are full and we should wait */
   if (!have_any_tracks) {
-    GST_DEBUG_OBJECT (demux, "stream %p has no tracks - not waiting", stream);
+    GST_DEBUG_OBJECT (stream, "no tracks created yet - not waiting");
     need_to_wait = FALSE;
   } else if (!have_active_tracks && !have_filled_inactive) {
-    GST_DEBUG_OBJECT (demux,
-        "stream %p has inactive tracks that need more data - not waiting",
-        stream);
+    GST_DEBUG_OBJECT (stream,
+        "have only inactive tracks that need more data - not waiting");
     need_to_wait = FALSE;
   }
 
   if (need_to_wait) {
+    stream->next_input_wakeup_time = GST_CLOCK_STIME_NONE;
+
     for (iter = stream->tracks; iter; iter = iter->next) {
       GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) iter->data;
-      track->waiting_del_level = high_threshold;
-      GST_DEBUG_OBJECT (demux,
-          "Waiting for queued data on stream %p track %s to drop below %"
+
+      GST_DEBUG_OBJECT (stream,
+          "Waiting for queued data on track %s to drop below %"
           GST_TIME_FORMAT " (fragment duration %" GST_TIME_FORMAT ")",
-          stream, track->stream_id, GST_TIME_ARGS (track->waiting_del_level),
+          track->stream_id, GST_TIME_ARGS (high_threshold),
           GST_TIME_ARGS (fragment_duration));
+
+      /* we want to get woken up when the global output position reaches
+       * a point where the input is closer than "high_threshold" to needing
+       * output, so we can put more data in */
+      GstClockTimeDiff wakeup_time = track->input_time - high_threshold;
+
+      if (stream->next_input_wakeup_time == GST_CLOCK_STIME_NONE ||
+          wakeup_time < stream->next_input_wakeup_time) {
+        stream->next_input_wakeup_time = wakeup_time;
+
+        GST_DEBUG_OBJECT (stream,
+            "Track %s level %" GST_TIME_FORMAT ". Input at position %"
+            GST_TIME_FORMAT " next wakeup should be %" GST_TIME_FORMAT " now %"
+            GST_TIME_FORMAT, track->stream_id,
+            GST_TIME_ARGS (track->level_time),
+            GST_TIME_ARGS (track->input_time), GST_TIME_ARGS (wakeup_time),
+            GST_TIME_ARGS (demux->priv->global_output_position));
+      }
+    }
+
+    if (stream->next_input_wakeup_time != GST_CLOCK_TIME_NONE) {
+      GST_DEBUG_OBJECT (stream,
+          "Next input wakeup time is now %" GST_TIME_FORMAT,
+          GST_TIME_ARGS (stream->next_input_wakeup_time));
+
+      /* If this stream needs waking up sooner than any other current one,
+       * update the period wakeup time, which is what the output loop
+       * will check */
+      GstAdaptiveDemuxPeriod *period = stream->period;
+      if (period->next_input_wakeup_time == GST_CLOCK_STIME_NONE ||
+          period->next_input_wakeup_time > stream->next_input_wakeup_time) {
+        period->next_input_wakeup_time = stream->next_input_wakeup_time;
+      }
     }
   }
 
@@ -899,9 +999,8 @@ match_parsebin_to_track (GstAdaptiveDemux2Stream * stream, GstPad * pad)
   stream_type = gst_stream_get_stream_type (gst_stream);
 
   GST_DEBUG_OBJECT (pad,
-      "Trying to match pad from parsebin with internal streamid %s and caps %"
-      GST_PTR_FORMAT, GST_STR_NULL (internal_stream_id),
-      gst_stream_get_caps (gst_stream));
+      "Trying to match pad from parsebin with internal streamid %s and stream %"
+      GST_PTR_FORMAT, GST_STR_NULL (internal_stream_id), gst_stream);
 
   /* Try to match directly by the track's pending upstream_stream_id */
   for (tmp = stream->tracks; tmp; tmp = tmp->next) {
@@ -1043,7 +1142,7 @@ gst_adaptive_demux2_stream_create_parser (GstAdaptiveDemux2Stream * stream)
     if (tsdemux_type)
       g_signal_connect (stream->parsebin, "deep-element-added",
           (GCallback) parsebin_deep_element_added_cb, demux);
-    gst_bin_add (GST_BIN_CAST (demux), stream->parsebin);
+    gst_bin_add (GST_BIN_CAST (demux), gst_object_ref (stream->parsebin));
     stream->parsebin_sink =
         gst_element_get_static_pad (stream->parsebin, "sink");
     stream->pad_added_id = g_signal_connect (stream->parsebin, "pad-added",
@@ -1078,6 +1177,12 @@ on_download_error (DownloadRequest * request, DownloadRequestState state,
   guint last_status_code = request->status_code;
   gboolean live;
 
+  if (stream->state != GST_ADAPTIVE_DEMUX2_STREAM_STATE_DOWNLOADING) {
+    GST_DEBUG_OBJECT (stream, "Stream state changed to %d. Aborting",
+        stream->state);
+    return;
+  }
+
   stream->download_active = FALSE;
   stream->last_status_code = last_status_code;
 
@@ -1090,7 +1195,7 @@ on_download_error (DownloadRequest * request, DownloadRequestState state,
           || last_status_code / 100 == 5)) {
     /* 4xx/5xx */
     /* if current position is before available start, switch to next */
-    if (!gst_adaptive_demux2_stream_has_next_fragment (demux, stream))
+    if (!gst_adaptive_demux2_stream_has_next_fragment (stream))
       goto flushing;
 
     if (live) {
@@ -1108,7 +1213,7 @@ on_download_error (DownloadRequest * request, DownloadRequestState state,
 
         GST_DEBUG_OBJECT (demux, "Calling update_fragment_info");
 
-        ret = gst_adaptive_demux2_stream_update_fragment_info (demux, stream);
+        ret = gst_adaptive_demux2_stream_update_fragment_info (stream);
         GST_DEBUG_OBJECT (stream, "update_fragment_info ret: %s",
             gst_flow_get_name (ret));
 
@@ -1118,8 +1223,7 @@ on_download_error (DownloadRequest * request, DownloadRequestState state,
       } else if (demux->segment.position > range_stop) {
         /* wait a bit to be in range, we don't have any locks at that point */
         GstClockTime wait_time =
-            gst_adaptive_demux2_stream_get_fragment_waiting_time (demux,
-            stream);
+            gst_adaptive_demux2_stream_get_fragment_waiting_time (stream);
         if (wait_time > 0) {
           GST_DEBUG_OBJECT (stream,
               "Download waiting for %" GST_TIME_FORMAT,
@@ -1143,7 +1247,7 @@ on_download_error (DownloadRequest * request, DownloadRequestState state,
       gst_adaptive_demux2_stream_handle_playlist_eos (stream);
       return;
     }
-  } else if (!gst_adaptive_demux2_stream_has_next_fragment (demux, stream)) {
+  } else if (!gst_adaptive_demux2_stream_has_next_fragment (stream)) {
     /* If this is the last fragment, consider failures EOS and not actual
      * errors. Due to rounding errors in the durations, the last fragment
      * might not actually exist */
@@ -1256,8 +1360,11 @@ on_download_complete (DownloadRequest * request, DownloadRequestState state,
 
   stream->download_active = FALSE;
 
-  if (G_UNLIKELY (stream->cancelled))
+  if (stream->state != GST_ADAPTIVE_DEMUX2_STREAM_STATE_DOWNLOADING) {
+    GST_DEBUG_OBJECT (stream, "Stream state changed to %d. Aborting",
+        stream->state);
     return;
+  }
 
   GST_DEBUG_OBJECT (stream,
       "Stream %p %s download for %s is complete with state %d",
@@ -1331,7 +1438,8 @@ static GstFlowReturn
 gst_adaptive_demux2_stream_download_fragment (GstAdaptiveDemux2Stream * stream)
 {
   GstAdaptiveDemux *demux = stream->demux;
-  GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
+  GstAdaptiveDemux2StreamClass *klass =
+      GST_ADAPTIVE_DEMUX2_STREAM_GET_CLASS (stream);
   gchar *url = NULL;
 
   /* FIXME :  */
@@ -1450,15 +1558,12 @@ gst_adaptive_demux2_stream_push_event (GstAdaptiveDemux2Stream * stream,
    * We don't need to care about any other events
    */
   if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
-    GstAdaptiveDemux *demux = stream->demux;
     GList *iter;
 
-    TRACKS_LOCK (demux);
     for (iter = stream->tracks; iter; iter = iter->next) {
       GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) iter->data;
       ret &= gst_pad_send_event (track->sinkpad, gst_event_ref (event));
     }
-    TRACKS_UNLOCK (demux);
   }
 
   gst_event_unref (event);
@@ -1511,6 +1616,19 @@ gst_adaptive_demux2_stream_end_of_manifest (GstAdaptiveDemux2Stream * stream)
 
   GST_DEBUG_OBJECT (stream, "Combined flow %s", gst_flow_get_name (combined));
 
+  if (gst_adaptive_demux_has_next_period (demux)) {
+    if (combined == GST_FLOW_EOS) {
+      GST_DEBUG_OBJECT (stream, "Next period available, advancing");
+      gst_adaptive_demux_advance_period (demux);
+    } else {
+      /* Ensure the 'has_next_period' flag is set on the period before
+       * pushing EOS to the stream, so that the output loop knows not
+       * to actually output the event */
+      GST_DEBUG_OBJECT (stream, "Marking current period has a next one");
+      demux->input_period->has_next_period = TRUE;
+    }
+  }
+
   if (demux->priv->outputs) {
     GstEvent *eos = gst_event_new_eos ();
 
@@ -1523,11 +1641,6 @@ gst_adaptive_demux2_stream_end_of_manifest (GstAdaptiveDemux2Stream * stream)
     GST_ERROR_OBJECT (demux, "Can't push EOS on non-exposed pad");
     gst_adaptive_demux2_stream_error (stream);
   }
-
-  if (combined == GST_FLOW_EOS && gst_adaptive_demux_has_next_period (demux)) {
-    GST_DEBUG_OBJECT (stream, "Next period available, advancing");
-    gst_adaptive_demux_advance_period (demux);
-  }
 }
 
 static gboolean
@@ -1562,6 +1675,26 @@ gst_adaptive_demux2_stream_on_output_space_available_cb (GstAdaptiveDemux2Stream
   if (stream->state != GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_OUTPUT_SPACE)
     return G_SOURCE_REMOVE;
 
+  GstAdaptiveDemux *demux = stream->demux;
+  TRACKS_LOCK (demux);
+
+  GList *iter;
+  for (iter = stream->tracks; iter; iter = iter->next) {
+    GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) iter->data;
+
+    /* We need to recompute the track's level_time value, as the
+     * global output position may have advanced and reduced the
+     * value, even without anything being dequeued yet */
+    gst_adaptive_demux_track_update_level_locked (track);
+
+    GST_DEBUG_OBJECT (stream, "track %s woken level %" GST_TIME_FORMAT
+        " input position %" GST_TIME_FORMAT " at %" GST_TIME_FORMAT,
+        track->stream_id, GST_TIME_ARGS (track->level_time),
+        GST_TIME_ARGS (track->input_time),
+        GST_TIME_ARGS (demux->priv->global_output_position));
+  }
+  TRACKS_UNLOCK (demux);
+
   while (gst_adaptive_demux2_stream_load_a_fragment (stream));
 
   return G_SOURCE_REMOVE;
@@ -1572,12 +1705,8 @@ gst_adaptive_demux2_stream_on_output_space_available (GstAdaptiveDemux2Stream *
     stream)
 {
   GstAdaptiveDemux *demux = stream->demux;
-  GList *iter;
 
-  for (iter = stream->tracks; iter; iter = iter->next) {
-    GstAdaptiveDemuxTrack *tmp_track = (GstAdaptiveDemuxTrack *) iter->data;
-    tmp_track->waiting_del_level = 0;
-  }
+  stream->next_input_wakeup_time = GST_CLOCK_STIME_NONE;
 
   GST_LOG_OBJECT (stream, "Scheduling output_space_available() call");
 
@@ -1619,12 +1748,12 @@ gst_adaptive_demux2_stream_handle_playlist_eos (GstAdaptiveDemux2Stream *
       GST_DEBUG_OBJECT (stream,
           "Live playlist EOS - waiting for manifest update");
       stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_MANIFEST_UPDATE;
+      /* Clear the stream last_ret EOS state, since we're not actually EOS */
+      if (stream->last_ret == GST_FLOW_EOS)
+        stream->last_ret = GST_FLOW_OK;
       gst_adaptive_demux2_stream_wants_manifest_update (demux);
       return;
     }
-
-    if (stream->replaced)
-      return;
   }
 
   gst_adaptive_demux2_stream_end_of_manifest (stream);
@@ -1649,7 +1778,7 @@ gst_adaptive_demux2_stream_load_a_fragment (GstAdaptiveDemux2Stream * stream)
     case GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_MANIFEST_UPDATE:
       /* Get information about the fragment to download */
       GST_DEBUG_OBJECT (demux, "Calling update_fragment_info");
-      ret = gst_adaptive_demux2_stream_update_fragment_info (demux, stream);
+      ret = gst_adaptive_demux2_stream_update_fragment_info (stream);
       GST_DEBUG_OBJECT (stream,
           "Fragment info update result: %d %s", ret, gst_flow_get_name (ret));
 
@@ -1662,6 +1791,9 @@ gst_adaptive_demux2_stream_load_a_fragment (GstAdaptiveDemux2Stream * stream)
       GST_ERROR_OBJECT (stream,
           "Unexpected stream state EOS. The stream should not be running now.");
       return FALSE;
+    case GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED:
+      /* The stream was stopped. Just finish up */
+      return FALSE;
     default:
       GST_ERROR_OBJECT (stream, "Unexpected stream state %d", stream->state);
       g_assert_not_reached ();
@@ -1681,7 +1813,7 @@ gst_adaptive_demux2_stream_load_a_fragment (GstAdaptiveDemux2Stream * stream)
     /* wait for live fragments to be available */
     if (live) {
       GstClockTime wait_time =
-          gst_adaptive_demux2_stream_get_fragment_waiting_time (demux, stream);
+          gst_adaptive_demux2_stream_get_fragment_waiting_time (stream);
       if (wait_time > 0) {
         GST_DEBUG_OBJECT (stream,
             "Download waiting for %" GST_TIME_FORMAT,
@@ -1706,13 +1838,21 @@ gst_adaptive_demux2_stream_load_a_fragment (GstAdaptiveDemux2Stream * stream)
     }
   }
 
-  switch (ret) {
+  /* Cast to int avoids a compiler warning that
+   * GST_ADAPTIVE_DEMUX_FLOW_LOST_SYNC is not in the GstFlowReturn enum */
+  switch ((int) ret) {
     case GST_FLOW_OK:
       break;                    /* all is good, let's go */
     case GST_FLOW_EOS:
       GST_DEBUG_OBJECT (stream, "EOS, checking to stop download loop");
+      stream->last_ret = ret;
       gst_adaptive_demux2_stream_handle_playlist_eos (stream);
       return FALSE;
+    case GST_ADAPTIVE_DEMUX_FLOW_LOST_SYNC:
+      GST_DEBUG_OBJECT (stream, "Lost sync, asking reset to current position");
+      stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED;
+      gst_adaptive_demux_handle_lost_sync (demux);
+      return FALSE;
     case GST_FLOW_NOT_LINKED:
     {
       stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_EOS;
@@ -1726,9 +1866,8 @@ gst_adaptive_demux2_stream_load_a_fragment (GstAdaptiveDemux2Stream * stream)
 
     case GST_FLOW_FLUSHING:
       /* Flushing is normal, the target track might have been unselected */
-      if (G_UNLIKELY (stream->cancelled))
-        return FALSE;
-
+      GST_DEBUG_OBJECT (stream, "Got flushing return. Stopping callback.");
+      return FALSE;
     default:
       if (ret <= GST_FLOW_ERROR) {
         GST_WARNING_OBJECT (demux, "Error while downloading fragment");
@@ -1801,7 +1940,7 @@ gst_adaptive_demux2_stream_next_download (GstAdaptiveDemux2Stream * stream)
 
     if (GST_CLOCK_STIME_IS_VALID (stream_time)) {
       /* TODO check return */
-      gst_adaptive_demux2_stream_seek (demux, stream, demux->segment.rate >= 0,
+      gst_adaptive_demux2_stream_seek (stream, demux->segment.rate >= 0,
           0, stream_time, &stream_time);
       stream->current_position = stream->start_position;
 
@@ -1846,12 +1985,12 @@ gst_adaptive_demux2_stream_next_download (GstAdaptiveDemux2Stream * stream)
 static gboolean
 gst_adaptive_demux2_stream_can_start (GstAdaptiveDemux2Stream * stream)
 {
-  GstAdaptiveDemux *demux = stream->demux;
-  GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
+  GstAdaptiveDemux2StreamClass *klass =
+      GST_ADAPTIVE_DEMUX2_STREAM_GET_CLASS (stream);
 
-  if (!klass->stream_can_start)
+  if (!klass->can_start)
     return TRUE;
-  return klass->stream_can_start (demux, stream);
+  return klass->can_start (stream);
 }
 
 /**
@@ -1892,8 +2031,6 @@ gst_adaptive_demux2_stream_start (GstAdaptiveDemux2Stream * stream)
       stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART) {
     GST_LOG_OBJECT (stream, "Activating stream. Current state %d",
         stream->state);
-    stream->cancelled = FALSE;
-    stream->replaced = FALSE;
     stream->last_ret = GST_FLOW_OK;
 
     if (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED)
@@ -1928,6 +2065,8 @@ gst_adaptive_demux2_stream_stop (GstAdaptiveDemux2Stream * stream)
   stream->downloading_header = stream->downloading_index = FALSE;
   stream->download_request = download_request_new ();
   stream->download_active = FALSE;
+
+  stream->next_input_wakeup_time = GST_CLOCK_STIME_NONE;
 }
 
 gboolean
@@ -1975,3 +2114,508 @@ gst_adaptive_demux2_stream_is_selected (GstAdaptiveDemux2Stream * stream)
 
   return ret;
 }
+
+/* Called from the scheduler task */
+GstClockTime
+gst_adaptive_demux2_stream_get_presentation_offset (GstAdaptiveDemux2Stream *
+    stream)
+{
+  GstAdaptiveDemux2StreamClass *klass =
+      GST_ADAPTIVE_DEMUX2_STREAM_GET_CLASS (stream);
+
+  if (klass->get_presentation_offset == NULL)
+    return 0;
+
+  return klass->get_presentation_offset (stream);
+}
+
+GstFlowReturn
+gst_adaptive_demux2_stream_update_fragment_info (GstAdaptiveDemux2Stream *
+    stream)
+{
+  GstAdaptiveDemux2StreamClass *klass =
+      GST_ADAPTIVE_DEMUX2_STREAM_GET_CLASS (stream);
+  GstFlowReturn ret;
+
+  g_return_val_if_fail (klass->update_fragment_info != NULL, GST_FLOW_ERROR);
+
+  /* Make sure the sub-class will update bitrate, or else
+   * we will later */
+  stream->fragment.finished = FALSE;
+
+  GST_LOG_OBJECT (stream, "position %" GST_TIME_FORMAT,
+      GST_TIME_ARGS (stream->current_position));
+
+  ret = klass->update_fragment_info (stream);
+
+  GST_LOG_OBJECT (stream, "ret:%s uri:%s",
+      gst_flow_get_name (ret), stream->fragment.uri);
+  if (ret == GST_FLOW_OK) {
+    GST_LOG_OBJECT (stream,
+        "stream_time %" GST_STIME_FORMAT " duration:%" GST_TIME_FORMAT,
+        GST_STIME_ARGS (stream->fragment.stream_time),
+        GST_TIME_ARGS (stream->fragment.duration));
+    GST_LOG_OBJECT (stream,
+        "range start:%" G_GINT64_FORMAT " end:%" G_GINT64_FORMAT,
+        stream->fragment.range_start, stream->fragment.range_end);
+  }
+
+  return ret;
+}
+
+static GstFlowReturn
+gst_adaptive_demux2_stream_data_received_default (GstAdaptiveDemux2Stream *
+    stream, GstBuffer * buffer)
+{
+  return gst_adaptive_demux2_stream_push_buffer (stream, buffer);
+}
+
+static GstFlowReturn
+gst_adaptive_demux2_stream_finish_fragment_default (GstAdaptiveDemux2Stream *
+    stream)
+{
+  /* No need to advance, this isn't a real fragment */
+  if (G_UNLIKELY (stream->downloading_header || stream->downloading_index))
+    return GST_FLOW_OK;
+
+  return gst_adaptive_demux2_stream_advance_fragment (stream,
+      stream->fragment.duration);
+}
+
+/* must be called from the scheduler */
+gboolean
+gst_adaptive_demux2_stream_has_next_fragment (GstAdaptiveDemux2Stream * stream)
+{
+  GstAdaptiveDemux2StreamClass *klass =
+      GST_ADAPTIVE_DEMUX2_STREAM_GET_CLASS (stream);
+  gboolean ret = TRUE;
+
+  if (klass->has_next_fragment)
+    ret = klass->has_next_fragment (stream);
+
+  return ret;
+}
+
+/* must be called from the scheduler */
+GstFlowReturn
+gst_adaptive_demux2_stream_seek (GstAdaptiveDemux2Stream * stream,
+    gboolean forward, GstSeekFlags flags,
+    GstClockTimeDiff ts, GstClockTimeDiff * final_ts)
+{
+  GstAdaptiveDemux2StreamClass *klass =
+      GST_ADAPTIVE_DEMUX2_STREAM_GET_CLASS (stream);
+
+  if (klass->stream_seek)
+    return klass->stream_seek (stream, forward, flags, ts, final_ts);
+  return GST_FLOW_ERROR;
+}
+
+static gboolean
+gst_adaptive_demux2_stream_select_bitrate (GstAdaptiveDemux *
+    demux, GstAdaptiveDemux2Stream * stream, guint64 bitrate)
+{
+  GstAdaptiveDemux2StreamClass *klass =
+      GST_ADAPTIVE_DEMUX2_STREAM_GET_CLASS (stream);
+
+  if (klass->select_bitrate)
+    return klass->select_bitrate (stream, bitrate);
+  return FALSE;
+}
+
+GstClockTime
+gst_adaptive_demux2_stream_get_fragment_waiting_time (GstAdaptiveDemux2Stream *
+    stream)
+{
+  GstAdaptiveDemux2StreamClass *klass =
+      GST_ADAPTIVE_DEMUX2_STREAM_GET_CLASS (stream);
+
+  if (klass->get_fragment_waiting_time)
+    return klass->get_fragment_waiting_time (stream);
+  return 0;
+}
+
+/* must be called from the scheduler */
+/* Called from: the ::finish_fragment() handlers when an *actual* fragment is
+ * done
+ *
+ * @duration: Is the duration of the advancement starting from
+ * stream->current_position which might not be the fragment duration after a
+ * seek.
+ */
+GstFlowReturn
+gst_adaptive_demux2_stream_advance_fragment (GstAdaptiveDemux2Stream * stream,
+    GstClockTime duration)
+{
+  if (stream->last_ret != GST_FLOW_OK)
+    return stream->last_ret;
+
+  GstAdaptiveDemux2StreamClass *klass =
+      GST_ADAPTIVE_DEMUX2_STREAM_GET_CLASS (stream);
+  GstAdaptiveDemux *demux = stream->demux;
+  GstFlowReturn ret = GST_FLOW_OK;
+
+  g_assert (klass->advance_fragment != NULL);
+
+  GST_LOG_OBJECT (stream,
+      "stream_time %" GST_STIME_FORMAT " duration:%" GST_TIME_FORMAT,
+      GST_STIME_ARGS (stream->fragment.stream_time), GST_TIME_ARGS (duration));
+
+  stream->download_error_count = 0;
+  g_clear_error (&stream->last_error);
+
+#if 0
+  /* FIXME - url has no indication of byte ranges for subsegments */
+  /* FIXME: Reenable statistics sending? */
+  gst_element_post_message (GST_ELEMENT_CAST (demux),
+      gst_message_new_element (GST_OBJECT_CAST (demux),
+          gst_structure_new (GST_ADAPTIVE_DEMUX_STATISTICS_MESSAGE_NAME,
+              "manifest-uri", G_TYPE_STRING,
+              demux->manifest_uri, "uri", G_TYPE_STRING,
+              stream->fragment.uri, "fragment-start-time",
+              GST_TYPE_CLOCK_TIME, stream->download_start_time,
+              "fragment-stop-time", GST_TYPE_CLOCK_TIME,
+              gst_util_get_timestamp (), "fragment-size", G_TYPE_UINT64,
+              stream->download_total_bytes, "fragment-download-time",
+              GST_TYPE_CLOCK_TIME, stream->last_download_time, NULL)));
+#endif
+
+  /* Don't update to the end of the segment if in reverse playback */
+  GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
+  if (GST_CLOCK_TIME_IS_VALID (duration) && demux->segment.rate > 0) {
+    stream->parse_segment.position += duration;
+    stream->current_position += duration;
+
+    GST_DEBUG_OBJECT (stream,
+        "stream position now %" GST_TIME_FORMAT,
+        GST_TIME_ARGS (stream->current_position));
+  }
+  GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
+
+  /* When advancing with a non 1.0 rate on live streams, we need to check
+   * the live seeking range again to make sure we can still advance to
+   * that position */
+  if (demux->segment.rate != 1.0 && gst_adaptive_demux_is_live (demux)) {
+    if (!gst_adaptive_demux2_stream_in_live_seek_range (demux, stream))
+      ret = GST_FLOW_EOS;
+    else
+      ret = klass->advance_fragment (stream);
+  } else if (gst_adaptive_demux_is_live (demux)
+      || gst_adaptive_demux2_stream_has_next_fragment (stream)) {
+    ret = klass->advance_fragment (stream);
+  } else {
+    ret = GST_FLOW_EOS;
+  }
+
+  stream->download_start_time =
+      GST_TIME_AS_USECONDS (gst_adaptive_demux2_get_monotonic_time (demux));
+
+  /* Always check if we need to switch bitrate on OK, or when live
+   * (it's normal to have EOS on advancing in live when we hit the
+   * end of the manifest) */
+  if (ret == GST_FLOW_OK || gst_adaptive_demux_is_live (demux)) {
+    GST_DEBUG_OBJECT (stream, "checking if stream requires bitrate change");
+    if (gst_adaptive_demux2_stream_select_bitrate (demux, stream,
+            gst_adaptive_demux2_stream_update_current_bitrate (stream))) {
+      GST_DEBUG_OBJECT (stream, "Bitrate changed. Returning FLOW_SWITCH");
+      stream->need_header = TRUE;
+      ret = (GstFlowReturn) GST_ADAPTIVE_DEMUX_FLOW_SWITCH;
+    }
+  }
+
+  stream->last_ret = ret;
+  return stream->last_ret;
+}
+
+/* TRACKS_LOCK held */
+static GstAdaptiveDemuxTrack *
+gst_adaptive_demux2_stream_find_track_of_type (GstAdaptiveDemux2Stream * stream,
+    GstStreamType stream_type)
+{
+  GList *iter;
+
+  for (iter = stream->tracks; iter; iter = iter->next) {
+    GstAdaptiveDemuxTrack *track = iter->data;
+
+    if (track->type == stream_type)
+      return track;
+  }
+
+  return NULL;
+}
+
+/* TRACKS lock held */
+static void
+gst_adaptive_demux2_stream_update_track_ids (GstAdaptiveDemux2Stream * stream)
+{
+  guint i;
+
+  GST_DEBUG_OBJECT (stream, "Updating track information from collection");
+
+  for (i = 0; i < gst_stream_collection_get_size (stream->stream_collection);
+      i++) {
+    GstStream *gst_stream =
+        gst_stream_collection_get_stream (stream->stream_collection, i);
+    GstStreamType stream_type = gst_stream_get_stream_type (gst_stream);
+    GstAdaptiveDemuxTrack *track;
+
+    if (stream_type == GST_STREAM_TYPE_UNKNOWN)
+      continue;
+    track = gst_adaptive_demux2_stream_find_track_of_type (stream, stream_type);
+    if (!track) {
+      GST_DEBUG_OBJECT (stream,
+          "We don't have an existing track to handle stream %" GST_PTR_FORMAT,
+          gst_stream);
+      continue;
+    }
+
+    if (track->upstream_stream_id)
+      g_free (track->upstream_stream_id);
+    track->upstream_stream_id =
+        g_strdup (gst_stream_get_stream_id (gst_stream));
+  }
+
+}
+
+static gboolean
+tags_have_language_info (GstTagList * tags)
+{
+  const gchar *language = NULL;
+
+  if (tags == NULL)
+    return FALSE;
+
+  if (gst_tag_list_peek_string_index (tags, GST_TAG_LANGUAGE_CODE, 0,
+          &language))
+    return TRUE;
+  if (gst_tag_list_peek_string_index (tags, GST_TAG_LANGUAGE_NAME, 0,
+          &language))
+    return TRUE;
+
+  return FALSE;
+}
+
+static gboolean
+can_handle_collection (GstAdaptiveDemux2Stream * stream,
+    GstStreamCollection * collection)
+{
+  guint i;
+  guint nb_audio, nb_video, nb_text;
+  gboolean have_audio_languages = TRUE;
+  gboolean have_text_languages = TRUE;
+
+  nb_audio = nb_video = nb_text = 0;
+
+  for (i = 0; i < gst_stream_collection_get_size (collection); i++) {
+    GstStream *gst_stream = gst_stream_collection_get_stream (collection, i);
+    GstTagList *tags = gst_stream_get_tags (gst_stream);
+
+    GST_DEBUG_OBJECT (stream,
+        "Internal collection stream #%d %" GST_PTR_FORMAT, i, gst_stream);
+    switch (gst_stream_get_stream_type (gst_stream)) {
+      case GST_STREAM_TYPE_AUDIO:
+        have_audio_languages &= tags_have_language_info (tags);
+        nb_audio++;
+        break;
+      case GST_STREAM_TYPE_VIDEO:
+        nb_video++;
+        break;
+      case GST_STREAM_TYPE_TEXT:
+        have_text_languages &= tags_have_language_info (tags);
+        nb_text++;
+        break;
+      default:
+        break;
+    }
+    if (tags)
+      gst_tag_list_unref (tags);
+  }
+
+  /* Check that we either have at most 1 of each track type, or that
+   * we have language tags for each to tell which is which */
+  if (nb_video > 1 ||
+      (nb_audio > 1 && !have_audio_languages) ||
+      (nb_text > 1 && !have_text_languages)) {
+    GST_WARNING
+        ("Collection can't be handled (nb_audio:%d, nb_video:%d, nb_text:%d)",
+        nb_audio, nb_video, nb_text);
+    return FALSE;
+  }
+
+  return TRUE;
+}
+
+/* Called from the demuxer when it receives a GstStreamCollection on the bus
+ * for this stream. */
+/* TRACKS lock held */
+gboolean
+gst_adaptive_demux2_stream_handle_collection (GstAdaptiveDemux2Stream * stream,
+    GstStreamCollection * collection, gboolean * had_pending_tracks)
+{
+  g_assert (had_pending_tracks != NULL);
+
+  /* Check whether the collection is "sane" or not.
+   *
+   * In the context of adaptive streaming, we can only handle multiplexed
+   * content where the output sub-streams can be matched reliably to the various
+   * tracks. That is, only a single stream of each type, or if there are
+   * multiple audio/subtitle tracks, they can be differentiated by language
+   * (and possibly in the future by codec).
+   */
+  if (!can_handle_collection (stream, collection)) {
+    return FALSE;
+  }
+
+  /* Store the collection on the stream */
+  gst_object_replace ((GstObject **) & stream->stream_collection,
+      (GstObject *) collection);
+
+  /* If stream is marked as having pending_tracks, ask the subclass to
+   * handle that and create the tracks now */
+  if (stream->pending_tracks) {
+    GstAdaptiveDemux2StreamClass *klass =
+        GST_ADAPTIVE_DEMUX2_STREAM_GET_CLASS (stream);
+    g_assert (klass->create_tracks);
+    klass->create_tracks (stream);
+    stream->pending_tracks = FALSE;
+    *had_pending_tracks = TRUE;
+  } else {
+    g_assert (stream->tracks);
+
+    /* Now we should have assigned tracks, match them to the
+     * collection and update the pending upstream stream_id
+     * for each of them based on the collection information. */
+    gst_adaptive_demux2_stream_update_track_ids (stream);
+  }
+
+  return TRUE;
+}
+
+static guint64
+_update_average_bitrate (GstAdaptiveDemux2Stream * stream, guint64 new_bitrate)
+{
+  gint index = stream->moving_index % NUM_LOOKBACK_FRAGMENTS;
+
+  stream->moving_bitrate -= stream->fragment_bitrates[index];
+  stream->fragment_bitrates[index] = new_bitrate;
+  stream->moving_bitrate += new_bitrate;
+
+  stream->moving_index += 1;
+
+  if (stream->moving_index > NUM_LOOKBACK_FRAGMENTS)
+    return stream->moving_bitrate / NUM_LOOKBACK_FRAGMENTS;
+  return stream->moving_bitrate / stream->moving_index;
+}
+
+guint64
+gst_adaptive_demux2_stream_update_current_bitrate (GstAdaptiveDemux2Stream *
+    stream)
+{
+  guint64 average_bitrate;
+  guint64 fragment_bitrate;
+  guint connection_speed, min_bitrate, max_bitrate, target_download_rate;
+
+  fragment_bitrate = stream->last_bitrate;
+  GST_DEBUG_OBJECT (stream, "Download bitrate is : %" G_GUINT64_FORMAT " bps",
+      fragment_bitrate);
+
+  average_bitrate = _update_average_bitrate (stream, fragment_bitrate);
+
+  GST_INFO_OBJECT (stream,
+      "last fragment bitrate was %" G_GUINT64_FORMAT, fragment_bitrate);
+  GST_INFO_OBJECT (stream,
+      "Last %u fragments average bitrate is %" G_GUINT64_FORMAT,
+      NUM_LOOKBACK_FRAGMENTS, average_bitrate);
+
+  /* Conservative approach, make sure we don't upgrade too fast */
+  stream->current_download_rate = MIN (average_bitrate, fragment_bitrate);
+
+  /* For the video stream, update the demuxer reported download
+   * rate. FIXME: Move all bandwidth estimation to the
+   * download helper and make it the demuxer's responsibility
+   * to select the right set of things to download within
+   * that bandwidth */
+  GstAdaptiveDemux *demux = stream->demux;
+  GST_OBJECT_LOCK (demux);
+
+  /* If this is stream containing our video, update the overall demuxer
+   * reported bitrate and notify, to give the application a
+   * chance to choose a new connection-bitrate */
+  if ((stream->stream_type & GST_STREAM_TYPE_VIDEO) != 0) {
+    demux->current_download_rate = stream->current_download_rate;
+    GST_OBJECT_UNLOCK (demux);
+    g_object_notify (G_OBJECT (demux), "current-bandwidth");
+    GST_OBJECT_LOCK (demux);
+  }
+
+  connection_speed = demux->connection_speed;
+  min_bitrate = demux->min_bitrate;
+  max_bitrate = demux->max_bitrate;
+  GST_OBJECT_UNLOCK (demux);
+
+  if (connection_speed) {
+    GST_LOG_OBJECT (stream, "connection-speed is set to %u kbps, using it",
+        connection_speed / 1000);
+    return connection_speed;
+  }
+
+  /* No explicit connection_speed, so choose the new variant to use as a
+   * fraction of the measured download rate */
+  target_download_rate =
+      CLAMP (stream->current_download_rate, 0,
+      G_MAXUINT) * demux->bandwidth_target_ratio;
+
+  GST_DEBUG_OBJECT (stream, "Bitrate after target ratio limit (%0.2f): %u",
+      demux->bandwidth_target_ratio, target_download_rate);
+
+#if 0
+  /* Debugging code, modulate the bitrate every few fragments */
+  {
+    static guint ctr = 0;
+    if (ctr % 3 == 0) {
+      GST_INFO_OBJECT (stream, "Halving reported bitrate for debugging");
+      target_download_rate /= 2;
+    }
+    ctr++;
+  }
+#endif
+
+  if (min_bitrate > 0 && target_download_rate < min_bitrate) {
+    target_download_rate = min_bitrate;
+    GST_LOG_OBJECT (stream, "Bitrate adjusted due to min-bitrate : %u bits/s",
+        min_bitrate);
+  }
+
+  if (max_bitrate > 0 && target_download_rate > max_bitrate) {
+    target_download_rate = max_bitrate;
+    GST_LOG_OBJECT (stream, "Bitrate adjusted due to max-bitrate : %u bits/s",
+        max_bitrate);
+  }
+
+  GST_DEBUG_OBJECT (stream, "Returning target download rate of %u bps",
+      target_download_rate);
+
+  return target_download_rate;
+}
+
+void
+gst_adaptive_demux2_stream_fragment_clear (GstAdaptiveDemux2StreamFragment * f)
+{
+  g_free (f->uri);
+  f->uri = NULL;
+  f->range_start = 0;
+  f->range_end = -1;
+
+  g_free (f->header_uri);
+  f->header_uri = NULL;
+  f->header_range_start = 0;
+  f->header_range_end = -1;
+
+  g_free (f->index_uri);
+  f->index_uri = NULL;
+  f->index_range_start = 0;
+  f->index_range_end = -1;
+
+  f->stream_time = GST_CLOCK_STIME_NONE;
+  f->duration = GST_CLOCK_TIME_NONE;
+  f->finished = FALSE;
+}