tsmux: Lock mux->tsmux, the programs hash table, and pad streams
authorVivia Nikolaidou <vivia@ahiru.eu>
Tue, 1 Feb 2022 12:51:27 +0000 (14:51 +0200)
committerGStreamer Marge Bot <gitlab-merge-bot@gstreamer-foundation.org>
Fri, 25 Feb 2022 17:42:52 +0000 (17:42 +0000)
They contain implementations that are not thread-safe (e.g. GList, GHashTable).

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/1616>

subprojects/gst-plugins-bad/gst/mpegtsmux/gstbasetsmux.c
subprojects/gst-plugins-bad/gst/mpegtsmux/gstbasetsmux.h

index 1f88072..9adf9a2 100644 (file)
@@ -129,6 +129,7 @@ gst_base_ts_mux_pad_flush (GstAggregatorPad * agg_pad, GstAggregator * agg)
 
   /* Send initial segments again after a flush-stop, and also resend the
    * header sections */
+  g_mutex_lock (&mux->lock);
   mux->first = TRUE;
 
   /* output PAT, SI tables */
@@ -141,6 +142,7 @@ gst_base_ts_mux_pad_flush (GstAggregatorPad * agg_pad, GstAggregator * agg)
 
     tsmux_resend_pmt (program);
   }
+  g_mutex_unlock (&mux->lock);
 
   return GST_FLOW_OK;
 }
@@ -297,6 +299,7 @@ steal_si_section (GstMpegtsSectionType * type, TsMuxSection * section,
   return TRUE;
 }
 
+/* Must be called with mux->lock held */
 static void
 gst_base_ts_mux_reset (GstBaseTsMux * mux, gboolean alloc)
 {
@@ -372,6 +375,7 @@ release_buffer_cb (guint8 * data, void *user_data)
   stream_data_free ((StreamData *) user_data);
 }
 
+/* Must be called with mux->lock held */
 static GstFlowReturn
 gst_base_ts_mux_create_or_update_stream (GstBaseTsMux * mux,
     GstBaseTsMuxPad * ts_pad, GstCaps * caps)
@@ -746,6 +750,7 @@ is_valid_pmt_pid (guint16 pmt_pid)
   return TRUE;
 }
 
+/* Must be called with mux->lock held */
 static GstFlowReturn
 gst_base_ts_mux_create_stream (GstBaseTsMux * mux, GstBaseTsMuxPad * ts_pad)
 {
@@ -767,6 +772,7 @@ gst_base_ts_mux_create_stream (GstBaseTsMux * mux, GstBaseTsMuxPad * ts_pad)
   return ret;
 }
 
+/* Must be called with mux->lock held */
 static GstFlowReturn
 gst_base_ts_mux_create_pad_stream (GstBaseTsMux * mux, GstPad * pad)
 {
@@ -876,6 +882,7 @@ no_stream:
   }
 }
 
+/* Must be called with mux->lock held */
 static gboolean
 gst_base_ts_mux_create_pad_stream_func (GstElement * element, GstPad * pad,
     gpointer user_data)
@@ -887,6 +894,7 @@ gst_base_ts_mux_create_pad_stream_func (GstElement * element, GstPad * pad,
   return *ret == GST_FLOW_OK;
 }
 
