mpegtsmux: Make handling of sinkpads thread-safe
authorJan Alexander Steffens (heftig) <jan.steffens@ltnglobal.com>
Wed, 2 Sep 2020 13:29:49 +0000 (15:29 +0200)
committerGStreamer Merge Bot <gitlab-merge-bot@gstreamer-foundation.org>
Wed, 9 Sep 2020 02:25:40 +0000 (02:25 +0000)
Ensure we take the object lock while accessing `GstElement.sinkpads`.
Use an iterator when the code isn't simple to avoid deadlock.

When we find the best pad, take a reference so a concurrent pad
release doesn't destroy the pad before we're done with it.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-bad/-/merge_requests/1553>

gst/mpegtsmux/gstbasetsmux.c
gst/mpegtsmux/gstbasetsmux.h

index 045c34d..f4fc338 100644 (file)
@@ -344,10 +344,14 @@ gst_base_ts_mux_reset (GstBaseTsMux * mux, gboolean alloc)
   gst_event_replace (&mux->force_key_unit_event, NULL);
   gst_buffer_replace (&mux->out_buffer, NULL);
 
+  GST_OBJECT_LOCK (mux);
+
   for (l = GST_ELEMENT (mux)->sinkpads; l; l = l->next) {
     gst_base_ts_mux_pad_reset (GST_BASE_TS_MUX_PAD (l->data));
   }
 
+  GST_OBJECT_UNLOCK (mux);
+
   if (alloc) {
     g_assert (klass->create_ts_mux);
 
@@ -810,23 +814,26 @@ no_stream:
   }
 }
 
+static gboolean
+gst_base_ts_mux_create_pad_stream_func (GstElement * element, GstPad * pad,
+    gpointer user_data)
+{
+  GstFlowReturn *ret = user_data;
+
+  *ret = gst_base_ts_mux_create_pad_stream (GST_BASE_TS_MUX (element), pad);
+
+  return *ret == GST_FLOW_OK;
+}
+
 static GstFlowReturn
 gst_base_ts_mux_create_streams (GstBaseTsMux * mux)
 {
   GstFlowReturn ret = GST_FLOW_OK;
-  GList *walk = GST_ELEMENT (mux)->sinkpads;
-
-  /* Create the streams */
-  while (walk) {
-    GstPad *pad = GST_PAD (walk->data);
 
-    ret = gst_base_ts_mux_create_pad_stream (mux, pad);
-    if (ret != GST_FLOW_OK)
-      return ret;
-    walk = g_list_next (walk);
-  }
+  gst_element_foreach_sink_pad (GST_ELEMENT_CAST (mux),
+      gst_base_ts_mux_create_pad_stream_func, &ret);
 
-  return GST_FLOW_OK;
+  return ret;
 }
 
 static void
