adaptivedemux: remove some deadlocks using webkitwebsrc.
authorCharlie Turner <cturner@igalia.com>
Tue, 2 Jul 2019 11:27:40 +0000 (12:27 +0100)
committerCharlie Turner <cturner@igalia.com>
Mon, 29 Jul 2019 12:19:41 +0000 (13:19 +0100)
WebKit's websrc depends on the main-thread for download completion
rendezvous. This exposed a number of deadlocks in adaptivedemux due to
it holding the MANIFEST_LOCK during network requests, and also needing
to hold it to change_state and resolve queries, which frequently occur
during these download windows.

Make demux->running MT-safe so that it can be accessed without using the
MANIFEST_LOCK. In case a source is downloading and requires a MT-thread
notification for completion of the fragment download, a state change
during this download window will deadlock unless we cancel the downloads
and ensure they are not restarted before we finish the state-change.

Also make demux->priv->have_manifest MT-safe. A duration query happening
in the window described above can deadlock for the same reason. Other
src queries (like SEEKING) that happen in this window also could
deadlock, but I haven't hit this scenario.

Increase granularity of API_LOCK'ing in change_state as well. We need to
cancel downloads before trying to take this lock, since sink events
(EOS) will hold it before starting a fragment download.

ext/hls/gsthlsdemux.c
gst-libs/gst/adaptivedemux/gstadaptivedemux.c
gst-libs/gst/adaptivedemux/gstadaptivedemux.h

index d4ca55634b6a57a5dc6e9d2ba710950e196ed1f7..1e4d0df90018c180a11b557c5d31aa5c0013cc4e 100644 (file)
@@ -73,6 +73,7 @@ static gboolean gst_hls_demux_update_playlist (GstHLSDemux * demux,
     gboolean update, GError ** err);
 static gchar *gst_hls_src_buf_to_utf8_playlist (GstBuffer * buf);
 
