dashdemux: download the next fragment with smaller timestamp
authorThiago Santos <thiago.sousa.santos@collabora.com>
Fri, 1 Feb 2013 05:10:15 +0000 (02:10 -0300)
committerThiago Santos <thiago.sousa.santos@collabora.com>
Wed, 8 May 2013 21:14:39 +0000 (18:14 -0300)
Instead of downloading 1 fragment per stream per download loop,
select the stream with the earlier timestamp and get a fragment
only for that one.

The old algorithm would lead to problems when the fragment durations
were too different for streams.

ext/dash/gstdashdemux.c
ext/dash/gstdashdemux.h
ext/dash/gstmpdparser.c
ext/dash/gstmpdparser.h

index f252ca4..ea3ecbe 100644 (file)
@@ -233,7 +233,7 @@ static void gst_dash_demux_resume_download_task (GstDashDemux * demux);
 static gboolean gst_dash_demux_setup_all_streams (GstDashDemux * demux);
 static gboolean gst_dash_demux_select_representations (GstDashDemux * demux,
     guint64 current_bitrate);
-static gboolean gst_dash_demux_get_next_fragment_set (GstDashDemux * demux);
+static gboolean gst_dash_demux_get_next_fragment (GstDashDemux * demux);
 
 static void gst_dash_demux_reset (GstDashDemux * demux, gboolean dispose);
 static GstClockTime gst_dash_demux_get_buffering_time (GstDashDemux * demux);
@@ -1269,6 +1269,21 @@ gst_dash_demux_stream_get_buffering_time (GstDashDemuxStream * stream)
   return (GstClockTime) level.time;
 }
 
+static gboolean
+gst_dash_demux_all_streams_have_data (GstDashDemux * demux)
+{
+  GSList *iter;
+
+  for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
+    GstDashDemuxStream *stream = iter->data;
+
+    if (!stream->has_data_queued)
+      return FALSE;
+  }
+
+  return TRUE;
+}
+
 /* gst_dash_demux_download_loop:
  * 
  * Loop for the "download' task that fetches fragments based on the 
@@ -1384,11 +1399,13 @@ gst_dash_demux_download_loop (GstDashDemux * demux)
   GST_DEBUG_OBJECT (demux, "download loop %i", demux->end_of_manifest);
 
   /* try to switch to another set of representations if needed */
-  gst_dash_demux_select_representations (demux,
-      demux->bandwidth_usage * demux->dnl_rate);
+  if (gst_dash_demux_all_streams_have_data (demux)) {
+    gst_dash_demux_select_representations (demux,
+        demux->bandwidth_usage * demux->dnl_rate);
+  }
 
   /* fetch the next fragment */
-  while (!gst_dash_demux_get_next_fragment_set (demux)) {
+  while (!gst_dash_demux_get_next_fragment (demux)) {
     if (demux->end_of_period) {
       GST_INFO_OBJECT (demux, "Reached the end of the Period");
       /* setup video, audio and subtitle streams, starting from the next Period */
@@ -1482,6 +1499,7 @@ gst_dash_demux_prepare_pad_switch (GstDashDemux * demux)
     g_assert (caps != NULL);
     gst_dash_demux_stream_push_event (stream,
         gst_event_new_dash_event_pad_switch (caps));
+    stream->has_data_queued = FALSE;
   }
 }
 
@@ -1672,9 +1690,9 @@ gst_dash_demux_get_input_caps (GstDashDemux * demux, GstActiveStream * stream)
   }
 }
 
-/* gst_dash_demux_get_next_fragment_set:
+/* gst_dash_demux_get_next_fragment:
  *
- * Get the next set of fragments for the current representations.
+ * Get the next fragments for the stream with the earlier timestamp.
  * 
  * This function uses the generic URI downloader API.
  *
@@ -1682,11 +1700,10 @@ gst_dash_demux_get_input_caps (GstDashDemux * demux, GstActiveStream * stream)
  * 
  */
 static gboolean
