dashdemux: keep a list of streams periods
authorAndre Moreira Magalhaes (andrunko) <andre.magalhaes@collabora.co.uk>
Wed, 13 Feb 2013 04:13:23 +0000 (02:13 -0200)
committerThiago Santos <thiago.sousa.santos@collabora.com>
Wed, 8 May 2013 21:14:41 +0000 (18:14 -0300)
Keep a list of streams per period so that the download loop can keep
downloading while the stream loop is still pushing old period's data.

ext/dash/gstdashdemux.c
ext/dash/gstdashdemux.h

index 3b04040..2cea002 100644 (file)
@@ -216,14 +216,19 @@ static void gst_dash_demux_resume_download_task (GstDashDemux * demux);
 static gboolean gst_dash_demux_setup_all_streams (GstDashDemux * demux);
 static gboolean gst_dash_demux_select_representations (GstDashDemux * demux);
 static gboolean gst_dash_demux_get_next_fragment (GstDashDemux * demux);
+static gboolean gst_dash_demux_advance_period (GstDashDemux * demux);
 
+static void gst_dash_demux_expose_streams (GstDashDemux * demux);
+static void gst_dash_demux_remove_streams (GstDashDemux * demux,
+    GSList * streams);
+static void gst_dash_demux_stream_free (GstDashDemuxStream * stream);
 static void gst_dash_demux_reset (GstDashDemux * demux, gboolean dispose);
 static GstClockTime gst_dash_demux_get_buffering_time (GstDashDemux * demux);
 static GstCaps *gst_dash_demux_get_input_caps (GstDashDemux * demux,
     GstActiveStream * stream);
 static GstClockTime gst_dash_demux_stream_get_buffering_time (GstDashDemuxStream
     * stream);
-static void gst_dash_demux_create_pads (GstDashDemux * demux);
+static GstPad *gst_dash_demux_create_pad (GstDashDemux * demux);
 
 static void
 _do_init (GType type)
@@ -262,13 +267,13 @@ gst_dash_demux_dispose (GObject * obj)
 
   if (demux->stream_task) {
     gst_object_unref (demux->stream_task);
-    g_static_rec_mutex_free (&demux->stream_lock);
+    g_static_rec_mutex_free (&demux->stream_task_lock);
     demux->stream_task = NULL;
   }
 
   if (demux->download_task) {
     gst_object_unref (demux->download_task);
-    g_static_rec_mutex_free (&demux->download_lock);
+    g_static_rec_mutex_free (&demux->download_task_lock);
     demux->download_task = NULL;
   }
 
@@ -338,18 +343,20 @@ gst_dash_demux_init (GstDashDemux * demux, GstDashDemuxClass * klass)
   demux->max_bitrate = DEFAULT_MAX_BITRATE;
 
   /* Updates task */
-  g_static_rec_mutex_init (&demux->download_lock);
+  g_static_rec_mutex_init (&demux->download_task_lock);
   demux->download_task =
       gst_task_create ((GstTaskFunction) gst_dash_demux_download_loop, demux);
-  gst_task_set_lock (demux->download_task, &demux->download_lock);
+  gst_task_set_lock (demux->download_task, &demux->download_task_lock);
   demux->download_timed_lock = g_mutex_new ();
 
   /* Streaming task */
-  g_static_rec_mutex_init (&demux->stream_lock);
+  g_static_rec_mutex_init (&demux->stream_task_lock);
   demux->stream_task =
       gst_task_create ((GstTaskFunction) gst_dash_demux_stream_loop, demux);
-  gst_task_set_lock (demux->stream_task, &demux->stream_lock);
+  gst_task_set_lock (demux->stream_task, &demux->stream_task_lock);
   demux->stream_timed_lock = g_mutex_new ();
+
+  g_static_mutex_init (&demux->streams_lock);
 }
 
 static void
@@ -528,9 +535,7 @@ gst_dash_demux_src_event (GstPad * pad, GstEvent * event)
         gst_dash_demux_stop (demux);
 
         /* Wait for streaming to finish */
-        g_static_rec_mutex_lock (&demux->stream_lock);
-
-        //GST_MPD_CLIENT_LOCK (demux->client);
+        g_static_rec_mutex_lock (&demux->stream_task_lock);
 
         /* select the requested Period in the Media Presentation */
         target_pos = (GstClockTime) demux->segment.start;
