From: Thiago Santos Date: Fri, 20 Dec 2013 13:05:22 +0000 (-0300) Subject: dashdemux: remove stream loop thread X-Git-Tag: 1.19.3~507^2~12799 X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=da329b44a418a10b4d59a592c9811d37a43ed73e;p=platform%2Fupstream%2Fgstreamer.git dashdemux: remove stream loop thread Download and push from the same task, makes code a lot simpler to maintain. Also pushing from separate threads avoids deadlocking when gst_pad_push blocks due to downstream queues being full. --- diff --git a/ext/dash/gstdashdemux.c b/ext/dash/gstdashdemux.c index 61723de..a1f3418 100644 --- a/ext/dash/gstdashdemux.c +++ b/ext/dash/gstdashdemux.c @@ -147,6 +147,7 @@ #include #include #include +#include "gst/gst-i18n-plugin.h" #include "gstdashdemux.h" #include "gstdash_debug.h" @@ -184,14 +185,6 @@ enum #define GST_DASH_DEMUX_CLIENT_LOCK(d) g_mutex_lock (&d->client_lock) #define GST_DASH_DEMUX_CLIENT_UNLOCK(d) g_mutex_unlock (&d->client_lock) -/* Custom internal event to signal end of period */ -#define GST_EVENT_DASH_EOP GST_EVENT_MAKE_TYPE(81, GST_EVENT_TYPE_DOWNSTREAM | GST_EVENT_TYPE_SERIALIZED) -static GstEvent * -gst_event_new_dash_eop (void) -{ - return gst_event_new_custom (GST_EVENT_DASH_EOP, NULL); -} - /* GObject */ static void gst_dash_demux_set_property (GObject * object, guint prop_id, const GValue * value, GParamSpec * pspec); @@ -212,17 +205,16 @@ static gboolean gst_dash_demux_src_event (GstPad * pad, GstObject * parent, GstEvent * event); static gboolean gst_dash_demux_src_query (GstPad * pad, GstObject * parent, GstQuery * query); -static void gst_dash_demux_stream_loop (GstDashDemux * demux); static void gst_dash_demux_stream_download_loop (GstDashDemuxStream * stream); static void gst_dash_demux_stop (GstDashDemux * demux); -static void gst_dash_demux_resume_stream_task (GstDashDemux * demux); +static void gst_dash_demux_wait_stop (GstDashDemux * demux); static void gst_dash_demux_resume_download_task (GstDashDemux * demux); static gboolean gst_dash_demux_setup_all_streams (GstDashDemux * demux); -static gboolean -gst_dash_demux_stream_select_representation_unlocked (GstDashDemuxStream * +static GstEvent + * gst_dash_demux_stream_select_representation_unlocked (GstDashDemuxStream * stream); -static GstFlowReturn gst_dash_demux_get_next_fragment (GstDashDemux * demux, - GstDashDemuxStream * stream, GstClockTime * next_ts); +static GstFlowReturn gst_dash_demux_stream_get_next_fragment (GstDashDemuxStream + * stream, GstClockTime * ts); static gboolean gst_dash_demux_advance_period (GstDashDemux * demux); static void gst_dash_demux_download_wait (GstDashDemuxStream * stream, GstClockTime time_diff); @@ -232,11 +224,6 @@ static void gst_dash_demux_remove_streams (GstDashDemux * demux, GSList * streams); static void gst_dash_demux_stream_free (GstDashDemuxStream * stream); static void gst_dash_demux_reset (GstDashDemux * demux, gboolean dispose); -#ifndef GST_DISABLE_GST_DEBUG -static GstClockTime gst_dash_demux_get_buffering_time (GstDashDemux * demux); -static GstClockTime gst_dash_demux_stream_get_buffering_time (GstDashDemuxStream - * stream); -#endif static GstCaps *gst_dash_demux_get_input_caps (GstDashDemux * demux, GstActiveStream * stream); static GstPad *gst_dash_demux_create_pad (GstDashDemux * demux); @@ -254,12 +241,6 @@ gst_dash_demux_dispose (GObject * obj) gst_dash_demux_reset (demux, TRUE); - if (demux->stream_task) { - gst_object_unref (demux->stream_task); - g_rec_mutex_clear (&demux->stream_task_lock); - demux->stream_task = NULL; - } - if (demux->downloader != NULL) { g_object_unref (demux->downloader); demux->downloader = NULL; @@ -341,12 +322,6 @@ gst_dash_demux_init (GstDashDemux * demux) demux->bandwidth_usage = DEFAULT_BANDWIDTH_USAGE; demux->max_bitrate = DEFAULT_MAX_BITRATE; - /* Streaming task */ - g_rec_mutex_init (&demux->stream_task_lock); - demux->stream_task = - gst_task_new ((GstTaskFunction) gst_dash_demux_stream_loop, demux, NULL); - gst_task_set_lock (demux->stream_task, &demux->stream_task_lock); - g_mutex_init (&demux->client_lock); } @@ -421,47 +396,6 @@ gst_dash_demux_change_state (GstElement * element, GstStateChange transition) return ret; } -static gboolean -_check_queue_full (GstDataQueue * q, guint visible, guint bytes, guint64 time, - GstDashDemux * demux) -{ - return time >= demux->max_buffering_time; -} - -static void -_data_queue_item_destroy (GstDataQueueItem * item) -{ - gst_mini_object_unref (item->object); - g_free (item); -} - -static void -gst_dash_demux_stream_push_event (GstDashDemuxStream * stream, GstEvent * event) -{ - GstDataQueueItem *item = g_new0 (GstDataQueueItem, 1); - - item->object = GST_MINI_OBJECT_CAST (event); - item->destroy = (GDestroyNotify) _data_queue_item_destroy; - - gst_data_queue_push_force (stream->queue, item); -} - -static void -gst_dash_demux_stream_push_data (GstDashDemuxStream * stream, - GstBuffer * fragment) -{ - GstDataQueueItem *item = g_new (GstDataQueueItem, 1); - - item->object = GST_MINI_OBJECT_CAST (fragment); - item->duration = GST_BUFFER_DURATION (fragment); - item->visible = TRUE; - item->size = gst_buffer_get_size (fragment); - - item->destroy = (GDestroyNotify) _data_queue_item_destroy; - - gst_data_queue_push (stream->queue, item); -} - static void gst_dash_demux_stream_seek (GstDashDemuxStream * stream, GstClockTime target_pos) @@ -537,6 +471,10 @@ gst_dash_demux_src_event (GstPad * pad, GstObject * parent, GstEvent * event) start, stop_type, stop, &update); if (update) { + GstEvent *seg_evt; + + GST_DASH_DEMUX_CLIENT_LOCK (demux); + if (flags & GST_SEEK_FLAG_FLUSH) { GST_DEBUG_OBJECT (demux, "sending flush start"); for (iter = demux->streams; iter; iter = g_slist_next (iter)) { @@ -546,13 +484,9 @@ gst_dash_demux_src_event (GstPad * pad, GstObject * parent, GstEvent * event) } } - /* Stop the demux */ - demux->cancelled = TRUE; - /* Stop the demux, also clears the buffering queue */ gst_dash_demux_stop (demux); - - /* Wait for streaming to finish */ - g_rec_mutex_lock (&demux->stream_task_lock); + GST_DASH_DEMUX_CLIENT_UNLOCK (demux); + gst_dash_demux_wait_stop (demux); /* select the requested Period in the Media Presentation */ target_pos = (GstClockTime) demux->segment.start; @@ -594,9 +528,15 @@ gst_dash_demux_src_event (GstPad * pad, GstObject * parent, GstEvent * event) } /* Update the current sequence on all streams */ + seg_evt = gst_event_new_segment (&demux->segment); + gst_event_set_seqnum (seg_evt, gst_event_get_seqnum (event)); for (iter = demux->streams; iter; iter = g_slist_next (iter)) { - gst_dash_demux_stream_seek (iter->data, target_pos); + GstDashDemuxStream *stream = iter->data; + gst_dash_demux_stream_seek (stream, target_pos); + + gst_event_replace (&stream->pending_segment, seg_evt); } + gst_event_unref (seg_evt); if (flags & GST_SEEK_FLAG_FLUSH) { GST_DEBUG_OBJECT (demux, "Sending flush stop on all pad"); @@ -604,31 +544,26 @@ gst_dash_demux_src_event (GstPad * pad, GstObject * parent, GstEvent * event) GstDashDemuxStream *stream; stream = iter->data; - stream->has_data_queued = FALSE; stream->need_header = TRUE; - stream->download_end_of_period = FALSE; - stream->stream_end_of_period = FALSE; stream->stream_eos = FALSE; gst_pad_push_event (stream->pad, gst_event_new_flush_stop (TRUE)); } } /* Restart the demux */ + GST_DASH_DEMUX_CLIENT_LOCK (demux); demux->cancelled = FALSE; demux->end_of_manifest = FALSE; for (iter = demux->streams; iter; iter = g_slist_next (iter)) { GstDashDemuxStream *stream = iter->data; - gst_data_queue_set_flushing (stream->queue, FALSE); stream->last_ret = GST_FLOW_OK; gst_uri_downloader_reset (stream->downloader); } demux->timestamp_offset = 0; - demux->need_segment = TRUE; gst_uri_downloader_reset (demux->downloader); GST_DEBUG_OBJECT (demux, "Resuming tasks after seeking"); gst_dash_demux_resume_download_task (demux); - gst_dash_demux_resume_stream_task (demux); - g_rec_mutex_unlock (&demux->stream_task_lock); + GST_DASH_DEMUX_CLIENT_UNLOCK (demux); } return TRUE; @@ -646,8 +581,8 @@ gst_dash_demux_src_event (GstPad * pad, GstObject * parent, GstEvent * event) stream->restart_download = TRUE; stream->need_header = TRUE; GST_DEBUG_OBJECT (stream->pad, "Restarting download loop"); + gst_task_start (stream->download_task); } - gst_task_start (stream->download_task); GST_DASH_DEMUX_CLIENT_UNLOCK (demux); gst_event_unref (event); return TRUE; @@ -695,12 +630,10 @@ gst_dash_demux_all_streams_eop (GstDashDemux * demux) for (iter = demux->streams; iter; iter = g_slist_next (iter)) { GstDashDemuxStream *stream = iter->data; - GST_LOG_OBJECT (stream->pad, "EOP: %d (linked: %d)", - stream->download_end_of_period, + GST_LOG_OBJECT (stream->pad, "EOP: %d (linked: %d)", stream->stream_eos, stream->last_ret != GST_FLOW_NOT_LINKED); - if (!stream->download_end_of_period - && stream->last_ret != GST_FLOW_NOT_LINKED) + if (!stream->stream_eos && stream->last_ret != GST_FLOW_NOT_LINKED) return FALSE; } @@ -740,9 +673,6 @@ gst_dash_demux_setup_all_streams (GstDashDemux * demux) stream->demux = demux; stream->active_stream = active_stream; caps = gst_dash_demux_get_input_caps (demux, active_stream); - stream->queue = - gst_data_queue_new ((GstDataQueueCheckFullFunction) _check_queue_full, - NULL, NULL, demux); g_rec_mutex_init (&stream->download_task_lock); stream->download_task = @@ -756,7 +686,6 @@ gst_dash_demux_setup_all_streams (GstDashDemux * demux) stream->index = i; stream->input_caps = caps; stream->need_header = TRUE; - stream->has_data_queued = FALSE; gst_download_rate_init (&stream->dnl_rate); gst_download_rate_set_max_length (&stream->dnl_rate, DOWNLOAD_RATE_HISTORY_MAX); @@ -788,7 +717,7 @@ gst_dash_demux_setup_all_streams (GstDashDemux * demux) gst_pad_push_event (stream->pad, event); g_free (stream_id); - gst_dash_demux_stream_push_event (stream, gst_event_new_caps (caps)); + gst_pad_push_event (stream->pad, gst_event_new_caps (caps)); } streams = g_slist_reverse (streams); @@ -821,6 +750,7 @@ gst_dash_demux_sink_event (GstPad * pad, GstObject * parent, GstEvent * event) GST_DEBUG_OBJECT (demux, "Got EOS on the sink pad: manifest fetched"); + GST_DASH_DEMUX_CLIENT_LOCK (demux); if (demux->client) gst_mpd_client_free (demux->client); demux->client = gst_mpd_client_new (); @@ -933,9 +863,8 @@ gst_dash_demux_sink_event (GstPad * pad, GstObject * parent, GstEvent * event) } } demux->timestamp_offset = -1; - demux->need_segment = TRUE; gst_dash_demux_resume_download_task (demux); - gst_dash_demux_resume_stream_task (demux); + GST_DASH_DEMUX_CLIENT_UNLOCK (demux); seek_quit: gst_event_unref (event); @@ -1049,43 +978,37 @@ gst_dash_demux_pad (GstPad * pad, GstObject * parent, GstBuffer * buf) } static void -gst_dash_demux_stop (GstDashDemux * demux) +gst_dash_demux_wait_stop (GstDashDemux * demux) { GSList *iter; - GST_DEBUG_OBJECT (demux, "Stopping demux"); - - if (demux->downloader) - gst_uri_downloader_cancel (demux->downloader); - + GST_DEBUG_OBJECT (demux, "Waiting for threads to stop"); for (iter = demux->streams; iter; iter = g_slist_next (iter)) { GstDashDemuxStream *stream = iter->data; - gst_data_queue_set_flushing (stream->queue, TRUE); - if (GST_TASK_STATE (stream->download_task) != GST_TASK_STOPPED) { - stream->last_ret = GST_FLOW_FLUSHING; - stream->need_header = TRUE; - gst_task_stop (stream->download_task); - GST_TASK_SIGNAL (stream->download_task); - gst_uri_downloader_cancel (stream->downloader); - - gst_task_join (stream->download_task); - } + gst_task_join (stream->download_task); } +} - if (GST_TASK_STATE (demux->stream_task) != GST_TASK_STOPPED) { - GST_TASK_SIGNAL (demux->stream_task); - gst_task_stop (demux->stream_task); - g_rec_mutex_lock (&demux->stream_task_lock); - g_rec_mutex_unlock (&demux->stream_task_lock); - gst_task_join (demux->stream_task); - } +static void +gst_dash_demux_stop (GstDashDemux * demux) +{ + GSList *iter; + + GST_DEBUG_OBJECT (demux, "Stopping demux"); + demux->cancelled = TRUE; + + if (demux->downloader) + gst_uri_downloader_cancel (demux->downloader); for (iter = demux->streams; iter; iter = g_slist_next (iter)) { GstDashDemuxStream *stream = iter->data; - gst_data_queue_flush (stream->queue); - stream->has_data_queued = FALSE; + stream->last_ret = GST_FLOW_FLUSHING; + stream->need_header = TRUE; + gst_task_stop (stream->download_task); + GST_TASK_SIGNAL (stream->download_task); + gst_uri_downloader_cancel (stream->downloader); } } @@ -1114,8 +1037,9 @@ gst_dash_demux_expose_streams (GstDashDemux * demux) for (iter = demux->streams; iter; iter = g_slist_next (iter)) { GstDashDemuxStream *stream = iter->data; - GST_LOG_OBJECT (demux, "Exposing stream %d %" GST_PTR_FORMAT, stream->index, - stream->input_caps); + GST_LOG_OBJECT (stream->pad, "Exposing stream %d %" GST_PTR_FORMAT, + stream->index, stream->input_caps); + gst_element_add_pad (GST_ELEMENT (demux), gst_object_ref (stream->pad)); } gst_element_no_more_pads (GST_ELEMENT_CAST (demux)); @@ -1130,8 +1054,8 @@ gst_dash_demux_remove_streams (GstDashDemux * demux, GSList * streams) for (iter = streams; iter; iter = g_slist_next (iter)) { GstDashDemuxStream *stream = iter->data;; - GST_LOG_OBJECT (demux, "Removing stream %d %" GST_PTR_FORMAT, stream->index, - stream->input_caps); + GST_LOG_OBJECT (stream->pad, "Removing stream %d %" GST_PTR_FORMAT, + stream->index, stream->input_caps); gst_pad_push_event (stream->pad, gst_event_ref (eos)); gst_pad_set_active (stream->pad, FALSE); gst_element_remove_pad (GST_ELEMENT (demux), stream->pad); @@ -1145,7 +1069,8 @@ static gboolean gst_dash_demux_advance_period (GstDashDemux * demux) { GSList *old_period = NULL; - GST_DASH_DEMUX_CLIENT_LOCK (demux); + GSList *iter; + GstEvent *seg_evt; GST_DEBUG_OBJECT (demux, "Advancing period from %p", demux->streams); @@ -1167,256 +1092,66 @@ gst_dash_demux_advance_period (GstDashDemux * demux) return FALSE; } - gst_dash_demux_expose_streams (demux); - gst_dash_demux_remove_streams (demux, old_period); - GST_DASH_DEMUX_CLIENT_UNLOCK (demux); - return TRUE; -} - -static GstFlowReturn -gst_dash_demux_combine_flows (GstDashDemux * demux) -{ - gboolean all_notlinked = TRUE; - GSList *iter; + /* TODO protect with lock, using the client lock isn't useful + * because causes deadlocks with the event_src handler */ + gst_dash_demux_expose_streams (demux); + seg_evt = gst_event_new_segment (&demux->segment); for (iter = demux->streams; iter; iter = g_slist_next (iter)) { GstDashDemuxStream *stream = iter->data; - if (stream->last_ret != GST_FLOW_NOT_LINKED) - all_notlinked = FALSE; - - if (stream->last_ret <= GST_FLOW_NOT_NEGOTIATED - || stream->last_ret == GST_FLOW_FLUSHING) - return stream->last_ret; + gst_event_replace (&stream->pending_segment, seg_evt); } - if (all_notlinked) - return GST_FLOW_NOT_LINKED; - return GST_FLOW_OK; -} + gst_event_unref (seg_evt); + gst_dash_demux_remove_streams (demux, old_period); + GST_DASH_DEMUX_CLIENT_LOCK (demux); + return TRUE; +} -/* gst_dash_demux_stream_loop: - * - * Loop for the "stream' task that pushes fragments to the src pads. - * - * Startup: - * The task is started as soon as we have received the manifest and - * waits for the first fragment to be downloaded and pushed in the - * queue. Once this fragment has been pushed, the task pauses itself - * until actual playback begins. - * - * During playback: - * The task pushes fragments downstream at regular intervals based on - * the fragment duration. If it detects a queue underrun, it sends - * a buffering event to tell the main application to pause. - * - * Teardown: - * The task is stopped when we have reached the end of the manifest - * and emptied our queue. - * - */ -static void -gst_dash_demux_stream_loop (GstDashDemux * demux) +static GstFlowReturn +gst_dash_demux_push (GstDashDemuxStream * stream, GstBuffer * buffer) { - GstFlowReturn ret; - GSList *iter; - GstClockTime best_time; - GstDashDemuxStream *selected_stream; - gboolean eos = TRUE; - gboolean eop = TRUE; - - GST_LOG_OBJECT (demux, "Starting stream loop"); - - best_time = GST_CLOCK_TIME_NONE; - selected_stream = NULL; - - for (iter = demux->streams; iter; iter = g_slist_next (iter)) { - GstDashDemuxStream *stream = iter->data; - GstDataQueueItem *item; - - GST_DEBUG_OBJECT (demux, "Peeking stream %d", stream->index); - - if (stream->last_ret == GST_FLOW_NOT_LINKED) - continue; - - if (stream->stream_eos) { - GST_DEBUG_OBJECT (demux, "Stream %d is eos, skipping", stream->index); - continue; - } - - if (stream->stream_end_of_period) { - GST_DEBUG_OBJECT (demux, "Stream %d is eop, skipping", stream->index); - eos = FALSE; - continue; - } - eos = FALSE; - eop = FALSE; - - GST_DEBUG_OBJECT (demux, "peeking at the queue for stream %d", - stream->index); - if (!gst_data_queue_peek (stream->queue, &item)) { - /* flushing */ - goto flushing; - } - - if (G_LIKELY (GST_IS_BUFFER (item->object))) { - GstBuffer *buffer; - GstClockTime timestamp; - - buffer = GST_BUFFER_CAST (item->object); - timestamp = GST_BUFFER_TIMESTAMP (buffer); - - GST_LOG_OBJECT (demux, "Buffer with time %" GST_TIME_FORMAT, - GST_TIME_ARGS (timestamp)); - - if (timestamp < best_time) { - GST_DEBUG_OBJECT (demux, "Found new best time: %" GST_TIME_FORMAT " %p", - GST_TIME_ARGS (timestamp), buffer); - best_time = timestamp; - selected_stream = stream; - } else if (!GST_CLOCK_TIME_IS_VALID (timestamp)) { - selected_stream = stream; - GST_DEBUG_OBJECT (demux, "Buffer without timestamp selected %p", - buffer); - break; - } - } else { - GST_DEBUG_OBJECT (demux, "Non buffers have preference %" GST_PTR_FORMAT, - item->object); - selected_stream = stream; - break; - } - } - - ret = GST_FLOW_OK; - if (selected_stream) { - GstDataQueueItem *item; - - GST_DEBUG_OBJECT (demux, "Selected stream %p %d", selected_stream, - selected_stream->index); - - if (!gst_data_queue_pop (selected_stream->queue, &item)) - goto end; - - if (G_LIKELY (GST_IS_BUFFER (item->object))) { - GstBuffer *buffer; - GstClockTime timestamp, duration; - - buffer = GST_BUFFER_CAST (item->object); - timestamp = GST_BUFFER_TIMESTAMP (buffer); - - if (demux->need_segment) { - if (demux->timestamp_offset == -1) - demux->timestamp_offset = timestamp; + GstDashDemux *demux = stream->demux; + GstClockTime timestamp, duration; + GstFlowReturn ret = GST_FLOW_OK; - /* And send a newsegment */ - for (iter = demux->streams; iter; iter = g_slist_next (iter)) { - GstDashDemuxStream *stream = iter->data; - gst_pad_push_event (stream->pad, - gst_event_new_segment (&demux->segment)); - } - demux->need_segment = FALSE; - } - /* make timestamp start from 0 by subtracting the offset */ - timestamp -= demux->timestamp_offset; - duration = GST_BUFFER_DURATION (buffer); + timestamp = GST_BUFFER_TIMESTAMP (buffer); - GST_BUFFER_TIMESTAMP (buffer) = timestamp; + if (stream->pending_segment) { + if (demux->timestamp_offset == -1) + demux->timestamp_offset = timestamp; + else + demux->timestamp_offset = MIN (timestamp, demux->timestamp_offset); - GST_DEBUG_OBJECT (demux, - "Pushing fragment ts: %" GST_TIME_FORMAT " at pad %s", - GST_TIME_ARGS (timestamp), GST_PAD_NAME (selected_stream->pad)); - GST_DEBUG_OBJECT (demux, - "Pushing fragment %p #%" G_GUINT64_FORMAT " (stream %d) ts:%" - GST_TIME_FORMAT " dur:%" GST_TIME_FORMAT " at pad: %s:%s", buffer, - GST_BUFFER_OFFSET (buffer), selected_stream->index, - GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)), - GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)), - GST_DEBUG_PAD_NAME (selected_stream->pad)); - ret = gst_pad_push (selected_stream->pad, gst_buffer_ref (buffer)); - GST_DEBUG_OBJECT (demux, "Push result: %d %s", ret, - gst_flow_get_name (ret)); - - demux->segment.position = timestamp; - selected_stream->position = timestamp; - if (GST_CLOCK_TIME_IS_VALID (duration)) - selected_stream->position += duration; - - item->destroy (item); - } else { - /* a GstEvent */ - if (GST_EVENT_TYPE (item->object) == GST_EVENT_EOS) { - selected_stream->stream_end_of_period = TRUE; - selected_stream->stream_eos = TRUE; - ret = GST_FLOW_EOS; - } else if (GST_EVENT_TYPE (item->object) == GST_EVENT_DASH_EOP) { - selected_stream->stream_end_of_period = TRUE; - } - - if (GST_EVENT_TYPE (item->object) != GST_EVENT_DASH_EOP) { - gst_pad_push_event (selected_stream->pad, - gst_event_ref (GST_EVENT_CAST (item->object))); - } - item->destroy (item); - } - } else { - if (eos) { - goto end_of_manifest; - } else if (eop) { - gst_dash_demux_advance_period (demux); - demux->need_segment = TRUE; - } + /* And send a newsegment */ + gst_pad_push_event (stream->pad, stream->pending_segment); + stream->pending_segment = NULL; } - if (selected_stream) { - GST_DASH_DEMUX_CLIENT_LOCK (demux); - if (ret != selected_stream->last_ret) { - gst_task_start (selected_stream->download_task); - selected_stream->last_ret = ret; - } - switch (selected_stream->last_ret) { - case GST_FLOW_NOT_LINKED: - gst_data_queue_set_flushing (selected_stream->queue, TRUE); - break; - default: - break; - } - - /* combine flow returns */ - ret = gst_dash_demux_combine_flows (demux); - if (ret < 0) { - goto error; - } - GST_DASH_DEMUX_CLIENT_UNLOCK (demux); - } + /* make timestamp start from 0 by subtracting the offset */ + timestamp -= demux->timestamp_offset; + duration = GST_BUFFER_DURATION (buffer); -end: - GST_INFO_OBJECT (demux, "Leaving streaming task"); - return; + GST_BUFFER_TIMESTAMP (buffer) = timestamp; -flushing: - { - GST_WARNING_OBJECT (demux, "Flushing, leaving streaming task"); - gst_task_stop (demux->stream_task); - return; - } + GST_DEBUG_OBJECT (stream->pad, + "Pushing fragment %p #%" G_GUINT64_FORMAT " (stream %d) ts:%" + GST_TIME_FORMAT " dur:%" GST_TIME_FORMAT, buffer, + GST_BUFFER_OFFSET (buffer), stream->index, + GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)), + GST_TIME_ARGS (GST_BUFFER_DURATION (buffer))); + ret = gst_pad_push (stream->pad, gst_buffer_ref (buffer)); + GST_DEBUG_OBJECT (stream->pad, "Push result: %d %s", ret, + gst_flow_get_name (ret)); -end_of_manifest: - { - GST_INFO_OBJECT (demux, "Reached end of manifest, stopping streaming task"); - gst_task_stop (demux->stream_task); - return; - } + demux->segment.position = timestamp; + stream->position = timestamp; + if (GST_CLOCK_TIME_IS_VALID (duration)) + stream->position += duration; -error: - { - GST_ERROR_OBJECT (demux, - "Error pushing buffer: %s... terminating the demux", - gst_flow_get_name (ret)); - gst_task_stop (demux->stream_task); - return; - } + return ret; } static void @@ -1427,17 +1162,15 @@ gst_dash_demux_stream_free (GstDashDemuxStream * stream) gst_caps_unref (stream->input_caps); stream->input_caps = NULL; } + if (stream->pending_segment) { + gst_event_unref (stream->pending_segment); + } if (stream->pad) { gst_object_unref (stream->pad); stream->pad = NULL; } - if (stream->queue) { - g_object_unref (stream->queue); - stream->queue = NULL; - } if (stream->download_task) { gst_task_stop (stream->download_task); - gst_task_join (stream->download_task); gst_object_unref (stream->download_task); g_rec_mutex_clear (&stream->download_task_lock); } @@ -1461,8 +1194,8 @@ gst_dash_demux_reset (GstDashDemux * demux, gboolean dispose) demux->end_of_period = FALSE; demux->end_of_manifest = FALSE; - demux->cancelled = TRUE; gst_dash_demux_stop (demux); + gst_dash_demux_wait_stop (demux); if (demux->downloader) gst_uri_downloader_reset (demux->downloader); @@ -1508,36 +1241,6 @@ gst_dash_demux_reset (GstDashDemux * demux, gboolean dispose) demux->cancelled = FALSE; } -#ifndef GST_DISABLE_GST_DEBUG -static GstClockTime -gst_dash_demux_get_buffering_time (GstDashDemux * demux) -{ - GstClockTime buffer_time = GST_CLOCK_TIME_NONE; - GSList *iter; - - for (iter = demux->streams; iter; iter = g_slist_next (iter)) { - GstClockTime btime = gst_dash_demux_stream_get_buffering_time (iter->data); - - if (!GST_CLOCK_TIME_IS_VALID (buffer_time) || buffer_time > btime) - buffer_time = btime; - } - - if (!GST_CLOCK_TIME_IS_VALID (buffer_time)) - buffer_time = 0; - return buffer_time; -} - -static GstClockTime -gst_dash_demux_stream_get_buffering_time (GstDashDemuxStream * stream) -{ - GstDataQueueSize level; - - gst_data_queue_get_level (stream->queue, &level); - - return (GstClockTime) level.time; -} -#endif - static GstFlowReturn gst_dash_demux_refresh_mpd (GstDashDemux * demux) { @@ -1687,23 +1390,37 @@ gst_dash_demux_refresh_mpd (GstDashDemux * demux) return GST_FLOW_OK; } +static GstFlowReturn +gst_dash_demux_combine_flows (GstDashDemux * demux) +{ + gboolean all_notlinked = TRUE; + GSList *iter; + + for (iter = demux->streams; iter; iter = g_slist_next (iter)) { + GstDashDemuxStream *stream = iter->data; + + if (stream->last_ret != GST_FLOW_NOT_LINKED) + all_notlinked = FALSE; + + if (stream->last_ret <= GST_FLOW_NOT_NEGOTIATED + || stream->last_ret == GST_FLOW_FLUSHING) + return stream->last_ret; + } + if (all_notlinked) + return GST_FLOW_NOT_LINKED; + return GST_FLOW_OK; +} + + /* gst_dash_demux_stream_download_loop: * * Loop for the "download' task that fetches fragments based on the * selected representations. * - * Startup: - * - * The task is started from the stream loop. - * * During playback: * * It sequentially fetches fragments corresponding to the current - * representations and pushes them into a queue. - * - * It tries to maintain the number of queued items within a predefined - * range: if the queue is full, it will pause, checking every 100 ms if - * it needs to restart downloading fragments. + * representations and pushes them downstream * * When a new set of fragments has been downloaded, it evaluates the * download time to check if we can or should switch to a different @@ -1713,36 +1430,22 @@ gst_dash_demux_refresh_mpd (GstDashDemux * demux) * * The task will exit when it encounters an error or when the end of the * manifest has been reached. - * */ static void gst_dash_demux_stream_download_loop (GstDashDemuxStream * stream) { - GstClockTime fragment_ts = GST_CLOCK_TIME_NONE; GstDashDemux *demux = stream->demux; GstFlowReturn flow_ret = GST_FLOW_OK; + GstClockTime fragment_ts = GST_CLOCK_TIME_NONE; + GstEvent *caps_event; GST_LOG_OBJECT (stream->pad, "Starting download loop"); GST_DASH_DEMUX_CLIENT_LOCK (demux); - if (stream->last_ret < GST_FLOW_OK) { - if (demux->cancelled) { - GST_DASH_DEMUX_CLIENT_UNLOCK (demux); - goto cancelled; - } - GST_DEBUG_OBJECT (stream->pad, "Download loop waiting due to flow return: " - "%d %s", stream->last_ret, gst_flow_get_name (stream->last_ret)); - gst_task_pause (stream->download_task); - GST_DASH_DEMUX_CLIENT_UNLOCK (demux); - return; - } - GST_DASH_DEMUX_CLIENT_UNLOCK (demux); - if (demux->cancelled) { goto cancelled; } - GST_DASH_DEMUX_CLIENT_LOCK (demux); if (gst_mpd_client_is_live (demux->client) && demux->client->mpd_uri != NULL) { switch (gst_dash_demux_refresh_mpd (demux)) { @@ -1754,43 +1457,69 @@ gst_dash_demux_stream_download_loop (GstDashDemuxStream * stream) } } - GST_DEBUG_OBJECT (stream->pad, "End of manifest: %d", demux->end_of_manifest); - /* try to switch to another set of representations if needed */ - gst_dash_demux_stream_select_representation_unlocked (stream); + caps_event = gst_dash_demux_stream_select_representation_unlocked (stream); GST_DASH_DEMUX_CLIENT_UNLOCK (demux); + if (G_UNLIKELY (caps_event && !gst_pad_push_event (stream->pad, caps_event))) { + /* TODO fail if caps is rejected */ + } + /* fetch the next fragment */ - flow_ret = gst_dash_demux_get_next_fragment (demux, stream, &fragment_ts); + flow_ret = gst_dash_demux_stream_get_next_fragment (stream, &fragment_ts); + GST_DASH_DEMUX_CLIENT_LOCK (demux); + if (demux->cancelled) { + goto cancelled; + } + + stream->last_ret = flow_ret; switch (flow_ret) { case GST_FLOW_OK: break; + case GST_FLOW_NOT_LINKED: + gst_task_pause (stream->download_task); + if (gst_dash_demux_combine_flows (demux) == GST_FLOW_NOT_LINKED) { + GST_ELEMENT_ERROR (demux, STREAM, FAILED, + (_("Internal data stream error.")), + ("stream stopped, reason %s", + gst_flow_get_name (GST_FLOW_NOT_LINKED))); + } + break; + + case GST_FLOW_FLUSHING: + gst_dash_demux_stop (demux); + break; + case GST_FLOW_EOS: - GST_DASH_DEMUX_CLIENT_LOCK (demux); + GST_DEBUG_OBJECT (stream->pad, "EOS"); + stream->stream_eos = TRUE; if (gst_dash_demux_all_streams_eop (demux)) { GST_INFO_OBJECT (stream->pad, "Reached the end of the Period"); if (gst_mpd_client_has_next_period (demux->client)) { + + GST_INFO_OBJECT (demux, "Starting next period"); /* setup video, audio and subtitle streams, starting from the next Period */ if (!gst_mpd_client_set_period_index (demux->client, gst_mpd_client_get_period_index (demux->client) + 1) || !gst_dash_demux_setup_all_streams (demux)) { } + + stream->last_ret = GST_FLOW_OK; /* start playing from the first segment of the new period */ gst_mpd_client_set_segment_index_for_all_streams (demux->client, 0); - demux->end_of_period = FALSE; - gst_task_start (demux->stream_task); - } else if (!demux->end_of_manifest) { - GST_INFO_OBJECT (stream->pad, "Reached the end of the manifest file"); - demux->end_of_manifest = TRUE; - gst_task_start (demux->stream_task); + + gst_dash_demux_advance_period (demux); + + gst_dash_demux_resume_download_task (demux); GST_DASH_DEMUX_CLIENT_UNLOCK (demux); - goto end_of_manifest; + + /* This pad is now finished, we lost its reference */ + return; } } gst_task_pause (stream->download_task); - GST_DASH_DEMUX_CLIENT_UNLOCK (demux); break; case GST_FLOW_ERROR: /* Download failed 'by itself' @@ -1847,21 +1576,23 @@ gst_dash_demux_stream_download_loop (GstDashDemuxStream * stream) break; } - if (demux->cancelled) { - goto cancelled; - } - - GST_INFO_OBJECT (stream->pad, "Internal buffering : %" G_GUINT64_FORMAT " s", - gst_dash_demux_get_buffering_time (demux) / GST_SECOND); - demux->client->update_failed_count = 0; + if (G_LIKELY (stream->last_ret != GST_FLOW_ERROR)) + demux->client->update_failed_count = 0; quit: + GST_DASH_DEMUX_CLIENT_UNLOCK (demux); + + if (G_UNLIKELY (stream->last_ret == GST_FLOW_EOS)) { + gst_pad_push_event (stream->pad, gst_event_new_eos ()); + } + GST_DEBUG_OBJECT (stream->pad, "Finishing download loop"); return; cancelled: { GST_WARNING_OBJECT (stream->pad, "Cancelled, leaving download task"); + GST_DASH_DEMUX_CLIENT_UNLOCK (demux); return; } @@ -1869,6 +1600,8 @@ end_of_manifest: { GST_INFO_OBJECT (stream->pad, "End of manifest, leaving download task"); gst_task_stop (stream->download_task); + /* TODO should push eos? */ + GST_DASH_DEMUX_CLIENT_UNLOCK (demux); return; } @@ -1877,17 +1610,12 @@ error_downloading: GST_ELEMENT_ERROR (demux, RESOURCE, NOT_FOUND, ("Could not fetch the next fragment, leaving download task"), (NULL)); gst_task_stop (stream->download_task); + GST_DASH_DEMUX_CLIENT_UNLOCK (demux); return; } } static void -gst_dash_demux_resume_stream_task (GstDashDemux * demux) -{ - gst_task_start (demux->stream_task); -} - -static void gst_dash_demux_resume_download_task (GstDashDemux * demux) { GSList *iter; @@ -1904,7 +1632,7 @@ gst_dash_demux_resume_download_task (GstDashDemux * demux) * Select the most appropriate media representation based on current target * bitrate. */ -static gboolean +static GstEvent * gst_dash_demux_stream_select_representation_unlocked (GstDashDemuxStream * stream) { @@ -1943,20 +1671,17 @@ gst_dash_demux_stream_select_representation_unlocked (GstDashDemuxStream * stream->index, new_index, rep->bandwidth); if (gst_mpd_client_setup_representation (demux->client, active_stream, rep)) { stream->need_header = TRUE; - stream->has_data_queued = FALSE; GST_INFO_OBJECT (demux, "Switching bitrate to %d", active_stream->cur_representation->bandwidth); gst_caps_unref (stream->input_caps); stream->input_caps = gst_dash_demux_get_input_caps (demux, active_stream); - gst_dash_demux_stream_push_event (stream, - gst_event_new_caps (stream->input_caps)); - return TRUE; + return gst_event_new_caps (gst_caps_ref (stream->input_caps)); } else { GST_WARNING_OBJECT (demux, "Can not switch representation, aborting..."); - return FALSE; + return NULL; } } - return FALSE; + return NULL; } static GstBuffer * @@ -2151,8 +1876,8 @@ gst_dash_demux_wait_for_fragment_to_be_available (GstDashDemux * demux, } } -static gboolean -gst_dash_demux_get_next_fragment_for_stream (GstDashDemux * demux, +static GstBuffer * +gst_dash_demux_stream_download_fragment (GstDashDemux * demux, GstDashDemuxStream * stream, guint64 * size_buffer, GstClockTime * download_time) { @@ -2161,7 +1886,7 @@ gst_dash_demux_get_next_fragment_for_stream (GstDashDemux * demux, GTimeVal now; GTimeVal start; guint stream_idx = stream->index; - GstBuffer *buffer; + GstBuffer *buffer = NULL; GstBuffer *header_buffer; GstMediaFragmentInfo fragment; @@ -2215,17 +1940,12 @@ gst_dash_demux_get_next_fragment_for_stream (GstDashDemux * demux, */ stream->last_ret = GST_FLOW_CUSTOM_SUCCESS; - gst_data_queue_set_flushing (stream->queue, FALSE); stream->restart_download = FALSE; - gst_task_start (demux->stream_task); } if (gst_mpd_client_get_next_fragment (demux->client, stream_idx, &fragment)) { - gboolean catch_up = FALSE; - g_get_current_time (&start); - GST_INFO_OBJECT (demux, "Next fragment for stream #%i", stream_idx); - GST_INFO_OBJECT (demux, + GST_INFO_OBJECT (stream->pad, "Fetching next fragment %s ts:%" GST_TIME_FORMAT " dur:%" GST_TIME_FORMAT " Range:%" G_GINT64_FORMAT "-%" G_GINT64_FORMAT, fragment.uri, GST_TIME_ARGS (fragment.timestamp), @@ -2237,14 +1957,14 @@ gst_dash_demux_get_next_fragment_for_stream (GstDashDemux * demux, if (download == NULL) { gst_media_fragment_info_clear (&fragment); - return FALSE; + return NULL; } active_stream = stream->active_stream; if (active_stream == NULL) { gst_media_fragment_info_clear (&fragment); g_object_unref (download); - return FALSE; + return NULL; } buffer = gst_fragment_get_buffer (download); @@ -2259,7 +1979,7 @@ gst_dash_demux_get_next_fragment_for_stream (GstDashDemux * demux, if (!uri) /* fallback to default media uri */ uri = fragment.uri; - GST_DEBUG_OBJECT (demux, + GST_DEBUG_OBJECT (stream->pad, "Fragment index download: %s %" G_GINT64_FORMAT "-%" G_GINT64_FORMAT, uri, fragment.index_range_start, fragment.index_range_end); @@ -2293,41 +2013,15 @@ gst_dash_demux_get_next_fragment_for_stream (GstDashDemux * demux, gst_mpd_client_get_segment_index (active_stream) - 1; gst_media_fragment_info_clear (&fragment); - - /* Check if this stream is on catch up mode */ - if (stream->last_ret == GST_FLOW_CUSTOM_SUCCESS) { - GST_DEBUG_OBJECT (stream->pad, - "Catch up ts: %" GST_TIME_FORMAT ", buffer:%" GST_TIME_FORMAT, - GST_TIME_ARGS (demux->segment.position), - GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer))); - if (GST_BUFFER_TIMESTAMP (buffer) < demux->segment.position) { - catch_up = TRUE; - } else { - stream->last_ret = GST_FLOW_OK; - gst_task_start (demux->stream_task); - } - } - *size_buffer += gst_buffer_get_size (buffer); - if (catch_up) { - GstFlowReturn ret; - - ret = gst_pad_push (stream->pad, buffer); - if (G_LIKELY (ret == GST_FLOW_OK)) - stream->last_ret = GST_FLOW_CUSTOM_SUCCESS; - /* TODO handle return */ - } else { - gst_dash_demux_stream_push_data (stream, buffer); - stream->has_data_queued = TRUE; - } } else { GST_WARNING_OBJECT (demux, "Failed to download fragment for stream %p %d", stream, stream->index); } - return TRUE; + return buffer; } -/* gst_dash_demux_get_next_fragment: +/* gst_dash_demux_stream_get_next_fragment: * * Get the next fragments for the stream with the earlier timestamp. * It returns the selected timestamp so the caller can deal with @@ -2336,18 +2030,19 @@ gst_dash_demux_get_next_fragment_for_stream (GstDashDemux * demux, * This function uses the generic URI downloader API. * * Returns FALSE if an error occured while downloading fragments - * */ static GstFlowReturn -gst_dash_demux_get_next_fragment (GstDashDemux * demux, - GstDashDemuxStream * stream, GstClockTime * selected_ts) +gst_dash_demux_stream_get_next_fragment (GstDashDemuxStream * stream, + GstClockTime * ts) { guint64 buffer_size = 0; GstClockTime diff; gboolean end_of_period = TRUE; - GstClockTime ts; + GstBuffer *buffer = NULL; + GstFlowReturn ret = GST_FLOW_OK; + GstDashDemux *demux = stream->demux; - if (stream->download_end_of_period) + if (stream->stream_eos) return GST_FLOW_EOS; if (stream->last_ret == GST_FLOW_NOT_LINKED) { @@ -2356,11 +2051,8 @@ gst_dash_demux_get_next_fragment (GstDashDemux * demux, return GST_FLOW_NOT_LINKED; } - if (gst_mpd_client_get_next_fragment_timestamp (demux->client, - stream->index, &ts)) { - } else { - GstEvent *event = NULL; - + if (!gst_mpd_client_get_next_fragment_timestamp (demux->client, + stream->index, ts)) { GST_INFO_OBJECT (demux, "This Period doesn't contain more fragments for stream %u", stream->index); @@ -2371,36 +2063,34 @@ gst_dash_demux_get_next_fragment (GstDashDemux * demux, end_of_period = FALSE; return GST_FLOW_OK; /* TODO wait */ } - - if (gst_mpd_client_has_next_period (demux->client)) { - event = gst_event_new_dash_eop (); - } else { - GST_DEBUG_OBJECT (demux, - "No more fragments or periods for this stream, setting EOS"); - event = gst_event_new_eos (); - } - stream->download_end_of_period = TRUE; - gst_dash_demux_stream_push_event (stream, event); + return GST_FLOW_EOS; } /* * If this is a live stream, check the segment end time to make sure * it is available to download */ - if (stream && gst_mpd_client_is_live (demux->client) && + if (gst_mpd_client_is_live (demux->client) && demux->client->mpd_node->minimumUpdatePeriod != -1) { gst_dash_demux_wait_for_fragment_to_be_available (demux, stream); } /* Get the fragment corresponding to each stream index */ - if (stream) { - gst_dash_demux_get_next_fragment_for_stream (demux, stream, &buffer_size, - &diff); - end_of_period = FALSE; - } + buffer = + gst_dash_demux_stream_download_fragment (demux, stream, + &buffer_size, &diff); + end_of_period = FALSE; demux->end_of_period = end_of_period; + + if (buffer) { + ret = gst_dash_demux_push (stream, buffer); + } else { + GST_WARNING_OBJECT (stream->pad, "Failed to download fragment"); + return GST_FLOW_ERROR; + } + if (end_of_period) return GST_FLOW_EOS; @@ -2419,7 +2109,7 @@ gst_dash_demux_get_next_fragment (GstDashDemux * demux, G_GUINT64_FORMAT " Ko in %.2f s)", stream->index, brate / 1000, buffer_size / 1024, ((double) diff / GST_SECOND)); } - return GST_FLOW_OK; + return ret; } static void diff --git a/ext/dash/gstdashdemux.h b/ext/dash/gstdashdemux.h index 935eec1..935e7e9 100644 --- a/ext/dash/gstdashdemux.h +++ b/ext/dash/gstdashdemux.h @@ -68,38 +68,11 @@ struct _GstDashDemuxStream GstClockTime position; gboolean restart_download; - /* - * Need to store the status for the download and - * stream tasks separately as they are working at - * different points of the stream timeline. - * The download task is ahead of the stream. - * - * The download_end_of_period is set when a stream - * has already downloaded all fragments for the current - * period. - * - * The stream_end_of_period is set when a stream - * has pushed all fragments for the current period - */ - gboolean download_end_of_period; - gboolean stream_end_of_period; + GstEvent *pending_segment; gboolean stream_eos; gboolean need_header; - /* tracks if a stream has enqueued data - * after a pad switch. - * This is required to prevent pads being - * added to the demuxer and having no data - * pushed to it before another pad switch - * as this might make downstream elements - * unhappy and error out if they get - * an EOS without receiving any input - */ - gboolean has_data_queued; - - GstDataQueue *queue; - /* Download task */ GMutex download_mutex; GCond download_cond; @@ -127,7 +100,6 @@ struct _GstDashDemux GSList *next_periods; GstSegment segment; - gboolean need_segment; GstClockTime timestamp_offset; GstBuffer *manifest; @@ -143,10 +115,6 @@ struct _GstDashDemux gfloat bandwidth_usage; /* Percentage of the available bandwidth to use */ guint64 max_bitrate; /* max of bitrate supported by target decoder */ - /* Streaming task */ - GstTask *stream_task; - GRecMutex stream_task_lock; - gboolean cancelled; /* Manifest update */