-gst_dash_demux_get_next_fragment_set (GstDashDemux * demux)
+gst_dash_demux_get_next_fragment (GstDashDemux * demux)
 {
   GstActiveStream *active_stream;
   GstFragment *download, *header;
-  GList *fragment_set;
   gchar *next_fragment_uri;
   GstClockTime duration;
   GstClockTime timestamp;
@@ -1697,25 +1714,28 @@ gst_dash_demux_get_next_fragment_set (GstDashDemux * demux)
   guint64 size_buffer = 0;
   GSList *iter;
   gboolean end_of_period = TRUE;
+  GstDashDemuxStream *selected_stream = NULL;
+  GstClockTime best_time = GST_CLOCK_TIME_NONE;
 
-  g_get_current_time (&start);
-  fragment_set = NULL;
-  /* Get the fragment corresponding to each stream index */
   for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
     GstDashDemuxStream *stream = iter->data;
-    guint stream_idx = stream->index;
-    GstBuffer *buffer;
+    GstClockTime ts;
 
     if (stream->download_end_of_period)
       continue;
 
-    if (!gst_mpd_client_get_next_fragment (demux->client,
-            stream_idx, &discont, &next_fragment_uri, &duration, &timestamp)) {
+    if (gst_mpd_client_get_next_fragment_timestamp (demux->client,
+            stream->index, &ts)) {
+      if (ts < best_time || !GST_CLOCK_TIME_IS_VALID (best_time)) {
+        selected_stream = stream;
+        best_time = ts;
+      }
+    } else {
       GstEvent *event = NULL;
 
       GST_INFO_OBJECT (demux,
           "This Period doesn't contain more fragments for stream %u",
-          stream_idx);
+          stream->index);
       if (gst_mpd_client_has_next_period (demux->client)) {
         event = gst_event_new_dash_eop ();
       } else {
@@ -1723,56 +1743,71 @@ gst_dash_demux_get_next_fragment_set (GstDashDemux * demux)
       }
       stream->download_end_of_period = TRUE;
       gst_dash_demux_stream_push_event (stream, event);
-      continue;
     }
+  }
 
-    GST_INFO_OBJECT (demux, "Next fragment for stream #%i", stream_idx);
-    GST_INFO_OBJECT (demux,
-        "Fetching next fragment %s ts:%" GST_TIME_FORMAT " dur:%"
-        GST_TIME_FORMAT, next_fragment_uri, GST_TIME_ARGS (timestamp),
-        GST_TIME_ARGS (duration));
+  g_get_current_time (&start);
+  /* Get the fragment corresponding to each stream index */
+  if (selected_stream) {
+    guint stream_idx = selected_stream->index;
+    GstBuffer *buffer;
 
-    /* got a fragment to fetch, no end of period */
-    end_of_period = FALSE;
+    if (gst_mpd_client_get_next_fragment (demux->client,
+            stream_idx, &discont, &next_fragment_uri, &duration, &timestamp)) {
 
-    download = gst_uri_downloader_fetch_uri (demux->downloader,
-        next_fragment_uri);
-    g_free (next_fragment_uri);
+      GST_INFO_OBJECT (demux, "Next fragment for stream #%i", stream_idx);
+      GST_INFO_OBJECT (demux,
+          "Fetching next fragment %s ts:%" GST_TIME_FORMAT " dur:%"
+          GST_TIME_FORMAT, next_fragment_uri, GST_TIME_ARGS (timestamp),
+          GST_TIME_ARGS (duration));
 
-    if (download == NULL)
-      return FALSE;
+      /* got a fragment to fetch, no end of period */
+      end_of_period = FALSE;
 
-    buffer = gst_fragment_get_buffer (download);
+      download = gst_uri_downloader_fetch_uri (demux->downloader,
+          next_fragment_uri);
+      g_free (next_fragment_uri);
 
-    active_stream =
-        gst_mpdparser_get_active_stream_by_index (demux->client, stream_idx);
-    if (stream == NULL)         /* TODO unref fragments */
-      return FALSE;
+      if (download == NULL)
+        return FALSE;
 
-    if (stream->need_header) {
-      /* We need to fetch a new header */
-      if ((header = gst_dash_demux_get_next_header (demux, stream_idx)) == NULL) {
-        GST_INFO_OBJECT (demux, "Unable to fetch header");
-      } else {
-        GstBuffer *header_buffer;
-        /* Replace fragment with a new one including the header */
+      buffer = gst_fragment_get_buffer (download);
+
+      active_stream =
+          gst_mpdparser_get_active_stream_by_index (demux->client, stream_idx);
+      if (active_stream == NULL)        /* TODO unref fragments */
+        return FALSE;
+
+      if (selected_stream->need_header) {
+        /* We need to fetch a new header */
+        if ((header =
+                gst_dash_demux_get_next_header (demux, stream_idx)) == NULL) {
+          GST_INFO_OBJECT (demux, "Unable to fetch header");
+        } else {
+          GstBuffer *header_buffer;
+          /* Replace fragment with a new one including the header */
 
-        header_buffer = gst_fragment_get_buffer (header);
-        buffer = gst_buffer_join (header_buffer, buffer);
+          header_buffer = gst_fragment_get_buffer (header);
+          buffer = gst_buffer_join (header_buffer, buffer);
+        }
+        selected_stream->need_header = FALSE;
       }
-      stream->need_header = FALSE;
-    }
 
-    buffer = gst_buffer_make_metadata_writable (buffer);
+      buffer = gst_buffer_make_metadata_writable (buffer);
 
-    GST_BUFFER_TIMESTAMP (buffer) = timestamp;
-    GST_BUFFER_DURATION (buffer) = duration;
-    GST_BUFFER_OFFSET (buffer) =
-        gst_mpd_client_get_segment_index (active_stream) - 1;
+      GST_BUFFER_TIMESTAMP (buffer) = timestamp;
+      GST_BUFFER_DURATION (buffer) = duration;
+      GST_BUFFER_OFFSET (buffer) =
+          gst_mpd_client_get_segment_index (active_stream) - 1;
 
-    gst_buffer_set_caps (buffer, stream->input_caps);
-    gst_dash_demux_stream_push_data (stream, buffer);
-    size_buffer += GST_BUFFER_SIZE (buffer);
+      gst_buffer_set_caps (buffer, selected_stream->input_caps);
+      gst_dash_demux_stream_push_data (selected_stream, buffer);
+      selected_stream->has_data_queued = TRUE;
+      size_buffer += GST_BUFFER_SIZE (buffer);
+    } else {
+      GST_WARNING_OBJECT (demux, "Failed to download fragment for stream %p %d",
+          selected_stream, selected_stream->index);
+    }
   }
 
   demux->end_of_period = end_of_period;
index ab2acc1..1157d57 100644 (file)
@@ -78,11 +78,20 @@ struct _GstDashDemuxStream
   gboolean stream_end_of_period;
 
   gboolean stream_eos;
-
   gboolean need_header;
-
   gboolean need_segment;
 
+  /* tracks if a stream has enqueued data
+   * after a pad switch.
+   * This is required to prevent pads being
+   * added to the demuxer and having no data
+   * pushed to it before another pad switch
+   * as this might make downstream elements
+   * unhappy and error out if they get
+   * an EOS without receiving any input
+   */
+  gboolean has_data_queued;
+
   GstDataQueue *queue;
 };
 