@@ -550,11 +555,22 @@ gst_dash_demux_src_event (GstPad * pad, GstEvent * event)
           return FALSE;
         }
         if (current_period != gst_mpd_client_get_period_index (demux->client)) {
+          GSList *streams = NULL;
+
           GST_DEBUG_OBJECT (demux, "Seeking to Period %d", current_period);
+          streams = demux->streams;
+          demux->streams = NULL;
+          /* clean old active stream list, if any */
+          gst_active_streams_free (demux->client);
+
           /* setup video, audio and subtitle streams, starting from the new Period */
           if (!gst_mpd_client_set_period_index (demux->client, current_period)
               || !gst_dash_demux_setup_all_streams (demux))
             return FALSE;
+
+          gst_dash_demux_expose_streams (demux);
+
+          gst_dash_demux_remove_streams (demux, streams);
         }
 
         if (list == NULL) {
@@ -614,7 +630,7 @@ gst_dash_demux_src_event (GstPad * pad, GstEvent * event)
         gst_uri_downloader_reset (demux->downloader);
         gst_dash_demux_resume_download_task (demux);
         gst_dash_demux_resume_stream_task (demux);
-        g_static_rec_mutex_unlock (&demux->stream_lock);
+        g_static_rec_mutex_unlock (&demux->stream_task_lock);
       }
 
       return TRUE;
@@ -632,6 +648,7 @@ gst_dash_demux_setup_all_streams (GstDashDemux * demux)
   GList *listLang = NULL;
   guint i, nb_audio;
   gchar *lang;
+  GSList *streams = NULL;
 
   GST_MPD_CLIENT_LOCK (demux->client);
   /* clean old active stream list, if any */
@@ -645,7 +662,7 @@ gst_dash_demux_setup_all_streams (GstDashDemux * demux)
       &listLang);
   if (nb_audio == 0)
     nb_audio = 1;
-  GST_INFO_OBJECT (demux, "Number of language is=%d", nb_audio);
+  GST_INFO_OBJECT (demux, "Number of languages is=%d", nb_audio);
 
   for (i = 0; i < nb_audio; i++) {
     lang = (gchar *) g_list_nth_data (listLang, i);
@@ -686,12 +703,12 @@ gst_dash_demux_setup_all_streams (GstDashDemux * demux)
         DOWNLOAD_RATE_HISTORY_MAX);
 
     GST_LOG_OBJECT (demux, "Creating stream %d %" GST_PTR_FORMAT, i, caps);
-    demux->streams = g_slist_prepend (demux->streams, stream);
+    streams = g_slist_prepend (streams, stream);
+    stream->pad = gst_dash_demux_create_pad (demux);
   }
-  demux->streams = g_slist_reverse (demux->streams);
-
-  gst_dash_demux_create_pads (demux);
+  streams = g_slist_reverse (streams);
 
+  demux->next_periods = g_slist_append (demux->next_periods, streams);
   GST_MPD_CLIENT_UNLOCK (demux->client);
 
   return TRUE;
@@ -759,6 +776,8 @@ gst_dash_demux_sink_event (GstPad * pad, GstEvent * event)
           !gst_dash_demux_setup_all_streams (demux))
         return FALSE;
 
+      gst_dash_demux_advance_period (demux);
+
       /* start playing from the first segment */
       gst_mpd_client_set_segment_index_for_all_streams (demux->client, 0);
 
@@ -845,6 +864,7 @@ gst_dash_demux_src_query (GstPad * pad, GstQuery * query)
         gst_query_set_seeking (query, fmt,
             !gst_mpd_client_is_live (dashdemux->client), 0, stop);
         ret = TRUE;
+
         GST_DEBUG_OBJECT (dashdemux, "GST_QUERY_SEEKING returning with stop : %"
             GST_TIME_FORMAT, GST_TIME_ARGS (stop));
       }
@@ -891,15 +911,15 @@ gst_dash_demux_stop (GstDashDemux * demux)
   if (GST_TASK_STATE (demux->download_task) != GST_TASK_STOPPED) {
     GST_TASK_SIGNAL (demux->download_task);
     gst_task_stop (demux->download_task);
-    g_static_rec_mutex_lock (&demux->download_lock);
-    g_static_rec_mutex_unlock (&demux->download_lock);
+    g_static_rec_mutex_lock (&demux->download_task_lock);
+    g_static_rec_mutex_unlock (&demux->download_task_lock);
     gst_task_join (demux->download_task);
   }
   if (GST_TASK_STATE (demux->stream_task) != GST_TASK_STOPPED) {
     GST_TASK_SIGNAL (demux->stream_task);
     gst_task_stop (demux->stream_task);
-    g_static_rec_mutex_lock (&demux->stream_lock);
-    g_static_rec_mutex_unlock (&demux->stream_lock);
+    g_static_rec_mutex_lock (&demux->stream_task_lock);
+    g_static_rec_mutex_unlock (&demux->stream_task_lock);
     gst_task_join (demux->stream_task);
   }
 
@@ -911,32 +931,89 @@ gst_dash_demux_stop (GstDashDemux * demux)
   }
 }
 
