dashdemux: move the buffers queues to the streams
authorThiago Santos <thiago.sousa.santos@collabora.com>
Mon, 28 Jan 2013 15:28:29 +0000 (12:28 -0300)
committerThiago Santos <thiago.sousa.santos@collabora.com>
Wed, 8 May 2013 21:14:34 +0000 (18:14 -0300)
Store the buffers separately for each stream, this is clearer than
having a queue with a list of buffers. It also allows easier selection
of buffers to push in later refactors

ext/dash/gstdashdemux.c
ext/dash/gstdashdemux.h

index ca091e8..dcc47dc 100644 (file)
@@ -219,6 +219,8 @@ static float gst_dash_demux_get_buffering_ratio (GstDashDemux * demux);
 static GstBuffer *gst_dash_demux_merge_buffer_list (GstFragment * fragment);
 static GstCaps *gst_dash_demux_get_input_caps (GstDashDemux * demux,
     GstActiveStream * stream);
+static GstClockTime gst_dash_demux_stream_get_buffering_time (GstDashDemuxStream
+    * stream);
 
 static void
 _do_init (GType type)
@@ -282,8 +284,6 @@ gst_dash_demux_dispose (GObject * obj)
 
   gst_dash_demux_reset (demux, TRUE);
 
-  g_queue_free (demux->queue);
-
   G_OBJECT_CLASS (parent_class)->dispose (obj);
 }
 
@@ -349,7 +349,6 @@ gst_dash_demux_init (GstDashDemux * demux, GstDashDemuxClass * klass)
   demux->bandwidth_usage = DEFAULT_BANDWIDTH_USAGE;
   demux->max_bitrate = DEFAULT_MAX_BITRATE;
 
-  demux->queue = g_queue_new ();
   /* Updates task */
   g_static_rec_mutex_init (&demux->download_lock);
   demux->download_task =
@@ -457,20 +456,33 @@ gst_dash_demux_change_state (GstElement * element, GstStateChange transition)
   return ret;
 }
 
+static gboolean
+gst_dash_demux_all_queues_have_data (GstDashDemux * demux)
+{
+  GSList *iter;
+
+  for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
+    GstDashDemuxStream *stream = iter->data;
+    if (g_queue_is_empty (stream->queue)) {
+      return FALSE;
+    }
+  }
+  return TRUE;
+}
+
 static void
-gst_dash_demux_clear_queue (GstDashDemux * demux)
+gst_dash_demux_clear_queues (GstDashDemux * demux)
 {
-  while (!g_queue_is_empty (demux->queue)) {
-    GList *listfragment = g_queue_pop_head (demux->queue);
-    guint j = 0;
-    while (j < g_list_length (listfragment)) {
-      GstFragment *fragment = g_list_nth_data (listfragment, j);
+  GSList *iter;
+
+  for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
+    GstDashDemuxStream *stream = iter->data;
+    while (!g_queue_is_empty (stream->queue)) {
+      GstFragment *fragment = g_queue_pop_head (stream->queue);
       g_object_unref (fragment);
-      j++;
     }
-    g_list_free (listfragment);
+    g_queue_clear (stream->queue);
   }
-  g_queue_clear (demux->queue);
 }
 
 static gboolean
@@ -587,7 +599,7 @@ gst_dash_demux_src_event (GstPad * pad, GstEvent * event)
 
       /* Clear the buffering queue */
       /* FIXME: allow seeking in the buffering queue */
-      gst_dash_demux_clear_queue (demux);
+      gst_dash_demux_clear_queues (demux);
 
       //GST_MPD_CLIENT_LOCK (demux->client);
       GST_DEBUG_OBJECT (demux, "Seeking to sequence %d", current_sequence);
@@ -674,6 +686,7 @@ gst_dash_demux_setup_all_streams (GstDashDemux * demux)
 
     stream = g_new0 (GstDashDemuxStream, 1);
     caps = gst_dash_demux_get_input_caps (demux, active_stream);
+    stream->queue = g_queue_new ();
 
     stream->index = i;
     stream->input_caps = caps;
@@ -900,7 +913,7 @@ switch_pads (GstDashDemux * demux, guint nb_adaptation_set)
     if (oldpad) {
       oldpads = g_slist_prepend (oldpads, oldpad);
       GST_DEBUG_OBJECT (demux,
-          "Switching pads (oldpad:%p) %" GST_PTR_FORMAT, oldpad);
+          "Switching pads (oldpad:%p) %" GST_PTR_FORMAT, oldpad, oldpad);
     }
   }
   /* Create and activate new pads */
