From 27b1abbda3db0fb6072947e889ecf9be8bc83aa7 Mon Sep 17 00:00:00 2001 From: Thiago Santos Date: Mon, 28 Jan 2013 12:28:29 -0300 Subject: [PATCH] dashdemux: move the buffers queues to the streams Store the buffers separately for each stream, this is clearer than having a queue with a list of buffers. It also allows easier selection of buffers to push in later refactors --- ext/dash/gstdashdemux.c | 127 ++++++++++++++++++++++++++++++++---------------- ext/dash/gstdashdemux.h | 3 +- 2 files changed, 88 insertions(+), 42 deletions(-) diff --git a/ext/dash/gstdashdemux.c b/ext/dash/gstdashdemux.c index ca091e8..dcc47dc 100644 --- a/ext/dash/gstdashdemux.c +++ b/ext/dash/gstdashdemux.c @@ -219,6 +219,8 @@ static float gst_dash_demux_get_buffering_ratio (GstDashDemux * demux); static GstBuffer *gst_dash_demux_merge_buffer_list (GstFragment * fragment); static GstCaps *gst_dash_demux_get_input_caps (GstDashDemux * demux, GstActiveStream * stream); +static GstClockTime gst_dash_demux_stream_get_buffering_time (GstDashDemuxStream + * stream); static void _do_init (GType type) @@ -282,8 +284,6 @@ gst_dash_demux_dispose (GObject * obj) gst_dash_demux_reset (demux, TRUE); - g_queue_free (demux->queue); - G_OBJECT_CLASS (parent_class)->dispose (obj); } @@ -349,7 +349,6 @@ gst_dash_demux_init (GstDashDemux * demux, GstDashDemuxClass * klass) demux->bandwidth_usage = DEFAULT_BANDWIDTH_USAGE; demux->max_bitrate = DEFAULT_MAX_BITRATE; - demux->queue = g_queue_new (); /* Updates task */ g_static_rec_mutex_init (&demux->download_lock); demux->download_task = @@ -457,20 +456,33 @@ gst_dash_demux_change_state (GstElement * element, GstStateChange transition) return ret; } +static gboolean +gst_dash_demux_all_queues_have_data (GstDashDemux * demux) +{ + GSList *iter; + + for (iter = demux->streams; iter; iter = g_slist_next (iter)) { + GstDashDemuxStream *stream = iter->data; + if (g_queue_is_empty (stream->queue)) { + return FALSE; + } + } + return TRUE; +} + static void -gst_dash_demux_clear_queue (GstDashDemux * demux) +gst_dash_demux_clear_queues (GstDashDemux * demux) { - while (!g_queue_is_empty (demux->queue)) { - GList *listfragment = g_queue_pop_head (demux->queue); - guint j = 0; - while (j < g_list_length (listfragment)) { - GstFragment *fragment = g_list_nth_data (listfragment, j); + GSList *iter; + + for (iter = demux->streams; iter; iter = g_slist_next (iter)) { + GstDashDemuxStream *stream = iter->data; + while (!g_queue_is_empty (stream->queue)) { + GstFragment *fragment = g_queue_pop_head (stream->queue); g_object_unref (fragment); - j++; } - g_list_free (listfragment); + g_queue_clear (stream->queue); } - g_queue_clear (demux->queue); } static gboolean @@ -587,7 +599,7 @@ gst_dash_demux_src_event (GstPad * pad, GstEvent * event) /* Clear the buffering queue */ /* FIXME: allow seeking in the buffering queue */ - gst_dash_demux_clear_queue (demux); + gst_dash_demux_clear_queues (demux); //GST_MPD_CLIENT_LOCK (demux->client); GST_DEBUG_OBJECT (demux, "Seeking to sequence %d", current_sequence); @@ -674,6 +686,7 @@ gst_dash_demux_setup_all_streams (GstDashDemux * demux) stream = g_new0 (GstDashDemuxStream, 1); caps = gst_dash_demux_get_input_caps (demux, active_stream); + stream->queue = g_queue_new (); stream->index = i; stream->input_caps = caps; @@ -900,7 +913,7 @@ switch_pads (GstDashDemux * demux, guint nb_adaptation_set) if (oldpad) { oldpads = g_slist_prepend (oldpads, oldpad); GST_DEBUG_OBJECT (demux, - "Switching pads (oldpad:%p) %" GST_PTR_FORMAT, oldpad); + "Switching pads (oldpad:%p) %" GST_PTR_FORMAT, oldpad, oldpad); } } /* Create and activate new pads */ @@ -914,7 +927,7 @@ switch_pads (GstDashDemux * demux, guint nb_adaptation_set) gst_pad_set_element_private (stream->pad, demux); gst_pad_set_active (stream->pad, TRUE); gst_pad_set_caps (stream->pad, stream->output_caps); - gst_element_add_pad (GST_ELEMENT (demux), stream->pad); + gst_element_add_pad (GST_ELEMENT (demux), gst_object_ref (stream->pad)); GST_INFO_OBJECT (demux, "Adding srcpad %s:%s with caps %" GST_PTR_FORMAT, GST_DEBUG_PAD_NAME (stream->pad), stream->output_caps); } @@ -928,6 +941,7 @@ switch_pads (GstDashDemux * demux, guint nb_adaptation_set) gst_pad_push_event (pad, gst_event_new_eos ()); gst_pad_set_active (pad, FALSE); gst_element_remove_pad (GST_ELEMENT (demux), pad); + gst_object_unref (pad); } g_slist_free (oldpads); } @@ -948,7 +962,7 @@ switch_pads (GstDashDemux * demux, guint nb_adaptation_set) * */ static gboolean -needs_pad_switch (GstDashDemux * demux, GList * fragment) +needs_pad_switch (GstDashDemux * demux) { gboolean switch_pad = FALSE; guint i = 0; @@ -956,7 +970,7 @@ needs_pad_switch (GstDashDemux * demux, GList * fragment) for (iter = demux->streams; iter; iter = g_slist_next (iter)) { GstDashDemuxStream *stream = iter->data; - GstFragment *newFragment = g_list_nth_data (fragment, i); + GstFragment *newFragment = g_queue_peek_head (stream->queue); GstCaps *srccaps = NULL; if (newFragment == NULL) { @@ -1002,7 +1016,6 @@ needs_pad_switch (GstDashDemux * demux, GList * fragment) static void gst_dash_demux_stream_loop (GstDashDemux * demux) { - GList *listfragment; GstFlowReturn ret; GstBufferList *buffer_list; guint nb_adaptation_set = 0; @@ -1011,10 +1024,13 @@ gst_dash_demux_stream_loop (GstDashDemux * demux) guint i = 0; GSList *iter; - if (g_queue_is_empty (demux->queue)) { + GST_LOG_OBJECT (demux, "Starting stream loop"); + + if (!gst_dash_demux_all_queues_have_data (demux)) { if (demux->end_of_manifest) goto end_of_manifest; + GST_DEBUG_OBJECT (demux, "Ending stream loop, no buffers to push"); return; } @@ -1031,19 +1047,22 @@ gst_dash_demux_stream_loop (GstDashDemux * demux) 100 * gst_dash_demux_get_buffering_ratio (demux))); } } - listfragment = g_queue_pop_head (demux->queue); - nb_adaptation_set = g_list_length (listfragment); /* Figure out if we need to create/switch pads */ - switch_pad = needs_pad_switch (demux, listfragment); + switch_pad = needs_pad_switch (demux); if (switch_pad) { GST_WARNING ("Switching pads"); switch_pads (demux, nb_adaptation_set); demux->need_segment = TRUE; } + for (iter = demux->streams, i = 0; iter; i++, iter = g_slist_next (iter)) { GstDashDemuxStream *stream = iter->data; - GstFragment *fragment = g_list_nth_data (listfragment, i); + GstFragment *fragment = g_queue_pop_head (stream->queue); + + if (!fragment) + continue; + active_stream = gst_mpdparser_get_active_stream_by_index (demux->client, i); if (demux->need_segment) { GstClockTime start = fragment->start_time + demux->position_shift; @@ -1065,7 +1084,6 @@ gst_dash_demux_stream_loop (GstDashDemux * demux) } demux->need_segment = FALSE; demux->position_shift = 0; - g_list_free (listfragment); return; @@ -1093,6 +1111,22 @@ error_pushing: } static void +gst_dash_demux_stream_free (GstDashDemuxStream * stream) +{ + if (stream->input_caps) + gst_caps_unref (stream->input_caps); + if (stream->output_caps) + gst_caps_unref (stream->output_caps); + if (stream->pad) + gst_object_unref (stream->pad); + + /* TODO flush the queue */ + g_queue_free (stream->queue); + + g_free (stream); +} + +static void gst_dash_demux_reset (GstDashDemux * demux, gboolean dispose) { GSList *iter; @@ -1101,13 +1135,14 @@ gst_dash_demux_reset (GstDashDemux * demux, gboolean dispose) demux->end_of_manifest = FALSE; demux->cancelled = FALSE; + gst_dash_demux_clear_queues (demux); + for (iter = demux->streams; iter; iter = g_slist_next (iter)) { GstDashDemuxStream *stream = iter->data; - if (stream->input_caps) { - gst_caps_unref (stream->input_caps); - stream->input_caps = NULL; - } + gst_dash_demux_stream_free (stream); } + g_slist_free (demux->streams); + demux->streams = NULL; if (demux->manifest) { gst_buffer_unref (demux->manifest); @@ -1121,8 +1156,6 @@ gst_dash_demux_reset (GstDashDemux * demux, gboolean dispose) demux->client = gst_mpd_client_new (); } - gst_dash_demux_clear_queue (demux); - demux->last_manifest_update = GST_CLOCK_TIME_NONE; demux->position = 0; demux->position_shift = 0; @@ -1134,18 +1167,31 @@ static GstClockTime gst_dash_demux_get_buffering_time (GstDashDemux * demux) { GstClockTime buffer_time = 0; - GList *listfragment; - GstFragment *first_fragment, *last_fragment; + GSList *iter; - if (g_queue_is_empty (demux->queue)) - return 0; + for (iter = demux->streams; iter; iter = g_slist_next (iter)) { + buffer_time = gst_dash_demux_stream_get_buffering_time (iter->data); + + if (buffer_time) + return buffer_time; + } + + return 0; +} + +static GstClockTime +gst_dash_demux_stream_get_buffering_time (GstDashDemuxStream * stream) +{ + GstFragment *first_fragment, *last_fragment; + GstClockTime buffer_time = 0; /* get first fragment */ - listfragment = g_queue_peek_head (demux->queue); - first_fragment = listfragment->data; + first_fragment = g_queue_peek_head (stream->queue); /* get last fragment */ - listfragment = g_queue_peek_tail (demux->queue); - last_fragment = listfragment->data; + last_fragment = g_queue_peek_tail (stream->queue); + + if (!first_fragment && !last_fragment) + return 0; if (first_fragment && last_fragment) { buffer_time = last_fragment->stop_time - first_fragment->start_time; @@ -1746,11 +1792,10 @@ gst_dash_demux_get_next_fragment_set (GstDashDemux * demux) } gst_fragment_set_caps (download, stream->input_caps); - fragment_set = g_list_append (fragment_set, download); + g_queue_push_tail (stream->queue, download); size_buffer += gst_fragment_get_buffer_size (download); } - /* Push fragment set into the queue */ - g_queue_push_tail (demux->queue, fragment_set); + /* Wake the download task up */ GST_TASK_SIGNAL (demux->download_task); g_get_current_time (&now); diff --git a/ext/dash/gstdashdemux.h b/ext/dash/gstdashdemux.h index 0e4d07e..3fd8a3f 100644 --- a/ext/dash/gstdashdemux.h +++ b/ext/dash/gstdashdemux.h @@ -63,6 +63,8 @@ struct _GstDashDemuxStream GstCaps *output_caps; GstCaps *input_caps; + + GQueue *queue; }; /** @@ -80,7 +82,6 @@ struct _GstDashDemux GstBuffer *manifest; GstUriDownloader *downloader; GstMpdClient *client; /* MPD client */ - GQueue *queue; /* Video/Audio/Application List of fragment storing the fetched fragments */ gboolean end_of_period; gboolean end_of_manifest; -- 2.7.4