+static GstPad *
+gst_dash_demux_create_pad (GstDashDemux * demux)
+{
+  GstPad *pad;
+
+  /* Create and activate new pads */
+  pad = gst_pad_new_from_static_template (&srctemplate, NULL);
+  gst_pad_set_event_function (pad,
+      GST_DEBUG_FUNCPTR (gst_dash_demux_src_event));
+  gst_pad_set_query_function (pad,
+      GST_DEBUG_FUNCPTR (gst_dash_demux_src_query));
+  gst_pad_set_element_private (pad, demux);
+  gst_pad_set_active (pad, TRUE);
+  GST_INFO_OBJECT (demux, "Creating srcpad %s:%s", GST_DEBUG_PAD_NAME (pad));
+  return pad;
+}
+
 static void
-gst_dash_demux_create_pads (GstDashDemux * demux)
+gst_dash_demux_expose_streams (GstDashDemux * demux)
 {
   GSList *iter;
 
-  /* Create and activate new pads */
   for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
     GstDashDemuxStream *stream = iter->data;
 
+    GST_LOG_OBJECT (demux, "Exposing stream %d %" GST_PTR_FORMAT, stream->index,
+        stream->input_caps);
     g_assert (stream->pad == NULL);
-
-    stream->pad = gst_pad_new_from_static_template (&srctemplate, NULL);
-    gst_pad_set_event_function (stream->pad,
-        GST_DEBUG_FUNCPTR (gst_dash_demux_src_event));
-    gst_pad_set_query_function (stream->pad,
-        GST_DEBUG_FUNCPTR (gst_dash_demux_src_query));
-    gst_pad_set_element_private (stream->pad, demux);
-    gst_pad_set_active (stream->pad, TRUE);
-    GST_INFO_OBJECT (demux, "Adding srcpad %s:%s",
-        GST_DEBUG_PAD_NAME (stream->pad));
     gst_element_add_pad (GST_ELEMENT (demux), gst_object_ref (stream->pad));
   }
-
   gst_element_no_more_pads (GST_ELEMENT_CAST (demux));
 }
 
+static void
+gst_dash_demux_remove_streams (GstDashDemux * demux, GSList * streams)
+{
+  GSList *iter;
+  GstEvent *eos = gst_event_new_eos ();
+
+  for (iter = streams; iter; iter = g_slist_next (iter)) {
+    GstDashDemuxStream *stream = iter->data;;
+
+    GST_LOG_OBJECT (demux, "Removing stream %d %" GST_PTR_FORMAT, stream->index,
+        stream->input_caps);
+    g_assert (stream->pad == NULL);
+    gst_pad_push_event (stream->pad, gst_event_ref (eos));
+    gst_pad_set_active (stream->pad, FALSE);
+    gst_element_remove_pad (GST_ELEMENT (demux), stream->pad);
+    gst_dash_demux_stream_free (stream);
+  }
+  gst_event_unref (eos);
+  g_slist_free (streams);
+}
+
+static gboolean
+gst_dash_demux_advance_period (GstDashDemux * demux)
+{
+  GSList *old_period = NULL;
+  g_static_mutex_lock (&demux->streams_lock);
+
+  if (demux->streams) {
+    g_assert (demux->streams == demux->next_periods->data);
+
+    demux->next_periods = g_slist_remove (demux->next_periods, demux->streams);
+    old_period = demux->streams;
+    demux->streams = NULL;
+  }
+
+  if (demux->next_periods) {
+    demux->streams = demux->next_periods->data;
+  } else {
+    GST_DEBUG_OBJECT (demux, "No next periods, return FALSE");
+    g_static_mutex_unlock (&demux->streams_lock);
+    return FALSE;
+  }
+
+  gst_dash_demux_expose_streams (demux);
+  gst_dash_demux_remove_streams (demux, old_period);
+
+  g_static_mutex_unlock (&demux->streams_lock);
+  return TRUE;
+}
+
 /* gst_dash_demux_stream_loop:
  * 
  * Loop for the "stream' task that pushes fragments to the src pads.
@@ -962,7 +1039,6 @@ gst_dash_demux_stream_loop (GstDashDemux * demux)
 {
   GstFlowReturn ret;
   GstActiveStream *active_stream;
-  guint i = 0;
   GSList *iter;
   GstClockTime best_time;
   GstDashDemuxStream *selected_stream;
@@ -973,7 +1049,8 @@ gst_dash_demux_stream_loop (GstDashDemux * demux)
 
   best_time = GST_CLOCK_TIME_NONE;
   selected_stream = NULL;
-  for (iter = demux->streams, i = 0; iter; i++, iter = g_slist_next (iter)) {
+
+  for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
     GstDashDemuxStream *stream = iter->data;
     GstDataQueueItem *item;
 
@@ -1030,8 +1107,7 @@ gst_dash_demux_stream_loop (GstDashDemux * demux)
           selected_stream->index);
       if (demux->need_segment) {
         /* And send a newsegment */
