From 31e4ba0094a1fd516fbaa20ec86a543e337d249b Mon Sep 17 00:00:00 2001 From: Thiago Santos Date: Tue, 3 Dec 2013 16:16:09 -0300 Subject: [PATCH] dashdemux: Use 1 download task per stream Instead of having a single download task for all streams, this commit makes each stream have its own download loop, allowing parallel download of fragments. --- ext/dash/gstdashdemux.c | 383 ++++++++++++++++++++++-------------------------- ext/dash/gstdashdemux.h | 4 +- 2 files changed, 179 insertions(+), 208 deletions(-) diff --git a/ext/dash/gstdashdemux.c b/ext/dash/gstdashdemux.c index 2c79600..931a9f1 100644 --- a/ext/dash/gstdashdemux.c +++ b/ext/dash/gstdashdemux.c @@ -181,6 +181,9 @@ enum #define DEFAULT_FAILED_COUNT 3 #define DOWNLOAD_RATE_HISTORY_MAX 3 +#define GST_DASH_DEMUX_DOWNLOAD_LOCK(d) g_mutex_lock (&d->download_mutex) +#define GST_DASH_DEMUX_DOWNLOAD_UNLOCK(d) g_mutex_unlock (&d->download_mutex) + /* Custom internal event to signal end of period */ #define GST_EVENT_DASH_EOP GST_EVENT_MAKE_TYPE(81, GST_EVENT_TYPE_DOWNSTREAM | GST_EVENT_TYPE_SERIALIZED) static GstEvent * @@ -210,14 +213,17 @@ static gboolean gst_dash_demux_src_event (GstPad * pad, GstObject * parent, static gboolean gst_dash_demux_src_query (GstPad * pad, GstObject * parent, GstQuery * query); static void gst_dash_demux_stream_loop (GstDashDemux * demux); -static void gst_dash_demux_download_loop (GstDashDemux * demux); +static void gst_dash_demux_stream_download_loop (GstDashDemuxStream * stream); static void gst_dash_demux_stop (GstDashDemux * demux); static void gst_dash_demux_resume_stream_task (GstDashDemux * demux); static void gst_dash_demux_resume_download_task (GstDashDemux * demux); static gboolean gst_dash_demux_setup_all_streams (GstDashDemux * demux); -static gboolean gst_dash_demux_select_representations (GstDashDemux * demux); -static gboolean gst_dash_demux_get_next_fragment (GstDashDemux * demux, - GstActiveStream ** stream, GstClockTime * next_ts); +static gboolean +gst_dash_demux_stream_select_representation_unlocked (GstDashDemuxStream * + stream); +static GstFlowReturn gst_dash_demux_get_next_fragment (GstDashDemux * demux, + GstDashDemuxStream * stream, GstActiveStream ** active_stream, + GstClockTime * next_ts); static gboolean gst_dash_demux_advance_period (GstDashDemux * demux); static void gst_dash_demux_download_wait (GstDashDemux * demux, GstClockTime time_diff); @@ -255,11 +261,6 @@ gst_dash_demux_dispose (GObject * obj) demux->stream_task = NULL; } - if (demux->download_task) { - gst_object_unref (demux->download_task); - g_rec_mutex_clear (&demux->download_task_lock); - demux->download_task = NULL; - } g_cond_clear (&demux->download_cond); g_mutex_clear (&demux->download_mutex); @@ -345,11 +346,6 @@ gst_dash_demux_init (GstDashDemux * demux) demux->max_bitrate = DEFAULT_MAX_BITRATE; /* Updates task */ - g_rec_mutex_init (&demux->download_task_lock); - demux->download_task = - gst_task_new ((GstTaskFunction) gst_dash_demux_download_loop, demux, - NULL); - gst_task_set_lock (demux->download_task, &demux->download_task_lock); g_cond_init (&demux->download_cond); g_mutex_init (&demux->download_mutex); @@ -696,11 +692,18 @@ gst_dash_demux_setup_all_streams (GstDashDemux * demux) continue; stream = g_new0 (GstDashDemuxStream, 1); + stream->demux = demux; caps = gst_dash_demux_get_input_caps (demux, active_stream); stream->queue = gst_data_queue_new ((GstDataQueueCheckFullFunction) _check_queue_full, NULL, NULL, demux); + g_rec_mutex_init (&stream->download_task_lock); + stream->download_task = + gst_task_new ((GstTaskFunction) gst_dash_demux_stream_download_loop, + stream, NULL); + gst_task_set_lock (stream->download_task, &stream->download_task_lock); + stream->index = i; stream->input_caps = caps; stream->need_header = TRUE; @@ -1014,18 +1017,18 @@ gst_dash_demux_stop (GstDashDemux * demux) GstDashDemuxStream *stream = iter->data; gst_data_queue_set_flushing (stream->queue, TRUE); + if (GST_TASK_STATE (stream->download_task) != GST_TASK_STOPPED) { + GST_TASK_SIGNAL (stream->download_task); + gst_task_stop (stream->download_task); + g_mutex_lock (&demux->download_mutex); + g_cond_signal (&demux->download_cond); + g_mutex_unlock (&demux->download_mutex); + g_rec_mutex_lock (&stream->download_task_lock); + g_rec_mutex_unlock (&stream->download_task_lock); + gst_task_join (stream->download_task); + } } - if (GST_TASK_STATE (demux->download_task) != GST_TASK_STOPPED) { - GST_TASK_SIGNAL (demux->download_task); - gst_task_stop (demux->download_task); - g_mutex_lock (&demux->download_mutex); - g_cond_signal (&demux->download_cond); - g_mutex_unlock (&demux->download_mutex); - g_rec_mutex_lock (&demux->download_task_lock); - g_rec_mutex_unlock (&demux->download_task_lock); - gst_task_join (demux->download_task); - } if (GST_TASK_STATE (demux->stream_task) != GST_TASK_STOPPED) { GST_TASK_SIGNAL (demux->stream_task); gst_task_stop (demux->stream_task); @@ -1309,12 +1312,7 @@ flushing: end_of_manifest: { - GST_INFO_OBJECT (demux, "Reached end of manifest, sending EOS"); - for (iter = demux->streams; iter; iter = g_slist_next (iter)) { - GstDashDemuxStream *stream = iter->data; - gst_pad_push_event (stream->pad, gst_event_new_eos ()); - } - GST_INFO_OBJECT (demux, "Stopped streaming task"); + GST_INFO_OBJECT (demux, "Reached end of manifest, stopping streaming task"); gst_task_stop (demux->stream_task); return; } @@ -1346,6 +1344,12 @@ gst_dash_demux_stream_free (GstDashDemuxStream * stream) g_object_unref (stream->queue); stream->queue = NULL; } + if (stream->download_task) { + gst_task_stop (stream->download_task); + gst_task_join (stream->download_task); + gst_object_unref (stream->download_task); + g_rec_mutex_clear (&stream->download_task_lock); + } g_free (stream); } @@ -1437,21 +1441,6 @@ gst_dash_demux_stream_get_buffering_time (GstDashDemuxStream * stream) } #endif -static gboolean -gst_dash_demux_all_streams_have_data (GstDashDemux * demux) -{ - GSList *iter; - - for (iter = demux->streams; iter; iter = g_slist_next (iter)) { - GstDashDemuxStream *stream = iter->data; - - if (!stream->has_data_queued) - return FALSE; - } - - return TRUE; -} - static GstFlowReturn gst_dash_demux_refresh_mpd (GstDashDemux * demux) { @@ -1605,7 +1594,7 @@ gst_dash_demux_refresh_mpd (GstDashDemux * demux) return GST_FLOW_OK; } -/* gst_dash_demux_download_loop: +/* gst_dash_demux_stream_download_loop: * * Loop for the "download' task that fetches fragments based on the * selected representations. @@ -1633,18 +1622,23 @@ gst_dash_demux_refresh_mpd (GstDashDemux * demux) * manifest has been reached. * */ -void -gst_dash_demux_download_loop (GstDashDemux * demux) +static void +gst_dash_demux_stream_download_loop (GstDashDemuxStream * stream) { GstClockTime fragment_ts = GST_CLOCK_TIME_NONE; GstActiveStream *fragment_stream = NULL; + GstDashDemux *demux = stream->demux; + GstFlowReturn flow_ret = GST_FLOW_OK; - GST_LOG_OBJECT (demux, "Starting download loop"); + GST_LOG_OBJECT (demux, "Starting download loop %p %s:%s", stream, + GST_DEBUG_PAD_NAME (stream->pad)); + GST_DASH_DEMUX_DOWNLOAD_LOCK (demux); if (gst_mpd_client_is_live (demux->client) && demux->client->mpd_uri != NULL) { switch (gst_dash_demux_refresh_mpd (demux)) { case GST_FLOW_EOS: + GST_DASH_DEMUX_DOWNLOAD_UNLOCK (demux); goto end_of_manifest; default: break; @@ -1654,31 +1648,34 @@ gst_dash_demux_download_loop (GstDashDemux * demux) GST_DEBUG_OBJECT (demux, "download loop %i", demux->end_of_manifest); /* try to switch to another set of representations if needed */ - if (gst_dash_demux_all_streams_have_data (demux)) { - gst_dash_demux_select_representations (demux); - } + gst_dash_demux_stream_select_representation_unlocked (stream); + GST_DASH_DEMUX_DOWNLOAD_UNLOCK (demux); /* fetch the next fragment */ - while (!gst_dash_demux_get_next_fragment (demux, &fragment_stream, - &fragment_ts)) { - if (demux->end_of_period) { - GST_INFO_OBJECT (demux, "Reached the end of the Period"); - /* setup video, audio and subtitle streams, starting from the next Period */ - if (!gst_mpd_client_set_period_index (demux->client, - gst_mpd_client_get_period_index (demux->client) + 1) - || !gst_dash_demux_setup_all_streams (demux)) { - GST_INFO_OBJECT (demux, "Reached the end of the manifest file"); - demux->end_of_manifest = TRUE; - gst_task_start (demux->stream_task); - goto end_of_manifest; - } - /* start playing from the first segment of the new period */ - gst_mpd_client_set_segment_index_for_all_streams (demux->client, 0); - demux->end_of_period = FALSE; + flow_ret = gst_dash_demux_get_next_fragment (demux, stream, &fragment_stream, + &fragment_ts); - } else if (demux->cancelled) { - goto cancelled; - } else { + switch (flow_ret) { + case GST_FLOW_OK: + break; + case GST_FLOW_EOS: + if (demux->end_of_period) { + GST_INFO_OBJECT (demux, "Reached the end of the Period"); + /* setup video, audio and subtitle streams, starting from the next Period */ + if (!gst_mpd_client_set_period_index (demux->client, + gst_mpd_client_get_period_index (demux->client) + 1) + || !gst_dash_demux_setup_all_streams (demux)) { + GST_INFO_OBJECT (demux, "Reached the end of the manifest file"); + demux->end_of_manifest = TRUE; + gst_task_start (demux->stream_task); + goto end_of_manifest; + } + /* start playing from the first segment of the new period */ + gst_mpd_client_set_segment_index_for_all_streams (demux->client, 0); + demux->end_of_period = FALSE; + } + break; + case GST_FLOW_ERROR: /* Download failed 'by itself' * in case this is live, we might be ahead or before playback, where * segments don't exist (are still being created or were already deleted) @@ -1727,7 +1724,13 @@ gst_dash_demux_download_loop (GstDashDemux * demux) } else { goto error_downloading; } - } + break; + default: + break; + } + + if (demux->cancelled) { + goto cancelled; } GST_INFO_OBJECT (demux, "Internal buffering : %" G_GUINT64_FORMAT " s", @@ -1741,14 +1744,14 @@ quit: cancelled: { GST_WARNING_OBJECT (demux, "Cancelled, leaving download task"); - gst_task_stop (demux->download_task); + gst_task_stop (stream->download_task); return; } end_of_manifest: { GST_INFO_OBJECT (demux, "End of manifest, leaving download task"); - gst_task_stop (demux->download_task); + gst_task_stop (stream->download_task); return; } @@ -1756,7 +1759,7 @@ error_downloading: { GST_ELEMENT_ERROR (demux, RESOURCE, NOT_FOUND, ("Could not fetch the next fragment, leaving download task"), (NULL)); - gst_task_stop (demux->download_task); + gst_task_stop (stream->download_task); return; } } @@ -1770,87 +1773,74 @@ gst_dash_demux_resume_stream_task (GstDashDemux * demux) static void gst_dash_demux_resume_download_task (GstDashDemux * demux) { - gst_task_start (demux->download_task); + GSList *iter; + + for (iter = demux->streams; iter; iter = g_slist_next (iter)) { + GstDashDemuxStream *stream = iter->data; + gst_task_start (stream->download_task); + } } -/* gst_dash_demux_select_representations: +/* + * gst_dash_demux_stream_select_representation_unlocked: * - * Select the most appropriate media representations based on current target + * Select the most appropriate media representation based on current target * bitrate. - * - * FIXME: all representations are selected against the same bitrate, but - * they will share the same bandwidth. This only works today because the - * audio representations bitrate usage is negligible as compared to the - * video representation one. - * - * Returns TRUE if a new set of representations has been selected */ static gboolean -gst_dash_demux_select_representations (GstDashDemux * demux) +gst_dash_demux_stream_select_representation_unlocked (GstDashDemuxStream * + stream) { GstActiveStream *active_stream = NULL; GList *rep_list = NULL; gint new_index; - gboolean ret = FALSE; - GSList *iter; - GstDashDemuxStream *stream; + GstDashDemux *demux = stream->demux; + guint64 bitrate; - guint i = 0; + active_stream = + gst_mpdparser_get_active_stream_by_index (demux->client, stream->index); + if (active_stream == NULL) + return FALSE; - GST_MPD_CLIENT_LOCK (demux->client); - for (iter = demux->streams; iter; iter = g_slist_next (iter)) { - guint64 bitrate; - stream = iter->data; - active_stream = - gst_mpdparser_get_active_stream_by_index (demux->client, stream->index); - if (active_stream == NULL) - return FALSE; + /* retrieve representation list */ + if (active_stream->cur_adapt_set) + rep_list = active_stream->cur_adapt_set->Representations; + if (!rep_list) + return FALSE; - /* retrieve representation list */ - if (active_stream->cur_adapt_set) - rep_list = active_stream->cur_adapt_set->Representations; - if (!rep_list) + bitrate = + gst_download_rate_get_current_rate (&stream->dnl_rate) * + demux->bandwidth_usage; + GST_DEBUG_OBJECT (demux, "Trying to change to bitrate: %" G_GUINT64_FORMAT, + bitrate); + + /* get representation index with current max_bandwidth */ + new_index = gst_mpdparser_get_rep_idx_with_max_bandwidth (rep_list, bitrate); + + /* if no representation has the required bandwidth, take the lowest one */ + if (new_index == -1) + new_index = gst_mpdparser_get_rep_idx_with_min_bandwidth (rep_list); + + if (new_index != active_stream->representation_idx) { + GstRepresentationNode *rep = g_list_nth_data (rep_list, new_index); + GST_INFO_OBJECT (demux, "Changing representation idx: %d %d %u", + stream->index, new_index, rep->bandwidth); + if (gst_mpd_client_setup_representation (demux->client, active_stream, rep)) { + stream->need_header = TRUE; + stream->has_data_queued = FALSE; + GST_INFO_OBJECT (demux, "Switching bitrate to %d", + active_stream->cur_representation->bandwidth); + gst_caps_unref (stream->input_caps); + stream->input_caps = gst_dash_demux_get_input_caps (demux, active_stream); + gst_dash_demux_stream_push_event (stream, + gst_event_new_caps (stream->input_caps)); + return TRUE; + } else { + GST_WARNING_OBJECT (demux, "Can not switch representation, aborting..."); return FALSE; - - bitrate = - gst_download_rate_get_current_rate (&stream->dnl_rate) * - demux->bandwidth_usage; - GST_DEBUG_OBJECT (demux, "Trying to change to bitrate: %" G_GUINT64_FORMAT, - bitrate); - - /* get representation index with current max_bandwidth */ - new_index = - gst_mpdparser_get_rep_idx_with_max_bandwidth (rep_list, bitrate); - - /* if no representation has the required bandwidth, take the lowest one */ - if (new_index == -1) - new_index = gst_mpdparser_get_rep_idx_with_min_bandwidth (rep_list); - - if (new_index != active_stream->representation_idx) { - GstRepresentationNode *rep = g_list_nth_data (rep_list, new_index); - GST_INFO_OBJECT (demux, "Changing representation idx: %d %d %u", - stream->index, new_index, rep->bandwidth); - if (gst_mpd_client_setup_representation (demux->client, active_stream, - rep)) { - ret = TRUE; - stream->need_header = TRUE; - stream->has_data_queued = FALSE; - GST_INFO_OBJECT (demux, "Switching bitrate to %d", - active_stream->cur_representation->bandwidth); - gst_caps_unref (stream->input_caps); - stream->input_caps = - gst_dash_demux_get_input_caps (demux, active_stream); - gst_dash_demux_stream_push_event (stream, - gst_event_new_caps (stream->input_caps)); - } else { - GST_WARNING_OBJECT (demux, - "Can not switch representation, aborting..."); - } } - i++; } - GST_MPD_CLIENT_UNLOCK (demux->client); - return ret; + return FALSE; } static GstBuffer * @@ -2044,7 +2034,8 @@ gst_dash_demux_wait_for_fragment_to_be_available (GstDashDemux * demux, static gboolean gst_dash_demux_get_next_fragment_for_stream (GstDashDemux * demux, - GstDashDemuxStream * demux_stream, guint64 * size_buffer) + GstDashDemuxStream * demux_stream, guint64 * size_buffer, + GstClockTime * download_time) { GstActiveStream *active_stream; GstFragment *download; @@ -2118,6 +2109,7 @@ gst_dash_demux_get_next_fragment_for_stream (GstDashDemux * demux, demux_stream->need_header = FALSE; } g_get_current_time (&now); + *download_time = (GST_TIMEVAL_TO_TIME (now) - GST_TIMEVAL_TO_TIME (start)); buffer = gst_buffer_make_writable (buffer); @@ -2149,112 +2141,91 @@ gst_dash_demux_get_next_fragment_for_stream (GstDashDemux * demux, * Returns FALSE if an error occured while downloading fragments * */ -static gboolean +static GstFlowReturn gst_dash_demux_get_next_fragment (GstDashDemux * demux, - GstActiveStream ** stream, GstClockTime * selected_ts) + GstDashDemuxStream * stream, GstActiveStream ** active_stream, + GstClockTime * selected_ts) { guint64 buffer_size = 0; - GTimeVal now; - GTimeVal start; GstClockTime diff; - GSList *iter; gboolean end_of_period = TRUE; - GstDashDemuxStream *selected_stream = NULL; - GstClockTime best_time = GST_CLOCK_TIME_NONE; - GSList *streams; - - g_mutex_lock (&demux->streams_lock); - /* TODO add check */ - streams = g_slist_last (demux->next_periods)->data; - g_mutex_unlock (&demux->streams_lock); + GstClockTime ts; - for (iter = streams; iter; iter = g_slist_next (iter)) { - GstDashDemuxStream *stream = iter->data; - GstClockTime ts; + if (stream->download_end_of_period) + return GST_FLOW_EOS; - if (stream->download_end_of_period) - continue; + if (stream->last_ret == GST_FLOW_NOT_LINKED) { + GST_LOG_OBJECT (demux, "Skipping stream %p %s:%s : not-linked", + stream, GST_DEBUG_PAD_NAME (stream->pad)); + return GST_FLOW_NOT_LINKED; + } - if (gst_mpd_client_get_next_fragment_timestamp (demux->client, - stream->index, &ts)) { - if (ts < best_time || !GST_CLOCK_TIME_IS_VALID (best_time)) { - selected_stream = stream; - best_time = ts; - } - } else { - GstEvent *event = NULL; + if (gst_mpd_client_get_next_fragment_timestamp (demux->client, + stream->index, &ts)) { + } else { + GstEvent *event = NULL; - GST_INFO_OBJECT (demux, - "This Period doesn't contain more fragments for stream %u", - stream->index); + GST_INFO_OBJECT (demux, + "This Period doesn't contain more fragments for stream %u", + stream->index); - /* check if this is live and we should wait for more data */ - if (gst_mpd_client_is_live (demux->client) - && demux->client->mpd_node->minimumUpdatePeriod != -1) { - end_of_period = FALSE; - continue; - } + /* check if this is live and we should wait for more data */ + if (gst_mpd_client_is_live (demux->client) + && demux->client->mpd_node->minimumUpdatePeriod != -1) { + end_of_period = FALSE; + return GST_FLOW_OK; /* TODO wait */ + } - if (gst_mpd_client_has_next_period (demux->client)) { - event = gst_event_new_dash_eop (); - } else { - GST_DEBUG_OBJECT (demux, - "No more fragments or periods for this stream, setting EOS"); - event = gst_event_new_eos (); - } - stream->download_end_of_period = TRUE; - gst_dash_demux_stream_push_event (stream, event); + if (gst_mpd_client_has_next_period (demux->client)) { + event = gst_event_new_dash_eop (); + } else { + GST_DEBUG_OBJECT (demux, + "No more fragments or periods for this stream, setting EOS"); + event = gst_event_new_eos (); } + stream->download_end_of_period = TRUE; + gst_dash_demux_stream_push_event (stream, event); } - if (selected_ts) - *selected_ts = best_time; - if (stream && selected_stream) - *stream = - gst_mpdparser_get_active_stream_by_index (demux->client, - selected_stream->index); + *active_stream = + gst_mpdparser_get_active_stream_by_index (demux->client, stream->index); /* * If this is a live stream, check the segment end time to make sure * it is available to download */ - if (selected_stream && gst_mpd_client_is_live (demux->client) && + if (stream && gst_mpd_client_is_live (demux->client) && demux->client->mpd_node->minimumUpdatePeriod != -1) { - gst_dash_demux_wait_for_fragment_to_be_available (demux, *stream); + gst_dash_demux_wait_for_fragment_to_be_available (demux, *active_stream); } /* Get the fragment corresponding to each stream index */ - if (selected_stream) { - g_get_current_time (&start); - gst_dash_demux_get_next_fragment_for_stream (demux, selected_stream, - &buffer_size); - g_get_current_time (&now); + if (stream) { + gst_dash_demux_get_next_fragment_for_stream (demux, stream, &buffer_size, + &diff); end_of_period = FALSE; } demux->end_of_period = end_of_period; if (end_of_period) - return FALSE; + return GST_FLOW_EOS; - /* Wake the download task up */ - GST_TASK_SIGNAL (demux->download_task); - if (selected_stream) { + if (stream && buffer_size > 0 && diff > 0) { #ifndef GST_DISABLE_GST_DEBUG guint64 brate; #endif - diff = (GST_TIMEVAL_TO_TIME (now) - GST_TIMEVAL_TO_TIME (start)); - gst_download_rate_add_rate (&selected_stream->dnl_rate, buffer_size, diff); + gst_download_rate_add_rate (&stream->dnl_rate, buffer_size, diff); #ifndef GST_DISABLE_GST_DEBUG brate = (buffer_size * 8) / ((double) diff / GST_SECOND); #endif GST_INFO_OBJECT (demux, "Stream: %d Download rate = %" G_GUINT64_FORMAT " Kbits/s (%" - G_GUINT64_FORMAT " Ko in %.2f s)", selected_stream->index, brate / 1000, + G_GUINT64_FORMAT " Ko in %.2f s)", stream->index, brate / 1000, buffer_size / 1024, ((double) diff / GST_SECOND)); } - return TRUE; + return GST_FLOW_OK; } static void diff --git a/ext/dash/gstdashdemux.h b/ext/dash/gstdashdemux.h index 2065365..84baa8e 100644 --- a/ext/dash/gstdashdemux.h +++ b/ext/dash/gstdashdemux.h @@ -94,6 +94,8 @@ struct _GstDashDemuxStream gboolean has_data_queued; GstDataQueue *queue; + GstTask *download_task; + GRecMutex download_task_lock; GstDownloadRate dnl_rate; }; @@ -135,8 +137,6 @@ struct _GstDashDemux GRecMutex stream_task_lock; /* Download task */ - GstTask *download_task; - GRecMutex download_task_lock; GMutex download_mutex; GCond download_cond; gboolean cancelled; -- 2.7.4