From e0d5e022a1330fd912b92e34afe5b27666baba80 Mon Sep 17 00:00:00 2001 From: Vivia Nikolaidou Date: Tue, 1 Feb 2022 14:51:27 +0200 Subject: [PATCH] tsmux: Lock mux->tsmux, the programs hash table, and pad streams They contain implementations that are not thread-safe (e.g. GList, GHashTable). Part-of: --- .../gst-plugins-bad/gst/mpegtsmux/gstbasetsmux.c | 76 ++++++++++++++++++++-- .../gst-plugins-bad/gst/mpegtsmux/gstbasetsmux.h | 3 + 2 files changed, 74 insertions(+), 5 deletions(-) diff --git a/subprojects/gst-plugins-bad/gst/mpegtsmux/gstbasetsmux.c b/subprojects/gst-plugins-bad/gst/mpegtsmux/gstbasetsmux.c index 1f88072..9adf9a2 100644 --- a/subprojects/gst-plugins-bad/gst/mpegtsmux/gstbasetsmux.c +++ b/subprojects/gst-plugins-bad/gst/mpegtsmux/gstbasetsmux.c @@ -129,6 +129,7 @@ gst_base_ts_mux_pad_flush (GstAggregatorPad * agg_pad, GstAggregator * agg) /* Send initial segments again after a flush-stop, and also resend the * header sections */ + g_mutex_lock (&mux->lock); mux->first = TRUE; /* output PAT, SI tables */ @@ -141,6 +142,7 @@ gst_base_ts_mux_pad_flush (GstAggregatorPad * agg_pad, GstAggregator * agg) tsmux_resend_pmt (program); } + g_mutex_unlock (&mux->lock); return GST_FLOW_OK; } @@ -297,6 +299,7 @@ steal_si_section (GstMpegtsSectionType * type, TsMuxSection * section, return TRUE; } +/* Must be called with mux->lock held */ static void gst_base_ts_mux_reset (GstBaseTsMux * mux, gboolean alloc) { @@ -372,6 +375,7 @@ release_buffer_cb (guint8 * data, void *user_data) stream_data_free ((StreamData *) user_data); } +/* Must be called with mux->lock held */ static GstFlowReturn gst_base_ts_mux_create_or_update_stream (GstBaseTsMux * mux, GstBaseTsMuxPad * ts_pad, GstCaps * caps) @@ -746,6 +750,7 @@ is_valid_pmt_pid (guint16 pmt_pid) return TRUE; } +/* Must be called with mux->lock held */ static GstFlowReturn gst_base_ts_mux_create_stream (GstBaseTsMux * mux, GstBaseTsMuxPad * ts_pad) { @@ -767,6 +772,7 @@ gst_base_ts_mux_create_stream (GstBaseTsMux * mux, GstBaseTsMuxPad * ts_pad) return ret; } +/* Must be called with mux->lock held */ static GstFlowReturn gst_base_ts_mux_create_pad_stream (GstBaseTsMux * mux, GstPad * pad) { @@ -876,6 +882,7 @@ no_stream: } } +/* Must be called with mux->lock held */ static gboolean gst_base_ts_mux_create_pad_stream_func (GstElement * element, GstPad * pad, gpointer user_data) @@ -887,6 +894,7 @@ gst_base_ts_mux_create_pad_stream_func (GstElement * element, GstPad * pad, return *ret == GST_FLOW_OK; } +/* Must be called with mux->lock held */ static GstFlowReturn gst_base_ts_mux_create_streams (GstBaseTsMux * mux) { @@ -1189,11 +1197,13 @@ gst_base_ts_mux_aggregate_buffer (GstBaseTsMux * mux, return GST_FLOW_OK; } + g_mutex_lock (&mux->lock); if (G_UNLIKELY (mux->first)) { ret = gst_base_ts_mux_create_streams (mux); if (G_UNLIKELY (ret != GST_FLOW_OK)) { if (buf) gst_buffer_unref (buf); + g_mutex_unlock (&mux->lock); return ret; } @@ -1232,6 +1242,7 @@ gst_base_ts_mux_aggregate_buffer (GstBaseTsMux * mux, if (mux->force_key_unit_event != NULL && best->stream->is_video_stream) { GstEvent *event; + g_mutex_unlock (&mux->lock); event = check_pending_key_unit_event (mux->force_key_unit_event, &agg_pad->segment, GST_BUFFER_PTS (buf), GST_BUFFER_FLAGS (buf), mux->pending_key_unit_ts); @@ -1251,6 +1262,7 @@ gst_base_ts_mux_aggregate_buffer (GstBaseTsMux * mux, GST_TIME_ARGS (running_time), count); gst_pad_push_event (GST_AGGREGATOR_SRC_PAD (mux), event); + g_mutex_lock (&mux->lock); /* output PAT, SI tables */ tsmux_resend_pat (mux->tsmux); tsmux_resend_si (mux->tsmux); @@ -1261,6 +1273,8 @@ gst_base_ts_mux_aggregate_buffer (GstBaseTsMux * mux, tsmux_resend_pmt (program); } + } else { + g_mutex_lock (&mux->lock); } } @@ -1315,6 +1329,7 @@ gst_base_ts_mux_aggregate_buffer (GstBaseTsMux * mux, GST_WARNING_OBJECT (mux, "KLV meta unit too big, splitting not supported"); gst_buffer_unref (buf); + g_mutex_unlock (&mux->lock); return GST_FLOW_OK; } @@ -1344,6 +1359,7 @@ gst_base_ts_mux_aggregate_buffer (GstBaseTsMux * mux, goto write_fail; } } + g_mutex_unlock (&mux->lock); /* flush packet cache */ return gst_base_ts_mux_push_packets (mux, FALSE); @@ -1385,9 +1401,12 @@ gst_base_ts_mux_request_new_pad (GstElement * element, GstPadTemplate * templ, GstPad *pad = NULL; gchar *free_name = NULL; + g_mutex_lock (&mux->lock); if (name != NULL && sscanf (name, "sink_%d", &pid) == 1) { - if (tsmux_find_stream (mux->tsmux, pid)) + if (tsmux_find_stream (mux->tsmux, pid)) { + g_mutex_unlock (&mux->lock); goto stream_exists; + } /* Make sure we don't use reserved PID. * FIXME : This should be extended to other variants (ex: ATSC) reserved PID */ if (pid < TSMUX_START_ES_PID) @@ -1400,6 +1419,7 @@ gst_base_ts_mux_request_new_pad (GstElement * element, GstPadTemplate * templ, /* Name the pad correctly after the selected pid */ name = free_name = g_strdup_printf ("sink_%d", pid); } + g_mutex_unlock (&mux->lock); pad = (GstPad *) GST_ELEMENT_CLASS (parent_class)->request_new_pad (element, @@ -1433,6 +1453,7 @@ gst_base_ts_mux_release_pad (GstElement * element, GstPad * pad) { GstBaseTsMux *mux = GST_BASE_TS_MUX (element); + g_mutex_lock (&mux->lock); if (mux->tsmux) { GList *cur; GstBaseTsMuxPad *ts_pad = GST_BASE_TS_MUX_PAD (pad); @@ -1457,6 +1478,7 @@ gst_base_ts_mux_release_pad (GstElement * element, GstPad * pad) tsmux_resend_pmt (program); } } + g_mutex_unlock (&mux->lock); GST_ELEMENT_CLASS (parent_class)->release_pad (element, pad); } @@ -1875,8 +1897,10 @@ gst_base_ts_mux_send_event (GstElement * element, GstEvent * event) if (section->section_type == GST_MPEGTS_SECTION_SCTE_SIT) { handle_scte35_section (mux, event, section, 0, NULL); } else { + g_mutex_lock (&mux->lock); /* TODO: Check that the section type is supported */ tsmux_add_mpegts_si_section (mux->tsmux, section); + g_mutex_unlock (&mux->lock); } gst_event_unref (event); @@ -1906,18 +1930,25 @@ gst_base_ts_mux_sink_event (GstAggregator * agg, GstAggregatorPad * agg_pad, GstFlowReturn ret; GList *cur; - if (ts_pad->stream == NULL) + g_mutex_lock (&mux->lock); + if (ts_pad->stream == NULL) { + g_mutex_unlock (&mux->lock); break; + } forward = FALSE; gst_event_parse_caps (event, &caps); - if (!caps || !gst_caps_is_fixed (caps)) + if (!caps || !gst_caps_is_fixed (caps)) { + g_mutex_unlock (&mux->lock); break; + } ret = gst_base_ts_mux_create_or_update_stream (mux, ts_pad, caps); - if (ret != GST_FLOW_OK) + if (ret != GST_FLOW_OK) { + g_mutex_unlock (&mux->lock); break; + } mux->tsmux->pat_changed = TRUE; mux->tsmux->si_changed = TRUE; @@ -1931,6 +1962,7 @@ gst_base_ts_mux_sink_event (GstAggregator * agg, GstAggregatorPad * agg_pad, program->pmt_changed = TRUE; tsmux_resend_pmt (program); } + g_mutex_unlock (&mux->lock); res = TRUE; break; @@ -2336,7 +2368,11 @@ done: static gboolean gst_base_ts_mux_start (GstAggregator * agg) { - gst_base_ts_mux_reset (GST_BASE_TS_MUX (agg), TRUE); + GstBaseTsMux *mux = GST_BASE_TS_MUX (agg); + + g_mutex_lock (&mux->lock); + gst_base_ts_mux_reset (mux, TRUE); + g_mutex_unlock (&mux->lock); return TRUE; } @@ -2344,7 +2380,11 @@ gst_base_ts_mux_start (GstAggregator * agg) static gboolean gst_base_ts_mux_stop (GstAggregator * agg) { + GstBaseTsMux *mux = GST_BASE_TS_MUX (agg); + + g_mutex_lock (&mux->lock); gst_base_ts_mux_reset (GST_BASE_TS_MUX (agg), TRUE); + g_mutex_unlock (&mux->lock); return TRUE; } @@ -2356,6 +2396,7 @@ gst_base_ts_mux_dispose (GObject * object) { GstBaseTsMux *mux = GST_BASE_TS_MUX (object); + g_mutex_lock (&mux->lock); gst_base_ts_mux_reset (mux, FALSE); if (mux->out_adapter) { @@ -2370,16 +2411,28 @@ gst_base_ts_mux_dispose (GObject * object) g_hash_table_destroy (mux->programs); mux->programs = NULL; } + g_mutex_unlock (&mux->lock); GST_CALL_PARENT (G_OBJECT_CLASS, dispose, (object)); } static void +gst_base_ts_mux_finalize (GObject * object) +{ + GstBaseTsMux *mux = GST_BASE_TS_MUX (object); + + g_mutex_clear (&mux->lock); + GST_CALL_PARENT (G_OBJECT_CLASS, finalize, (object)); +} + +static void gst_base_ts_mux_constructed (GObject * object) { GstBaseTsMux *mux = GST_BASE_TS_MUX (object); /* initial state */ + g_mutex_lock (&mux->lock); gst_base_ts_mux_reset (mux, TRUE); + g_mutex_unlock (&mux->lock); } static void @@ -2404,8 +2457,10 @@ gst_base_ts_mux_set_property (GObject * object, guint prop_id, } case PROP_PAT_INTERVAL: mux->pat_interval = g_value_get_uint (value); + g_mutex_lock (&mux->lock); if (mux->tsmux) tsmux_set_pat_interval (mux->tsmux, mux->pat_interval); + g_mutex_unlock (&mux->lock); break; case PROP_PMT_INTERVAL: mux->pmt_interval = g_value_get_uint (value); @@ -2413,7 +2468,9 @@ gst_base_ts_mux_set_property (GObject * object, guint prop_id, for (l = GST_ELEMENT_CAST (mux)->sinkpads; l; l = l->next) { GstBaseTsMuxPad *ts_pad = GST_BASE_TS_MUX_PAD (l->data); + g_mutex_lock (&mux->lock); tsmux_set_pmt_interval (ts_pad->prog, mux->pmt_interval); + g_mutex_unlock (&mux->lock); } GST_OBJECT_UNLOCK (mux); break; @@ -2422,17 +2479,23 @@ gst_base_ts_mux_set_property (GObject * object, guint prop_id, break; case PROP_SI_INTERVAL: mux->si_interval = g_value_get_uint (value); + g_mutex_lock (&mux->lock); tsmux_set_si_interval (mux->tsmux, mux->si_interval); + g_mutex_unlock (&mux->lock); break; case PROP_BITRATE: mux->bitrate = g_value_get_uint64 (value); + g_mutex_lock (&mux->lock); if (mux->tsmux) tsmux_set_bitrate (mux->tsmux, mux->bitrate); + g_mutex_unlock (&mux->lock); break; case PROP_PCR_INTERVAL: mux->pcr_interval = g_value_get_uint (value); + g_mutex_lock (&mux->lock); if (mux->tsmux) tsmux_set_pcr_interval (mux->tsmux, mux->pcr_interval); + g_mutex_unlock (&mux->lock); break; case PROP_SCTE_35_PID: mux->scte35_pid = g_value_get_uint (value); @@ -2556,6 +2619,7 @@ gst_base_ts_mux_class_init (GstBaseTsMuxClass * klass) gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_base_ts_mux_get_property); gobject_class->dispose = gst_base_ts_mux_dispose; + gobject_class->finalize = gst_base_ts_mux_finalize; gobject_class->constructed = gst_base_ts_mux_constructed; gstelement_class->request_new_pad = gst_base_ts_mux_request_new_pad; @@ -2656,4 +2720,6 @@ gst_base_ts_mux_init (GstBaseTsMux * mux) mux->packet_size = GST_BASE_TS_MUX_NORMAL_PACKET_LENGTH; mux->automatic_alignment = 0; + + g_mutex_init (&mux->lock); } diff --git a/subprojects/gst-plugins-bad/gst/mpegtsmux/gstbasetsmux.h b/subprojects/gst-plugins-bad/gst/mpegtsmux/gstbasetsmux.h index 017b048..dbf105b 100644 --- a/subprojects/gst-plugins-bad/gst/mpegtsmux/gstbasetsmux.h +++ b/subprojects/gst-plugins-bad/gst/mpegtsmux/gstbasetsmux.h @@ -183,6 +183,9 @@ struct GstBaseTsMux { GstAdapter *out_adapter; GstBuffer *out_buffer; GstClockTimeDiff output_ts_offset; + + /* protects the tsmux object, the programs hash table, and pad streams */ + GMutex lock; }; /** -- 2.7.4