-        for (iter = demux->streams, i = 0; iter;
-            i++, iter = g_slist_next (iter)) {
+        for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
           GstDashDemuxStream *stream = iter->data;
           gst_pad_push_event (stream->pad,
               gst_event_new_new_segment (FALSE, demux->segment.rate,
@@ -1043,9 +1119,10 @@ gst_dash_demux_stream_loop (GstDashDemux * demux)
 
       GST_DEBUG_OBJECT (demux,
           "Pushing fragment %p #%d (stream %i) ts:%" GST_TIME_FORMAT " dur:%"
-          GST_TIME_FORMAT, buffer, GST_BUFFER_OFFSET (buffer),
+          GST_TIME_FORMAT " at pad: %s:%s", buffer, GST_BUFFER_OFFSET (buffer),
           selected_stream->index, GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
-          GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)));
+          GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)),
+          GST_DEBUG_PAD_NAME (selected_stream->pad));
       ret = gst_pad_push (selected_stream->pad, gst_buffer_ref (buffer));
       gst_segment_set_last_stop (&demux->segment, GST_FORMAT_TIME,
           GST_BUFFER_TIMESTAMP (buffer));
@@ -1072,7 +1149,7 @@ gst_dash_demux_stream_loop (GstDashDemux * demux)
     if (eos) {
       goto end_of_manifest;
     } else if (eop) {
-      /* TODO advance to next period */
+      gst_dash_demux_advance_period (demux);
     }
   }
 
@@ -1294,7 +1371,7 @@ gst_dash_demux_download_loop (GstDashDemux * demux)
           gst_buffer_unref (buffer);
           stream = gst_mpdparser_get_active_stream_by_index (demux->client, 0);
           segment_index = gst_mpd_client_get_segment_index (stream);
-          /* setup video, audio and subtitle streams, starting from first Period */
+          /* setup video, audio and subtitle streams, starting from current Period */
           if (!gst_mpd_client_setup_media_presentation (demux->client) ||
               !gst_mpd_client_set_period_index (demux->client,
                   gst_mpd_client_get_period_index (demux->client))
@@ -1303,7 +1380,7 @@ gst_dash_demux_download_loop (GstDashDemux * demux)
                 "Error setting up the updated manifest file");
             goto end_of_manifest;
           }
-          /* continue playing from the the next segment */
+          /* continue playing from the next segment */
           /* FIXME: support multiple streams with different segment duration */
           gst_mpd_client_set_segment_index_for_all_streams (demux->client,
               segment_index);
@@ -1635,8 +1712,15 @@ gst_dash_demux_get_next_fragment (GstDashDemux * demux)
   gboolean end_of_period = TRUE;
   GstDashDemuxStream *selected_stream = NULL;
   GstClockTime best_time = GST_CLOCK_TIME_NONE;
+  GSList *streams;
 
-  for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
+  g_static_mutex_lock (&demux->streams_lock);
+  /* TODO add check */
+  streams = g_slist_last (demux->next_periods)->data;
+
+  g_static_mutex_unlock (&demux->streams_lock);
+
+  for (iter = streams; iter; iter = g_slist_next (iter)) {
     GstDashDemuxStream *stream = iter->data;
     GstClockTime ts;
 
@@ -1658,6 +1742,8 @@ gst_dash_demux_get_next_fragment (GstDashDemux * demux)
       if (gst_mpd_client_has_next_period (demux->client)) {
         event = gst_event_new_dash_eop ();
       } else {
+        GST_DEBUG_OBJECT (demux,
+            "No more fragments or periods for this stream, setting EOS");
         event = gst_event_new_eos ();
       }
       stream->download_end_of_period = TRUE;
index 2c8257e..fb2c790 100644 (file)
@@ -107,6 +107,8 @@ struct _GstDashDemux
   GstPad *sinkpad;
 
   GSList *streams;
+  GSList *next_periods;
+  GStaticMutex streams_lock;
 
   GstSegment segment;
   gboolean need_segment;
@@ -124,12 +126,12 @@ struct _GstDashDemux
 
   /* Streaming task */
   GstTask *stream_task;
-  GStaticRecMutex stream_lock;
+  GStaticRecMutex stream_task_lock;
   GMutex *stream_timed_lock;
 
   /* Download task */
   GstTask *download_task;
-  GStaticRecMutex download_lock;
+  GStaticRecMutex download_task_lock;
   gboolean cancelled;
   GMutex *download_timed_lock;