@@ -914,7 +927,7 @@ switch_pads (GstDashDemux * demux, guint nb_adaptation_set)
     gst_pad_set_element_private (stream->pad, demux);
     gst_pad_set_active (stream->pad, TRUE);
     gst_pad_set_caps (stream->pad, stream->output_caps);
-    gst_element_add_pad (GST_ELEMENT (demux), stream->pad);
+    gst_element_add_pad (GST_ELEMENT (demux), gst_object_ref (stream->pad));
     GST_INFO_OBJECT (demux, "Adding srcpad %s:%s with caps %" GST_PTR_FORMAT,
         GST_DEBUG_PAD_NAME (stream->pad), stream->output_caps);
   }
@@ -928,6 +941,7 @@ switch_pads (GstDashDemux * demux, guint nb_adaptation_set)
     gst_pad_push_event (pad, gst_event_new_eos ());
     gst_pad_set_active (pad, FALSE);
     gst_element_remove_pad (GST_ELEMENT (demux), pad);
+    gst_object_unref (pad);
   }
   g_slist_free (oldpads);
 }
@@ -948,7 +962,7 @@ switch_pads (GstDashDemux * demux, guint nb_adaptation_set)
  * 
  */
 static gboolean
-needs_pad_switch (GstDashDemux * demux, GList * fragment)
+needs_pad_switch (GstDashDemux * demux)
 {
   gboolean switch_pad = FALSE;
   guint i = 0;
@@ -956,7 +970,7 @@ needs_pad_switch (GstDashDemux * demux, GList * fragment)
 
   for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
     GstDashDemuxStream *stream = iter->data;
-    GstFragment *newFragment = g_list_nth_data (fragment, i);
+    GstFragment *newFragment = g_queue_peek_head (stream->queue);
     GstCaps *srccaps = NULL;
 
     if (newFragment == NULL) {
@@ -1002,7 +1016,6 @@ needs_pad_switch (GstDashDemux * demux, GList * fragment)
 static void
 gst_dash_demux_stream_loop (GstDashDemux * demux)
 {
-  GList *listfragment;
   GstFlowReturn ret;
   GstBufferList *buffer_list;
   guint nb_adaptation_set = 0;
@@ -1011,10 +1024,13 @@ gst_dash_demux_stream_loop (GstDashDemux * demux)
   guint i = 0;
   GSList *iter;
 
-  if (g_queue_is_empty (demux->queue)) {
+  GST_LOG_OBJECT (demux, "Starting stream loop");
+
+  if (!gst_dash_demux_all_queues_have_data (demux)) {
     if (demux->end_of_manifest)
       goto end_of_manifest;
 
+    GST_DEBUG_OBJECT (demux, "Ending stream loop, no buffers to push");
     return;
   }
 
@@ -1031,19 +1047,22 @@ gst_dash_demux_stream_loop (GstDashDemux * demux)
               100 * gst_dash_demux_get_buffering_ratio (demux)));
     }
   }
-  listfragment = g_queue_pop_head (demux->queue);
-  nb_adaptation_set = g_list_length (listfragment);
 
   /* Figure out if we need to create/switch pads */
-  switch_pad = needs_pad_switch (demux, listfragment);
+  switch_pad = needs_pad_switch (demux);
   if (switch_pad) {
     GST_WARNING ("Switching pads");
     switch_pads (demux, nb_adaptation_set);
     demux->need_segment = TRUE;
   }
+
   for (iter = demux->streams, i = 0; iter; i++, iter = g_slist_next (iter)) {
     GstDashDemuxStream *stream = iter->data;
-    GstFragment *fragment = g_list_nth_data (listfragment, i);
+    GstFragment *fragment = g_queue_pop_head (stream->queue);
+
+    if (!fragment)
+      continue;
+
     active_stream = gst_mpdparser_get_active_stream_by_index (demux->client, i);
     if (demux->need_segment) {
       GstClockTime start = fragment->start_time + demux->position_shift;
@@ -1065,7 +1084,6 @@ gst_dash_demux_stream_loop (GstDashDemux * demux)
   }
   demux->need_segment = FALSE;
   demux->position_shift = 0;
-  g_list_free (listfragment);
 
   return;
 
@@ -1093,6 +1111,22 @@ error_pushing:
 }
 
 static void