@@ -1478,11 +1485,9 @@ gst_base_ts_mux_src_event (GstAggregator * agg, GstEvent * event)
     case GST_EVENT_CUSTOM_UPSTREAM:
     {
       GstIterator *iter;
-      GstIteratorResult iter_ret;
-      GstPad *sinkpad;
       GValue sinkpad_value = G_VALUE_INIT;
       GstClockTime running_time;
-      gboolean all_headers, done, res = FALSE;
+      gboolean all_headers, done = FALSE, res = FALSE;
       guint count;
 
       if (!gst_video_event_is_force_key_unit (event))
@@ -1505,28 +1510,28 @@ gst_base_ts_mux_src_event (GstAggregator * agg, GstEvent * event)
       gst_event_replace (&mux->force_key_unit_event, event);
 
       iter = gst_element_iterate_sink_pads (GST_ELEMENT_CAST (mux));
-      done = FALSE;
-      while (!done) {
-        gboolean tmp;
 
-        iter_ret = gst_iterator_next (iter, &sinkpad_value);
-        sinkpad = GST_PAD (g_value_get_object (&sinkpad_value));
+      while (!done) {
+        switch (gst_iterator_next (iter, &sinkpad_value)) {
+          case GST_ITERATOR_OK:{
+            GstPad *sinkpad = g_value_get_object (&sinkpad_value);
+            gboolean tmp;
 
-        switch (iter_ret) {
-          case GST_ITERATOR_DONE:
-            done = TRUE;
-            break;
-          case GST_ITERATOR_OK:
             GST_INFO_OBJECT (GST_AGGREGATOR_SRC_PAD (agg), "forwarding");
             tmp = gst_pad_push_event (sinkpad, gst_event_ref (event));
             GST_INFO_OBJECT (mux, "result %d", tmp);
             /* succeed if at least one pad succeeds */
             res |= tmp;
             break;
-          case GST_ITERATOR_ERROR:
+          }
+          case GST_ITERATOR_DONE:
             done = TRUE;
             break;
           case GST_ITERATOR_RESYNC:
+            gst_iterator_resync (iter);
+            break;
+          case GST_ITERATOR_ERROR:
+            g_assert_not_reached ();
             break;
         }
         g_value_reset (&sinkpad_value);
@@ -1636,28 +1641,38 @@ gst_base_ts_mux_update_src_caps (GstAggregator * agg, GstCaps * caps,
 static GstBaseTsMuxPad *
 gst_base_ts_mux_find_best_pad (GstAggregator * aggregator)
 {
-  GstBaseTsMuxPad *pad, *best = NULL;
-  GList *l;
-  GstBuffer *buffer;
+  GstBaseTsMuxPad *best = NULL;
   GstClockTime best_ts = GST_CLOCK_TIME_NONE;
+  GList *l;
+
+  GST_OBJECT_LOCK (aggregator);
 
   for (l = GST_ELEMENT_CAST (aggregator)->sinkpads; l; l = l->next) {
-    pad = GST_BASE_TS_MUX_PAD (l->data);
-    buffer = gst_aggregator_pad_peek_buffer (GST_AGGREGATOR_PAD (pad));
+    GstBaseTsMuxPad *tpad = GST_BASE_TS_MUX_PAD (l->data);
+    GstAggregatorPad *apad = GST_AGGREGATOR_PAD_CAST (tpad);
+    GstBuffer *buffer;
+
+    buffer = gst_aggregator_pad_peek_buffer (apad);
     if (!buffer)
       continue;
     if (best_ts == GST_CLOCK_TIME_NONE) {
-      best = pad;
+      best = tpad;
       best_ts = GST_BUFFER_DTS_OR_PTS (buffer);
     } else if (GST_BUFFER_DTS_OR_PTS (buffer) != GST_CLOCK_TIME_NONE) {
       GstClockTime t = GST_BUFFER_DTS_OR_PTS (buffer);
       if (t < best_ts) {
-        best = pad;
+        best = tpad;
         best_ts = t;
       }
     }
     gst_buffer_unref (buffer);
   }
+
+  if (best)
+    gst_object_ref (best);
+
+  GST_OBJECT_UNLOCK (aggregator);
+
   GST_DEBUG_OBJECT (aggregator,
       "Best pad found with %" GST_TIME_FORMAT ": %" GST_PTR_FORMAT,
       GST_TIME_ARGS (best_ts), best);
@@ -1669,14 +1684,22 @@ static gboolean
 gst_base_ts_mux_are_all_pads_eos (GstBaseTsMux * mux)
 {
   GList *l;
+  gboolean ret = TRUE;
+
+  GST_OBJECT_LOCK (mux);
 
   for (l = GST_ELEMENT_CAST (mux)->sinkpads; l; l = l->next) {
     GstBaseTsMuxPad *pad = GST_BASE_TS_MUX_PAD (l->data);
 
-    if (!gst_aggregator_pad_is_eos (GST_AGGREGATOR_PAD (pad)))
-      return FALSE;
+    if (!gst_aggregator_pad_is_eos (GST_AGGREGATOR_PAD (pad))) {
+      ret = FALSE;
+      break;
+    }
   }
-  return TRUE;
+
+  GST_OBJECT_UNLOCK (mux);
+
+  return ret;
 }
 
 
@@ -1696,6 +1719,8 @@ gst_base_ts_mux_aggregate (GstAggregator * agg, gboolean timeout)
         gst_base_ts_mux_aggregate_buffer (GST_BASE_TS_MUX (agg),
         GST_AGGREGATOR_PAD (best), buffer);
 
+    gst_object_unref (best);
+
     if (ret != GST_FLOW_OK)
       goto done;
   }
@@ -1768,7 +1793,7 @@ gst_base_ts_mux_set_property (GObject * object, guint prop_id,
     const GValue * value, GParamSpec * pspec)
 {
   GstBaseTsMux *mux = GST_BASE_TS_MUX (object);
-  GList *walk;
+  GList *l;
 
   switch (prop_id) {
     case PROP_PROG_MAP:
@@ -1789,15 +1814,14 @@ gst_base_ts_mux_set_property (GObject * object, guint prop_id,
         tsmux_set_pat_interval (mux->tsmux, mux->pat_interval);
       break;
     case PROP_PMT_INTERVAL:
-      walk = GST_ELEMENT (object)->sinkpads;
       mux->pmt_interval = g_value_get_uint (value);
-
-      while (walk) {
-        GstBaseTsMuxPad *ts_pad = GST_BASE_TS_MUX_PAD (walk->data);
+      GST_OBJECT_LOCK (mux);
+      for (l = GST_ELEMENT_CAST (mux)->sinkpads; l; l = l->next) {
+        GstBaseTsMuxPad *ts_pad = GST_BASE_TS_MUX_PAD (l->data);
 
         tsmux_set_pmt_interval (ts_pad->prog, mux->pmt_interval);
-        walk = g_list_next (walk);
       }
+      GST_OBJECT_UNLOCK (mux);
       break;
     case PROP_ALIGNMENT:
       mux->alignment = g_value_get_int (value);
index 89bf7e0..b4970a9 100644 (file)
@@ -176,7 +176,7 @@ struct GstBaseTsMux {
   guint pcr_interval;
   guint scte35_pid;
   guint scte35_null_interval;
-  
+
   /* state */
   gboolean first;
   GstClockTime pending_key_unit_ts;