index 3523a24..55b3b92 100644 (file)
@@ -3130,6 +3130,35 @@ gst_mpd_client_setup_streaming (GstMpdClient * client,
 }
 
 gboolean
+gst_mpd_client_get_next_fragment_timestamp (GstMpdClient * client,
+    guint stream_idx, GstClockTime * ts)
+{
+  GstActiveStream *stream;
+  gint segment_idx;
+  GstMediaSegment *currentChunk;
+
+  GST_DEBUG ("Stream index: %i", stream_idx);
+  stream = g_list_nth_data (client->active_streams, stream_idx);
+  g_return_val_if_fail (stream != NULL, 0);
+
+  GST_MPD_CLIENT_LOCK (client);
+  segment_idx = gst_mpd_client_get_segment_index (stream);
+  GST_DEBUG ("Looking for fragment sequence chunk %d", segment_idx);
+
+  currentChunk =
+      gst_mpdparser_get_chunk_by_index (client, stream_idx, segment_idx);
+  if (currentChunk == NULL) {
+    GST_MPD_CLIENT_UNLOCK (client);
+    return FALSE;
+  }
+
+  *ts = currentChunk->start_time;
+  GST_MPD_CLIENT_UNLOCK (client);
+
+  return TRUE;
+}
+
+gboolean
 gst_mpd_client_get_next_fragment (GstMpdClient * client,
     guint indexStream, gboolean * discontinuity, gchar ** uri,
     GstClockTime * duration, GstClockTime * timestamp)
index 0624043..16ad7e0 100644 (file)
@@ -470,6 +470,7 @@ gboolean gst_mpd_client_setup_representation (GstMpdClient *client, GstActiveStr
 GstClockTime gst_mpd_client_get_current_position (GstMpdClient *client);
 GstClockTime gst_mpd_client_get_next_fragment_duration (GstMpdClient * client);
 GstClockTime gst_mpd_client_get_media_presentation_duration (GstMpdClient *client);
+gboolean gst_mpd_client_get_next_fragment_timestamp (GstMpdClient * client, guint stream_idx, GstClockTime * ts);
 gboolean gst_mpd_client_get_next_fragment (GstMpdClient *client, guint indexStream, gboolean *discontinuity, gchar **uri, GstClockTime *duration, GstClockTime *timestamp);
 gboolean gst_mpd_client_get_next_header (GstMpdClient *client, const gchar **uri, guint stream_idx);
 gboolean gst_mpd_client_is_live (GstMpdClient * client);