+gst_dash_demux_stream_free (GstDashDemuxStream * stream)
+{
+  if (stream->input_caps)
+    gst_caps_unref (stream->input_caps);
+  if (stream->output_caps)
+    gst_caps_unref (stream->output_caps);
+  if (stream->pad)
+    gst_object_unref (stream->pad);
+
+  /* TODO flush the queue */
+  g_queue_free (stream->queue);
+
+  g_free (stream);
+}
+
+static void
 gst_dash_demux_reset (GstDashDemux * demux, gboolean dispose)
 {
   GSList *iter;
@@ -1101,13 +1135,14 @@ gst_dash_demux_reset (GstDashDemux * demux, gboolean dispose)
   demux->end_of_manifest = FALSE;
   demux->cancelled = FALSE;
 
+  gst_dash_demux_clear_queues (demux);
+
   for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
     GstDashDemuxStream *stream = iter->data;
-    if (stream->input_caps) {
-      gst_caps_unref (stream->input_caps);
-      stream->input_caps = NULL;
-    }
+    gst_dash_demux_stream_free (stream);
   }
+  g_slist_free (demux->streams);
+  demux->streams = NULL;
 
   if (demux->manifest) {
     gst_buffer_unref (demux->manifest);
@@ -1121,8 +1156,6 @@ gst_dash_demux_reset (GstDashDemux * demux, gboolean dispose)
     demux->client = gst_mpd_client_new ();
   }
 
-  gst_dash_demux_clear_queue (demux);
-
   demux->last_manifest_update = GST_CLOCK_TIME_NONE;
   demux->position = 0;
   demux->position_shift = 0;
@@ -1134,18 +1167,31 @@ static GstClockTime
 gst_dash_demux_get_buffering_time (GstDashDemux * demux)
 {
   GstClockTime buffer_time = 0;
-  GList *listfragment;
-  GstFragment *first_fragment, *last_fragment;
+  GSList *iter;
 
-  if (g_queue_is_empty (demux->queue))
-    return 0;
+  for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
+    buffer_time = gst_dash_demux_stream_get_buffering_time (iter->data);
+
+    if (buffer_time)
+      return buffer_time;
+  }
+
+  return 0;
+}
+
+static GstClockTime
+gst_dash_demux_stream_get_buffering_time (GstDashDemuxStream * stream)
+{
+  GstFragment *first_fragment, *last_fragment;
+  GstClockTime buffer_time = 0;
 
   /* get first fragment */
-  listfragment = g_queue_peek_head (demux->queue);
-  first_fragment = listfragment->data;
+  first_fragment = g_queue_peek_head (stream->queue);
   /* get last fragment */
-  listfragment = g_queue_peek_tail (demux->queue);
-  last_fragment = listfragment->data;
+  last_fragment = g_queue_peek_tail (stream->queue);
+
+  if (!first_fragment && !last_fragment)
+    return 0;
 
   if (first_fragment && last_fragment) {
     buffer_time = last_fragment->stop_time - first_fragment->start_time;
@@ -1746,11 +1792,10 @@ gst_dash_demux_get_next_fragment_set (GstDashDemux * demux)
     }
 
     gst_fragment_set_caps (download, stream->input_caps);
-    fragment_set = g_list_append (fragment_set, download);
+    g_queue_push_tail (stream->queue, download);
     size_buffer += gst_fragment_get_buffer_size (download);
   }
-  /* Push fragment set into the queue */
-  g_queue_push_tail (demux->queue, fragment_set);
+
   /* Wake the download task up */
   GST_TASK_SIGNAL (demux->download_task);
   g_get_current_time (&now);
index 0e4d07e..3fd8a3f 100644 (file)
@@ -63,6 +63,8 @@ struct _GstDashDemuxStream
 
   GstCaps *output_caps;
   GstCaps *input_caps;
+
+  GQueue *queue;
 };
 
 /**
@@ -80,7 +82,6 @@ struct _GstDashDemux
   GstBuffer *manifest;
   GstUriDownloader *downloader;
   GstMpdClient *client;         /* MPD client */
-  GQueue *queue;                /* Video/Audio/Application List of fragment storing the fetched fragments */
   gboolean end_of_period;
   gboolean end_of_manifest;