From 585e60c4abd0ea4e0a12700b84580e5b8a7cdc53 Mon Sep 17 00:00:00 2001 From: Jan Schmidt Date: Wed, 13 Jul 2016 23:02:10 +1000 Subject: [PATCH] adaptivedemux: Add more safeguards around state changes. Make state changes of internal elements more reliable by locking their state, and ensuring that they aren't blocked pushing data downstream before trying to set their state. Add a boolean to avoid starting tasks when the main thread is busy trying to shut the element down. --- gst-libs/gst/adaptivedemux/gstadaptivedemux.c | 70 +++++++++++++++++++++------ gst-libs/gst/adaptivedemux/gstadaptivedemux.h | 3 ++ 2 files changed, 57 insertions(+), 16 deletions(-) diff --git a/gst-libs/gst/adaptivedemux/gstadaptivedemux.c b/gst-libs/gst/adaptivedemux/gstadaptivedemux.c index 2d2e263..2f07510 100644 --- a/gst-libs/gst/adaptivedemux/gstadaptivedemux.c +++ b/gst-libs/gst/adaptivedemux/gstadaptivedemux.c @@ -531,9 +531,15 @@ gst_adaptive_demux_change_state (GstElement * element, switch (transition) { case GST_STATE_CHANGE_PAUSED_TO_READY: GST_MANIFEST_LOCK (demux); + demux->running = FALSE; gst_adaptive_demux_reset (demux); GST_MANIFEST_UNLOCK (demux); break; + case GST_STATE_CHANGE_READY_TO_PAUSED: + GST_MANIFEST_LOCK (demux); + demux->running = TRUE; + GST_MANIFEST_UNLOCK (demux); + break; default: break; } @@ -944,20 +950,10 @@ gst_adaptive_demux_expose_streams (GstAdaptiveDemux * demux, demux->streams = demux->next_streams; demux->next_streams = NULL; - /* First ensure all on-going downloads are finished or cancelled */ - GST_MANIFEST_UNLOCK (demux); - for (iter = old_streams; iter; iter = g_list_next (iter)) { - GstAdaptiveDemuxStream *stream = iter->data; - g_mutex_lock (&stream->fragment_download_lock); - while (!stream->cancelled && !stream->download_finished) { - GST_DEBUG_OBJECT (stream->pad, - "Waiting for download on active stream to finish"); - g_cond_wait (&stream->fragment_download_cond, - &stream->fragment_download_lock); - } - g_mutex_unlock (&stream->fragment_download_lock); + if (!demux->running) { + GST_DEBUG_OBJECT (demux, "Not exposing pads due to shutdown"); + return TRUE; } - GST_MANIFEST_LOCK (demux); for (iter = demux->streams; iter; iter = g_list_next (iter)) { GstAdaptiveDemuxStream *stream = iter->data; @@ -1237,6 +1233,7 @@ gst_adaptive_demux_stream_free (GstAdaptiveDemuxStream * stream) if (stream->src) { GST_MANIFEST_UNLOCK (demux); + gst_element_set_locked_state (stream->src, TRUE); gst_element_set_state (stream->src, GST_STATE_NULL); gst_bin_remove (GST_BIN_CAST (demux), stream->src); stream->src = NULL; @@ -1528,7 +1525,8 @@ gst_adaptive_demux_src_event (GstPad * pad, GstObject * parent, stream = gst_adaptive_demux_find_stream_for_pad (demux, pad); if (stream) { - if (stream->last_ret == GST_FLOW_NOT_LINKED) { + if (!stream->cancelled && demux->running && + stream->last_ret == GST_FLOW_NOT_LINKED) { stream->last_ret = GST_FLOW_OK; stream->restart_download = TRUE; stream->need_header = TRUE; @@ -1675,6 +1673,11 @@ gst_adaptive_demux_start_tasks (GstAdaptiveDemux * demux) { GList *iter; + if (!demux->running) { + GST_DEBUG_OBJECT (demux, "Not starting tasks due to shutdown"); + return; + } + GST_INFO_OBJECT (demux, "Starting streams' tasks"); for (iter = demux->streams; iter; iter = g_list_next (iter)) { GstAdaptiveDemuxStream *stream = iter->data; @@ -1736,6 +1739,7 @@ gst_adaptive_demux_stop_tasks (GstAdaptiveDemux * demux) GST_MANIFEST_UNLOCK (demux); if (src) { + gst_element_set_locked_state (stream->src, TRUE); gst_element_set_state (src, GST_STATE_READY); } @@ -2387,6 +2391,7 @@ gst_adaptive_demux_stream_update_source (GstAdaptiveDemuxStream * stream, if (!g_str_equal (old_protocol, new_protocol)) { gst_object_unref (stream->src_srcpad); + gst_element_set_locked_state (stream->src, TRUE); gst_element_set_state (stream->src, GST_STATE_NULL); gst_bin_remove (GST_BIN_CAST (demux), stream->src); stream->src = NULL; @@ -2402,6 +2407,7 @@ gst_adaptive_demux_stream_update_source (GstAdaptiveDemuxStream * stream, err->message); g_clear_error (&err); gst_object_unref (stream->src_srcpad); + gst_element_set_locked_state (stream->src, TRUE); gst_element_set_state (stream->src, GST_STATE_NULL); gst_bin_remove (GST_BIN_CAST (demux), stream->src); stream->src = NULL; @@ -2542,6 +2548,22 @@ gst_adaptive_demux_stream_update_source (GstAdaptiveDemuxStream * stream, return TRUE; } + +static GstPadProbeReturn +gst_ad_stream_src_to_ready_cb (GstPad * pad, GstPadProbeInfo * info, + gpointer user_data) +{ + GstAdaptiveDemuxStream *stream = user_data; + + /* The source's src pad is IDLE so now set the state to READY */ + g_mutex_lock (&stream->fragment_download_lock); + stream->src_at_ready = TRUE; + g_cond_signal (&stream->fragment_download_cond); + g_mutex_unlock (&stream->fragment_download_lock); + + return GST_PAD_PROBE_OK; +} + /* must be called with manifest_lock taken. * Can temporarily release manifest_lock */ @@ -2559,6 +2581,8 @@ gst_adaptive_demux_stream_download_uri (GstAdaptiveDemux * demux, return ret; } + gst_element_set_locked_state (stream->src, TRUE); + if (gst_element_set_state (stream->src, GST_STATE_READY) != GST_STATE_CHANGE_FAILURE) { if (start != 0 || end != -1) { @@ -2608,6 +2632,7 @@ gst_adaptive_demux_stream_download_uri (GstAdaptiveDemux * demux, "Waiting for fragment download to finish: %s", uri); g_mutex_lock (&stream->fragment_download_lock); + stream->src_at_ready = FALSE; if (G_UNLIKELY (stream->cancelled)) { g_mutex_unlock (&stream->fragment_download_lock); GST_MANIFEST_LOCK (demux); @@ -2645,10 +2670,22 @@ gst_adaptive_demux_stream_download_uri (GstAdaptiveDemux * demux, */ GST_MANIFEST_UNLOCK (demux); - /* FIXME: Wait until the src pad is IDLE, as it might be blocked - * downstream indefinitely here */ + stream->src_at_ready = FALSE; + + gst_element_set_locked_state (stream->src, TRUE); + gst_pad_add_probe (stream->src_srcpad, GST_PAD_PROBE_TYPE_IDLE, + gst_ad_stream_src_to_ready_cb, stream, NULL); + + g_mutex_lock (&stream->fragment_download_lock); + while (!stream->src_at_ready) { + g_cond_wait (&stream->fragment_download_cond, + &stream->fragment_download_lock); + } + g_mutex_unlock (&stream->fragment_download_lock); + gst_element_set_state (stream->src, GST_STATE_READY); + /* Need to drop the fragment_download_lock to get the MANIFEST lock */ GST_MANIFEST_LOCK (demux); g_mutex_lock (&stream->fragment_download_lock); if (G_UNLIKELY (stream->cancelled)) { @@ -3135,6 +3172,7 @@ download_error: gst_task_stop (stream->download_task); if (stream->src) { + gst_element_set_locked_state (stream->src, TRUE); gst_element_set_state (stream->src, GST_STATE_NULL); gst_bin_remove (GST_BIN_CAST (demux), stream->src); stream->src = NULL; diff --git a/gst-libs/gst/adaptivedemux/gstadaptivedemux.h b/gst-libs/gst/adaptivedemux/gstadaptivedemux.h index c75309f..cd8ed10 100644 --- a/gst-libs/gst/adaptivedemux/gstadaptivedemux.h +++ b/gst-libs/gst/adaptivedemux/gstadaptivedemux.h @@ -152,6 +152,7 @@ struct _GstAdaptiveDemuxStream GCond fragment_download_cond; gboolean download_finished; /* protected by fragment_download_lock */ gboolean cancelled; /* protected by fragment_download_lock */ + gboolean src_at_ready; /* protected by fragment_download_lock */ gboolean starting_fragment; gboolean first_fragment_buffer; gint64 download_start_time; @@ -183,6 +184,8 @@ struct _GstAdaptiveDemux /*< private >*/ GstBin bin; + gboolean running; + gsize stream_struct_size; /*< protected >*/ -- 2.7.4