adaptivedemux: Add more safeguards around state changes.
authorJan Schmidt <jan@centricular.com>
Wed, 13 Jul 2016 13:02:10 +0000 (23:02 +1000)
committerJan Schmidt <jan@centricular.com>
Fri, 15 Jul 2016 04:33:23 +0000 (14:33 +1000)
Make state changes of internal elements more reliable by locking
their state, and ensuring that they aren't blocked pushing data
downstream before trying to set their state.

Add a boolean to avoid starting tasks when the main
thread is busy trying to shut the element down.

gst-libs/gst/adaptivedemux/gstadaptivedemux.c
gst-libs/gst/adaptivedemux/gstadaptivedemux.h

index 2d2e26331ca9ac338a5ae92a8c6d222daf04a5d8..2f075101673e4c0de2cf3becf4a56ec94902ea5b 100644 (file)
@@ -531,9 +531,15 @@ gst_adaptive_demux_change_state (GstElement * element,
   switch (transition) {
     case GST_STATE_CHANGE_PAUSED_TO_READY:
       GST_MANIFEST_LOCK (demux);
+      demux->running = FALSE;
       gst_adaptive_demux_reset (demux);
       GST_MANIFEST_UNLOCK (demux);
       break;
+    case GST_STATE_CHANGE_READY_TO_PAUSED:
+      GST_MANIFEST_LOCK (demux);
+      demux->running = TRUE;
+      GST_MANIFEST_UNLOCK (demux);
+      break;
     default:
       break;
   }
@@ -944,20 +950,10 @@ gst_adaptive_demux_expose_streams (GstAdaptiveDemux * demux,
   demux->streams = demux->next_streams;
   demux->next_streams = NULL;
 
-  /* First ensure all on-going downloads are finished or cancelled */
-  GST_MANIFEST_UNLOCK (demux);
-  for (iter = old_streams; iter; iter = g_list_next (iter)) {
-    GstAdaptiveDemuxStream *stream = iter->data;
-    g_mutex_lock (&stream->fragment_download_lock);
-    while (!stream->cancelled && !stream->download_finished) {
-      GST_DEBUG_OBJECT (stream->pad,
-          "Waiting for download on active stream to finish");
-      g_cond_wait (&stream->fragment_download_cond,
-          &stream->fragment_download_lock);
-    }
-    g_mutex_unlock (&stream->fragment_download_lock);
+  if (!demux->running) {
+    GST_DEBUG_OBJECT (demux, "Not exposing pads due to shutdown");
+    return TRUE;
   }
-  GST_MANIFEST_LOCK (demux);
 
   for (iter = demux->streams; iter; iter = g_list_next (iter)) {
     GstAdaptiveDemuxStream *stream = iter->data;
@@ -1237,6 +1233,7 @@ gst_adaptive_demux_stream_free (GstAdaptiveDemuxStream * stream)
 
   if (stream->src) {
     GST_MANIFEST_UNLOCK (demux);
+    gst_element_set_locked_state (stream->src, TRUE);
     gst_element_set_state (stream->src, GST_STATE_NULL);
     gst_bin_remove (GST_BIN_CAST (demux), stream->src);
     stream->src = NULL;
@@ -1528,7 +1525,8 @@ gst_adaptive_demux_src_event (GstPad * pad, GstObject * parent,
       stream = gst_adaptive_demux_find_stream_for_pad (demux, pad);
 
       if (stream) {
-        if (stream->last_ret == GST_FLOW_NOT_LINKED) {
+        if (!stream->cancelled && demux->running &&
+            stream->last_ret == GST_FLOW_NOT_LINKED) {
           stream->last_ret = GST_FLOW_OK;
           stream->restart_download = TRUE;
           stream->need_header = TRUE;
@@ -1675,6 +1673,11 @@ gst_adaptive_demux_start_tasks (GstAdaptiveDemux * demux)
 {
   GList *iter;
 
+  if (!demux->running) {
+    GST_DEBUG_OBJECT (demux, "Not starting tasks due to shutdown");
+    return;
+  }
+
   GST_INFO_OBJECT (demux, "Starting streams' tasks");
   for (iter = demux->streams; iter; iter = g_list_next (iter)) {
     GstAdaptiveDemuxStream *stream = iter->data;
@@ -1736,6 +1739,7 @@ gst_adaptive_demux_stop_tasks (GstAdaptiveDemux * demux)
     GST_MANIFEST_UNLOCK (demux);
 
     if (src) {
+      gst_element_set_locked_state (stream->src, TRUE);
       gst_element_set_state (src, GST_STATE_READY);
     }
 
@@ -2387,6 +2391,7 @@ gst_adaptive_demux_stream_update_source (GstAdaptiveDemuxStream * stream,
 
     if (!g_str_equal (old_protocol, new_protocol)) {
       gst_object_unref (stream->src_srcpad);
+      gst_element_set_locked_state (stream->src, TRUE);
       gst_element_set_state (stream->src, GST_STATE_NULL);
       gst_bin_remove (GST_BIN_CAST (demux), stream->src);
       stream->src = NULL;
@@ -2402,6 +2407,7 @@ gst_adaptive_demux_stream_update_source (GstAdaptiveDemuxStream * stream,
             err->message);
         g_clear_error (&err);
         gst_object_unref (stream->src_srcpad);
+        gst_element_set_locked_state (stream->src, TRUE);
         gst_element_set_state (stream->src, GST_STATE_NULL);
         gst_bin_remove (GST_BIN_CAST (demux), stream->src);
         stream->src = NULL;
@@ -2542,6 +2548,22 @@ gst_adaptive_demux_stream_update_source (GstAdaptiveDemuxStream * stream,
   return TRUE;
 }
 
+
+static GstPadProbeReturn
+gst_ad_stream_src_to_ready_cb (GstPad * pad, GstPadProbeInfo * info,
+    gpointer user_data)
+{
+  GstAdaptiveDemuxStream *stream = user_data;
+
+  /* The source's src pad is IDLE so now set the state to READY */
+  g_mutex_lock (&stream->fragment_download_lock);
+  stream->src_at_ready = TRUE;
+  g_cond_signal (&stream->fragment_download_cond);
+  g_mutex_unlock (&stream->fragment_download_lock);
+
+  return GST_PAD_PROBE_OK;
+}
+
 /* must be called with manifest_lock taken.
  * Can temporarily release manifest_lock
  */
@@ -2559,6 +2581,8 @@ gst_adaptive_demux_stream_download_uri (GstAdaptiveDemux * demux,
     return ret;
   }
 
+  gst_element_set_locked_state (stream->src, TRUE);
+
   if (gst_element_set_state (stream->src,
           GST_STATE_READY) != GST_STATE_CHANGE_FAILURE) {
     if (start != 0 || end != -1) {
@@ -2608,6 +2632,7 @@ gst_adaptive_demux_stream_download_uri (GstAdaptiveDemux * demux,
           "Waiting for fragment download to finish: %s", uri);
 
       g_mutex_lock (&stream->fragment_download_lock);
+      stream->src_at_ready = FALSE;
       if (G_UNLIKELY (stream->cancelled)) {
         g_mutex_unlock (&stream->fragment_download_lock);
         GST_MANIFEST_LOCK (demux);
@@ -2645,10 +2670,22 @@ gst_adaptive_demux_stream_download_uri (GstAdaptiveDemux * demux,
    */
   GST_MANIFEST_UNLOCK (demux);
 
-  /* FIXME: Wait until the src pad is IDLE, as it might be blocked
-   * downstream indefinitely here */
+  stream->src_at_ready = FALSE;
+
+  gst_element_set_locked_state (stream->src, TRUE);
+  gst_pad_add_probe (stream->src_srcpad, GST_PAD_PROBE_TYPE_IDLE,
+      gst_ad_stream_src_to_ready_cb, stream, NULL);
+
+  g_mutex_lock (&stream->fragment_download_lock);
+  while (!stream->src_at_ready) {
+    g_cond_wait (&stream->fragment_download_cond,
+        &stream->fragment_download_lock);
+  }
+  g_mutex_unlock (&stream->fragment_download_lock);
+
   gst_element_set_state (stream->src, GST_STATE_READY);
 
+  /* Need to drop the fragment_download_lock to get the MANIFEST lock */
   GST_MANIFEST_LOCK (demux);
   g_mutex_lock (&stream->fragment_download_lock);
   if (G_UNLIKELY (stream->cancelled)) {
@@ -3135,6 +3172,7 @@ download_error:
 
     gst_task_stop (stream->download_task);
     if (stream->src) {
+      gst_element_set_locked_state (stream->src, TRUE);
       gst_element_set_state (stream->src, GST_STATE_NULL);
       gst_bin_remove (GST_BIN_CAST (demux), stream->src);
       stream->src = NULL;
index c75309f6773e704bc44ac8266268a9e41bba69ff..cd8ed107dbd2b2d2467a4495a5407037225b340a 100644 (file)
@@ -152,6 +152,7 @@ struct _GstAdaptiveDemuxStream
   GCond fragment_download_cond;
   gboolean download_finished;   /* protected by fragment_download_lock */
   gboolean cancelled;           /* protected by fragment_download_lock */
+  gboolean src_at_ready;     /* protected by fragment_download_lock */
   gboolean starting_fragment;
   gboolean first_fragment_buffer;
   gint64 download_start_time;
@@ -183,6 +184,8 @@ struct _GstAdaptiveDemux
   /*< private >*/
   GstBin     bin;
 
+  gboolean running;
+
   gsize stream_struct_size;
 
   /*< protected >*/