decodebin3: Protect fields related to streams handling with the SELECTION_LOCK
authorThibault Saunier <thibault.saunier@osg.samsung.com>
Thu, 15 Jun 2017 16:48:42 +0000 (12:48 -0400)
committerEdward Hervey <bilboed@bilboed.com>
Tue, 18 Jul 2017 11:05:06 +0000 (13:05 +0200)
Fields related to stream handling (input_streams,
output_streams, slots, guint slot_id) where used totally unprotected
until know.

This lead to several races, especially playing back RTSP streams.

To protect those fields, the OBJECT_LOCK can not be used as we sometimes
need to be able to post message on the bus while holding it.

decodebin3 already has a lock to manage stream selection, and in the end
it makes sense to protect all the stream management fields with the same
lock which is why we reuse the SELECTION_LOCK here.

https://bugzilla.gnome.org/show_bug.cgi?id=784012

gst/playback/gstdecodebin3-parse.c
gst/playback/gstdecodebin3.c

index 8c51d91..8df9bc1 100644 (file)
@@ -63,6 +63,7 @@ static void parsebin_pad_added_cb (GstElement * demux, GstPad * pad,
 static void parsebin_pad_removed_cb (GstElement * demux, GstPad * pad,
     DecodebinInput * input);
 
+/* WITH SELECTION_LOCK TAKEN! */
 static gboolean
 pending_pads_are_eos (DecodebinInput * input)
 {
@@ -77,6 +78,7 @@ pending_pads_are_eos (DecodebinInput * input)
   return TRUE;
 }
 
+/* WITH SELECTION_LOCK TAKEN! */
 static gboolean
 all_inputs_are_eos (GstDecodebin3 * dbin)
 {
@@ -99,6 +101,7 @@ all_inputs_are_eos (GstDecodebin3 * dbin)
   return TRUE;
 }
 
+/* WITH SELECTION_LOCK TAKEN! */
 static void
 check_all_streams_for_eos (GstDecodebin3 * dbin)
 {
@@ -253,7 +256,9 @@ parse_chain_output_probe (GstPad * pad, GstPadProbeInfo * info,
         input->saw_eos = TRUE;
         if (all_inputs_are_eos (input->dbin)) {
           GST_DEBUG_OBJECT (pad, "real input pad, marking as EOS");
+          SELECTION_LOCK (input->dbin);
           check_all_streams_for_eos (input->dbin);
+          SELECTION_UNLOCK (input->dbin);
         } else {
           GstPad *peer = gst_pad_get_peer (input->srcpad);
           if (peer) {
@@ -337,12 +342,15 @@ create_input_stream (GstDecodebin3 * dbin, GstStream * stream, GstPad * pad,
       (GstPadProbeCallback) parse_chain_output_probe, res, NULL);
 
   /* Add to list of current input streams */
+  SELECTION_LOCK (dbin);
   dbin->input_streams = g_list_append (dbin->input_streams, res);
+  SELECTION_UNLOCK (dbin);
   GST_DEBUG_OBJECT (pad, "Done creating input stream");
 
   return res;
 }
 
+/* WITH SELECTION_LOCK TAKEN! */
 static void
 remove_input_stream (GstDecodebin3 * dbin, DecodebinInputStream * stream)
 {
@@ -362,9 +370,7 @@ remove_input_stream (GstDecodebin3 * dbin, DecodebinInputStream * stream)
     }
   }
 
-  g_mutex_lock (&dbin->selection_lock);
   slot = get_slot_for_input (dbin, stream);
-  g_mutex_unlock (&dbin->selection_lock);
   if (slot) {
     slot->pending_stream = NULL;
     slot->input = NULL;
@@ -390,8 +396,6 @@ parsebin_buffer_probe (GstPad * pad, GstPadProbeInfo * info,
   GstDecodebin3 *dbin = input->dbin;
   GList *tmp, *unused_slot = NULL;
 
-  GST_FIXME_OBJECT (dbin, "Need a lock !");
-
   GST_DEBUG_OBJECT (pad, "Got a buffer ! UNBLOCK !");
 
   /* Any data out the demuxer means it's not creating pads
@@ -402,6 +406,7 @@ parsebin_buffer_probe (GstPad * pad, GstPadProbeInfo * info,
 
   /* 2. Remove unused streams (push EOS) */
   GST_DEBUG_OBJECT (dbin, "Removing unused streams");
+  SELECTION_LOCK (dbin);
   tmp = dbin->input_streams;
   while (tmp != NULL) {
     DecodebinInputStream *input_stream = (DecodebinInputStream *) tmp->data;
@@ -423,6 +428,7 @@ parsebin_buffer_probe (GstPad * pad, GstPadProbeInfo * info,
     } else
       tmp = next;
   }
+  SELECTION_UNLOCK (dbin);
 
   GST_DEBUG_OBJECT (dbin, "Creating new streams (if needed)");
   /* 3. Create new streams */
@@ -575,11 +581,8 @@ parsebin_pad_removed_cb (GstElement * demux, GstPad * pad, DecodebinInput * inp)
 
       SELECTION_LOCK (dbin);
       slot = get_slot_for_input (dbin, input);
-      SELECTION_UNLOCK (dbin);
 
       remove_input_stream (dbin, input);
-
-      SELECTION_LOCK (dbin);
       if (slot && g_list_find (dbin->slots, slot) && slot->is_drained) {
         /* if slot is still there and already drained, remove it in here */
         if (slot->output) {
index 4b1d9c4..e965ed6 100644 (file)
@@ -207,16 +207,15 @@ struct _GstDecodebin3
 
   GstElement *multiqueue;
 
-  /* FIXME : Mutex for protecting values below */
-  GstStreamCollection *collection;      /* Active collection */
-
+  /* selection_lock protects access to following variables */
+  GMutex selection_lock;
   GList *input_streams;         /* List of DecodebinInputStream for active collection */
   GList *output_streams;        /* List of DecodebinOutputStream used for output */
   GList *slots;                 /* List of MultiQueueSlot */
   guint slot_id;
 
-  /* selection_lock protects access to following variables */
-  GMutex selection_lock;
+  /* Active collection */
+  GstStreamCollection *collection;
   /* requested selection of stream-id to activate post-multiqueue */
   GList *requested_selection;
   /* list of stream-id currently activated in output */
@@ -236,7 +235,6 @@ struct _GstDecodebin3
    * FIXME : Is this really needed ? */
   GList *pending_collection;
 
-
   /* Factories */
   GMutex factories_lock;
   guint32 factories_cookie;
@@ -1264,6 +1262,7 @@ handle_stream_collection (GstDecodebin3 * dbin,
 #endif
 
   /* Store collection for later usage */
+  SELECTION_LOCK (dbin);
   if (dbin->collection == NULL) {
     dbin->collection = collection;
   } else {
@@ -1279,6 +1278,7 @@ handle_stream_collection (GstDecodebin3 * dbin,
     /* dbin->pending_collection = */
     /*     g_list_append (dbin->pending_collection, collection); */
   }
+  SELECTION_UNLOCK (dbin);
 }
 
 static void
@@ -1301,6 +1301,7 @@ gst_decodebin3_handle_message (GstBin * bin, GstMessage * message)
         posting_collection = TRUE;
         INPUT_UNLOCK (dbin);
       }
+
       SELECTION_LOCK (dbin);
       if (dbin->collection && collection != dbin->collection) {
         /* Replace collection message, we most likely aggregated it */
@@ -1312,6 +1313,7 @@ gst_decodebin3_handle_message (GstBin * bin, GstMessage * message)
         message = new_msg;
       }
       SELECTION_UNLOCK (dbin);
+
       if (collection)
         gst_object_unref (collection);
       break;
@@ -1651,8 +1653,9 @@ multiqueue_src_probe (GstPad * pad, GstPadProbeInfo * info,
           }
           slot->probe_id = 0;
           dbin->slots = g_list_remove (dbin->slots, slot);
-          free_multiqueue_slot_async (dbin, slot);
           SELECTION_UNLOCK (dbin);
+
+          free_multiqueue_slot_async (dbin, slot);
           ret = GST_PAD_PROBE_REMOVE;
         }
         break;
@@ -1726,11 +1729,14 @@ create_new_slot (GstDecodebin3 * dbin, GstStreamType type)
       gst_stream_type_get_name (type));
   slot = g_new0 (MultiQueueSlot, 1);
   slot->dbin = dbin;
+
   slot->id = dbin->slot_id++;
+
   slot->type = type;
   slot->sink_pad = gst_element_get_request_pad (dbin->multiqueue, "sink_%u");
   if (slot->sink_pad == NULL)
     goto fail;
+
   it = gst_pad_iterate_internal_links (slot->sink_pad);
   if (!it || (gst_iterator_next (it, &item)) != GST_ITERATOR_OK
       || ((slot->src_pad = g_value_dup_object (&item)) == NULL)) {
@@ -1751,7 +1757,9 @@ create_new_slot (GstDecodebin3 * dbin, GstStreamType type)
 
   GST_DEBUG ("Created new slot %u (%p) (%s:%s)", slot->id, slot,
       GST_DEBUG_PAD_NAME (slot->src_pad));
+
   dbin->slots = g_list_append (dbin->slots, slot);
+
   return slot;
 
   /* ERRORS */