+/* Must be called with mux->lock held */
 static GstFlowReturn
 gst_base_ts_mux_create_streams (GstBaseTsMux * mux)
 {
@@ -1189,11 +1197,13 @@ gst_base_ts_mux_aggregate_buffer (GstBaseTsMux * mux,
     return GST_FLOW_OK;
   }
 
+  g_mutex_lock (&mux->lock);
   if (G_UNLIKELY (mux->first)) {
     ret = gst_base_ts_mux_create_streams (mux);
     if (G_UNLIKELY (ret != GST_FLOW_OK)) {
       if (buf)
         gst_buffer_unref (buf);
+      g_mutex_unlock (&mux->lock);
       return ret;
     }
 
@@ -1232,6 +1242,7 @@ gst_base_ts_mux_aggregate_buffer (GstBaseTsMux * mux,
   if (mux->force_key_unit_event != NULL && best->stream->is_video_stream) {
     GstEvent *event;
 
+    g_mutex_unlock (&mux->lock);
     event = check_pending_key_unit_event (mux->force_key_unit_event,
         &agg_pad->segment, GST_BUFFER_PTS (buf),
         GST_BUFFER_FLAGS (buf), mux->pending_key_unit_ts);
@@ -1251,6 +1262,7 @@ gst_base_ts_mux_aggregate_buffer (GstBaseTsMux * mux,
           GST_TIME_ARGS (running_time), count);
       gst_pad_push_event (GST_AGGREGATOR_SRC_PAD (mux), event);
 
+      g_mutex_lock (&mux->lock);
       /* output PAT, SI tables */
       tsmux_resend_pat (mux->tsmux);
       tsmux_resend_si (mux->tsmux);
@@ -1261,6 +1273,8 @@ gst_base_ts_mux_aggregate_buffer (GstBaseTsMux * mux,
 
         tsmux_resend_pmt (program);
       }
+    } else {
+      g_mutex_lock (&mux->lock);
     }
   }
 
@@ -1315,6 +1329,7 @@ gst_base_ts_mux_aggregate_buffer (GstBaseTsMux * mux,
     GST_WARNING_OBJECT (mux, "KLV meta unit too big, splitting not supported");
 
     gst_buffer_unref (buf);
+    g_mutex_unlock (&mux->lock);
     return GST_FLOW_OK;
   }
 
@@ -1344,6 +1359,7 @@ gst_base_ts_mux_aggregate_buffer (GstBaseTsMux * mux,
       goto write_fail;
     }
   }
+  g_mutex_unlock (&mux->lock);
   /* flush packet cache */
   return gst_base_ts_mux_push_packets (mux, FALSE);
 
@@ -1385,9 +1401,12 @@ gst_base_ts_mux_request_new_pad (GstElement * element, GstPadTemplate * templ,
   GstPad *pad = NULL;
   gchar *free_name = NULL;
 
+  g_mutex_lock (&mux->lock);
   if (name != NULL && sscanf (name, "sink_%d", &pid) == 1) {
-    if (tsmux_find_stream (mux->tsmux, pid))
+    if (tsmux_find_stream (mux->tsmux, pid)) {
+      g_mutex_unlock (&mux->lock);
       goto stream_exists;
+    }
     /* Make sure we don't use reserved PID.
      * FIXME : This should be extended to other variants (ex: ATSC) reserved PID */
     if (pid < TSMUX_START_ES_PID)
@@ -1400,6 +1419,7 @@ gst_base_ts_mux_request_new_pad (GstElement * element, GstPadTemplate * templ,
     /* Name the pad correctly after the selected pid */
     name = free_name = g_strdup_printf ("sink_%d", pid);
   }
+  g_mutex_unlock (&mux->lock);
 
   pad = (GstPad *)
       GST_ELEMENT_CLASS (parent_class)->request_new_pad (element,
@@ -1433,6 +1453,7 @@ gst_base_ts_mux_release_pad (GstElement * element, GstPad * pad)
 {
   GstBaseTsMux *mux = GST_BASE_TS_MUX (element);
 
+  g_mutex_lock (&mux->lock);
   if (mux->tsmux) {
     GList *cur;
     GstBaseTsMuxPad *ts_pad = GST_BASE_TS_MUX_PAD (pad);
@@ -1457,6 +1478,7 @@ gst_base_ts_mux_release_pad (GstElement * element, GstPad * pad)
       tsmux_resend_pmt (program);
     }
   }
+  g_mutex_unlock (&mux->lock);
 
   GST_ELEMENT_CLASS (parent_class)->release_pad (element, pad);
 }
@@ -1875,8 +1897,10 @@ gst_base_ts_mux_send_event (GstElement * element, GstEvent * event)
     if (section->section_type == GST_MPEGTS_SECTION_SCTE_SIT) {
       handle_scte35_section (mux, event, section, 0, NULL);
     } else {
+      g_mutex_lock (&mux->lock);
       /* TODO: Check that the section type is supported */
       tsmux_add_mpegts_si_section (mux->tsmux, section);
+      g_mutex_unlock (&mux->lock);
     }
 
     gst_event_unref (event);
@@ -1906,18 +1930,25 @@ gst_base_ts_mux_sink_event (GstAggregator * agg, GstAggregatorPad * agg_pad,
       GstFlowReturn ret;
       GList *cur;
 
-      if (ts_pad->stream == NULL)
+      g_mutex_lock (&mux->lock);
+      if (ts_pad->stream == NULL) {
+        g_mutex_unlock (&mux->lock);
         break;
+      }
 
       forward = FALSE;
 
       gst_event_parse_caps (event, &caps);
-      if (!caps || !gst_caps_is_fixed (caps))
+      if (!caps || !gst_caps_is_fixed (caps)) {
+        g_mutex_unlock (&mux->lock);
         break;
+      }
 
       ret = gst_base_ts_mux_create_or_update_stream (mux, ts_pad, caps);
-      if (ret != GST_FLOW_OK)
+      if (ret != GST_FLOW_OK) {
+        g_mutex_unlock (&mux->lock);
         break;
+      }
 
       mux->tsmux->pat_changed = TRUE;
       mux->tsmux->si_changed = TRUE;
@@ -1931,6 +1962,7 @@ gst_base_ts_mux_sink_event (GstAggregator * agg, GstAggregatorPad * agg_pad,
         program->pmt_changed = TRUE;
         tsmux_resend_pmt (program);
       }
+      g_mutex_unlock (&mux->lock);
 
       res = TRUE;
       break;
@@ -2336,7 +2368,11 @@ done:
 static gboolean
 gst_base_ts_mux_start (GstAggregator * agg)
 {
-  gst_base_ts_mux_reset (GST_BASE_TS_MUX (agg), TRUE);
+  GstBaseTsMux *mux = GST_BASE_TS_MUX (agg);
+
+  g_mutex_lock (&mux->lock);
+  gst_base_ts_mux_reset (mux, TRUE);
+  g_mutex_unlock (&mux->lock);
 
   return TRUE;
 }
@@ -2344,7 +2380,11 @@ gst_base_ts_mux_start (GstAggregator * agg)
 static gboolean
 gst_base_ts_mux_stop (GstAggregator * agg)
 {
+  GstBaseTsMux *mux = GST_BASE_TS_MUX (agg);
+
+  g_mutex_lock (&mux->lock);
   gst_base_ts_mux_reset (GST_BASE_TS_MUX (agg), TRUE);
+  g_mutex_unlock (&mux->lock);
 
   return TRUE;
 }
@@ -2356,6 +2396,7 @@ gst_base_ts_mux_dispose (GObject * object)
 {
   GstBaseTsMux *mux = GST_BASE_TS_MUX (object);
 
+  g_mutex_lock (&mux->lock);
   gst_base_ts_mux_reset (mux, FALSE);
 
   if (mux->out_adapter) {
@@ -2370,16 +2411,28 @@ gst_base_ts_mux_dispose (GObject * object)
     g_hash_table_destroy (mux->programs);
     mux->programs = NULL;
   }
+  g_mutex_unlock (&mux->lock);
   GST_CALL_PARENT (G_OBJECT_CLASS, dispose, (object));
 }
 
 static void
+gst_base_ts_mux_finalize (GObject * object)
+{
+  GstBaseTsMux *mux = GST_BASE_TS_MUX (object);
+
+  g_mutex_clear (&mux->lock);
+  GST_CALL_PARENT (G_OBJECT_CLASS, finalize, (object));
+}
+
+static void
 gst_base_ts_mux_constructed (GObject * object)
 {
   GstBaseTsMux *mux = GST_BASE_TS_MUX (object);
 
   /* initial state */
+  g_mutex_lock (&mux->lock);
   gst_base_ts_mux_reset (mux, TRUE);
+  g_mutex_unlock (&mux->lock);
 }
 
 static void
@@ -2404,8 +2457,10 @@ gst_base_ts_mux_set_property (GObject * object, guint prop_id,
     }
     case PROP_PAT_INTERVAL:
       mux->pat_interval = g_value_get_uint (value);
+      g_mutex_lock (&mux->lock);
       if (mux->tsmux)
         tsmux_set_pat_interval (mux->tsmux, mux->pat_interval);
+      g_mutex_unlock (&mux->lock);
       break;
     case PROP_PMT_INTERVAL:
       mux->pmt_interval = g_value_get_uint (value);
@@ -2413,7 +2468,9 @@ gst_base_ts_mux_set_property (GObject * object, guint prop_id,
       for (l = GST_ELEMENT_CAST (mux)->sinkpads; l; l = l->next) {
         GstBaseTsMuxPad *ts_pad = GST_BASE_TS_MUX_PAD (l->data);
 
+        g_mutex_lock (&mux->lock);
         tsmux_set_pmt_interval (ts_pad->prog, mux->pmt_interval);
+        g_mutex_unlock (&mux->lock);
       }
       GST_OBJECT_UNLOCK (mux);
       break;
@@ -2422,17 +2479,23 @@ gst_base_ts_mux_set_property (GObject * object, guint prop_id,
       break;
     case PROP_SI_INTERVAL:
       mux->si_interval = g_value_get_uint (value);
+      g_mutex_lock (&mux->lock);
       tsmux_set_si_interval (mux->tsmux, mux->si_interval);
+      g_mutex_unlock (&mux->lock);
       break;
     case PROP_BITRATE:
       mux->bitrate = g_value_get_uint64 (value);
+      g_mutex_lock (&mux->lock);
       if (mux->tsmux)
         tsmux_set_bitrate (mux->tsmux, mux->bitrate);
+      g_mutex_unlock (&mux->lock);
       break;
     case PROP_PCR_INTERVAL:
       mux->pcr_interval = g_value_get_uint (value);
+      g_mutex_lock (&mux->lock);
       if (mux->tsmux)
         tsmux_set_pcr_interval (mux->tsmux, mux->pcr_interval);
+      g_mutex_unlock (&mux->lock);
       break;
     case PROP_SCTE_35_PID:
       mux->scte35_pid = g_value_get_uint (value);
@@ -2556,6 +2619,7 @@ gst_base_ts_mux_class_init (GstBaseTsMuxClass * klass)
   gobject_class->get_property =
       GST_DEBUG_FUNCPTR (gst_base_ts_mux_get_property);
   gobject_class->dispose = gst_base_ts_mux_dispose;
+  gobject_class->finalize = gst_base_ts_mux_finalize;
   gobject_class->constructed = gst_base_ts_mux_constructed;
 
   gstelement_class->request_new_pad = gst_base_ts_mux_request_new_pad;
@@ -2656,4 +2720,6 @@ gst_base_ts_mux_init (GstBaseTsMux * mux)
 
   mux->packet_size = GST_BASE_TS_MUX_NORMAL_PACKET_LENGTH;
   mux->automatic_alignment = 0;
+
+  g_mutex_init (&mux->lock);
 }
index 017b048..dbf105b 100644 (file)
@@ -183,6 +183,9 @@ struct GstBaseTsMux {
   GstAdapter *out_adapter;
   GstBuffer *out_buffer;
   GstClockTimeDiff output_ts_offset;
+
+  /* protects the tsmux object, the programs hash table, and pad streams */
+  GMutex lock;
 };
 
 /**