+/* FIXME: the return value is never used? */
 static gboolean gst_hls_demux_change_playlist (GstHLSDemux * demux,
     guint max_bitrate, gboolean * changed);
 static GstBuffer *gst_hls_demux_decrypt_fragment (GstHLSDemux * demux,
@@ -1173,6 +1174,8 @@ gst_hls_demux_reset (GstAdaptiveDemux * ademux)
 {
   GstHLSDemux *demux = GST_HLS_DEMUX_CAST (ademux);
 
+  GST_DEBUG_OBJECT (demux, "resetting");
+
   GST_M3U8_CLIENT_LOCK (hlsdemux->client);
   if (demux->master) {
     gst_hls_master_playlist_unref (demux->master);
@@ -1394,7 +1397,8 @@ retry:
   if (download == NULL) {
     gchar *base_uri;
 
-    if (!update || main_checked || demux->master->is_simple) {
+    if (!update || main_checked || demux->master->is_simple
+        || !gst_adaptive_demux_is_running (GST_ADAPTIVE_DEMUX_CAST (demux))) {
       g_free (uri);
       return FALSE;
     }
@@ -1627,7 +1631,7 @@ retry_failover_protection:
     if (changed)
       *changed = TRUE;
     stream->discont = TRUE;
-  } else {
+  } else if (gst_adaptive_demux_is_running (GST_ADAPTIVE_DEMUX_CAST (demux))) {
     GstHLSVariantStream *failover_variant = NULL;
     GList *failover;
 
index 5244e005da082c0dd93d032bfa3c6ff75a62e98e..db2d3f36557163cde1f51605bc1ef7ab903c5b06 100644 (file)
@@ -166,7 +166,7 @@ enum
 struct _GstAdaptiveDemuxPrivate
 {
   GstAdapter *input_adapter;    /* protected by manifest_lock */
-  gboolean have_manifest;       /* protected by manifest_lock */
+  gint have_manifest;           /* MT safe */
 
   GList *old_streams;           /* protected by manifest_lock */
 
@@ -564,25 +564,31 @@ gst_adaptive_demux_change_state (GstElement * element,
   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (element);
   GstStateChangeReturn result = GST_STATE_CHANGE_FAILURE;
 
-  GST_API_LOCK (demux);
-
   switch (transition) {
     case GST_STATE_CHANGE_PAUSED_TO_READY:
+      if (g_atomic_int_compare_and_exchange (&demux->running, TRUE, FALSE))
+        GST_DEBUG_OBJECT (demux, "demuxer has stopped running");
+      gst_uri_downloader_cancel (demux->downloader);
+
+      GST_API_LOCK (demux);
       GST_MANIFEST_LOCK (demux);
-      demux->running = FALSE;
       gst_adaptive_demux_reset (demux);
       GST_MANIFEST_UNLOCK (demux);
+      GST_API_UNLOCK (demux);
       break;
     case GST_STATE_CHANGE_READY_TO_PAUSED:
+      GST_API_LOCK (demux);
       GST_MANIFEST_LOCK (demux);
       gst_adaptive_demux_reset (demux);
       /* Clear "cancelled" flag in uridownloader since subclass might want to
        * use uridownloader to fetch another manifest */
       gst_uri_downloader_reset (demux->downloader);
-      if (demux->priv->have_manifest)
+      if (g_atomic_int_get (&demux->priv->have_manifest))
         gst_adaptive_demux_start_manifest_update_task (demux);
-      demux->running = TRUE;
       GST_MANIFEST_UNLOCK (demux);
+      GST_API_UNLOCK (demux);
+      if (g_atomic_int_compare_and_exchange (&demux->running, FALSE, TRUE))
+        GST_DEBUG_OBJECT (demux, "demuxer has started running");
       break;
     default:
       break;
@@ -595,7 +601,6 @@ gst_adaptive_demux_change_state (GstElement * element,
    */
   result = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
 
-  GST_API_UNLOCK (demux);
   return result;
 }
 
@@ -686,7 +691,7 @@ gst_adaptive_demux_sink_event (GstPad * pad, GstObject * parent,
             (NULL));
         ret = FALSE;
       } else {
-        demux->priv->have_manifest = TRUE;
+        g_atomic_int_set (&demux->priv->have_manifest, TRUE);
       }
       gst_buffer_unref (manifest_buffer);
 
@@ -829,7 +834,7 @@ gst_adaptive_demux_reset (GstAdaptiveDemux * demux)
   demux->manifest_base_uri = NULL;
 
   gst_adapter_clear (demux->priv->input_adapter);
-  demux->priv->have_manifest = FALSE;
+  g_atomic_int_set (&demux->priv->have_manifest, FALSE);
 
   gst_segment_init (&demux->segment, GST_FORMAT_TIME);
 
@@ -1051,7 +1056,7 @@ gst_adaptive_demux_prepare_streams (GstAdaptiveDemux * demux,
   demux->prepared_streams = demux->next_streams;
   demux->next_streams = NULL;
 
-  if (!demux->running) {
+  if (!gst_adaptive_demux_is_running (demux)) {
     GST_DEBUG_OBJECT (demux, "Not exposing pads due to shutdown");
     return TRUE;
   }
@@ -1843,7 +1848,7 @@ gst_adaptive_demux_src_event (GstPad * pad, GstObject * parent,
       stream = gst_adaptive_demux_find_stream_for_pad (demux, pad);
 
       if (stream) {
-        if (!stream->cancelled && demux->running &&
+        if (!stream->cancelled && gst_adaptive_demux_is_running (demux) &&
             stream->last_ret == GST_FLOW_NOT_LINKED) {
           stream->last_ret = GST_FLOW_OK;
           stream->restart_download = TRUE;
@@ -1916,9 +1921,8 @@ gst_adaptive_demux_src_query (GstPad * pad, GstObject * parent,
 
       gst_query_parse_duration (query, &fmt, NULL);
 
-      GST_MANIFEST_LOCK (demux);
-
-      if (fmt == GST_FORMAT_TIME && demux->priv->have_manifest) {
+      if (fmt == GST_FORMAT_TIME
+          && g_atomic_int_get (&demux->priv->have_manifest)) {
         duration = demux_class->get_duration (demux);
 
         if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0) {
@@ -1927,8 +1931,6 @@ gst_adaptive_demux_src_query (GstPad * pad, GstObject * parent,
         }
       }
 
-      GST_MANIFEST_UNLOCK (demux);
-
       GST_LOG_OBJECT (demux, "GST_QUERY_DURATION returns %s with duration %"
           GST_TIME_FORMAT, ret ? "TRUE" : "FALSE", GST_TIME_ARGS (duration));
       break;
@@ -1943,15 +1945,14 @@ gst_adaptive_demux_src_query (GstPad * pad, GstObject * parent,
       gint64 stop = -1;
       gint64 start = 0;
 
-      GST_MANIFEST_LOCK (demux);
-
-      if (!demux->priv->have_manifest) {
-        GST_MANIFEST_UNLOCK (demux);
+      if (!g_atomic_int_get (&demux->priv->have_manifest)) {
         GST_INFO_OBJECT (demux,
             "Don't have manifest yet, can't answer seeking query");
         return FALSE;           /* can't answer without manifest */
       }
 
+      GST_MANIFEST_LOCK (demux);
+
       gst_query_parse_seeking (query, &fmt, NULL, NULL, NULL);
       GST_INFO_OBJECT (demux, "Received GST_QUERY_SEEKING with format %d", fmt);
       if (fmt == GST_FORMAT_TIME) {
@@ -2013,7 +2014,7 @@ gst_adaptive_demux_start_tasks (GstAdaptiveDemux * demux,
 {
   GList *iter;
 
-  if (!demux->running) {
+  if (!gst_adaptive_demux_is_running (demux)) {
     GST_DEBUG_OBJECT (demux, "Not starting tasks due to shutdown");
     return;
   }
@@ -4487,6 +4488,20 @@ gst_adaptive_demux_get_client_now_utc (GstAdaptiveDemux * demux)
   return g_date_time_new_from_timeval_utc (&gtv);
 }
 
+/**
+ * gst_adaptive_demux_is_running
+ * @demux: #GstAdaptiveDemux
+ * Returns: whether the demuxer is processing data
+ *
+ * Returns FALSE if shutdown has started (transitioning down from
+ * PAUSED), otherwise TRUE.
+ */
+gboolean
+gst_adaptive_demux_is_running (GstAdaptiveDemux * demux)
+{
+  return g_atomic_int_get (&demux->running);
+}
+
 static GstAdaptiveDemuxTimer *
 gst_adaptive_demux_timer_new (GCond * cond, GMutex * mutex)
 {
index c1a7f6fdfa0e493a62fe558d0820b94022528b2a..9de70dafecf3cb8f190627d40eb1d437827f47f0 100644 (file)
@@ -207,7 +207,7 @@ struct _GstAdaptiveDemux
   /*< private >*/
   GstBin     bin;
 
-  gboolean running;
+  gint running;
 
   gsize stream_struct_size;
 
@@ -529,6 +529,9 @@ GstClockTime gst_adaptive_demux_get_monotonic_time (GstAdaptiveDemux * demux);
 GST_ADAPTIVE_DEMUX_API
 GDateTime *gst_adaptive_demux_get_client_now_utc (GstAdaptiveDemux * demux);
 
+GST_ADAPTIVE_DEMUX_API
+gboolean gst_adaptive_demux_is_running (GstAdaptiveDemux * demux);
+
 G_END_DECLS
 
 #endif