From f97aed4af53bccf44cd44ea6576e821b56a7756d Mon Sep 17 00:00:00 2001 From: Nicolas Dufresne Date: Tue, 21 May 2019 15:25:03 -0400 Subject: [PATCH] rtpssrcdemux: Avoid taking streamlock out-of-band In this change we now protect the internal srcpads list using the stream lock and limit usage of the internal stream lock to preventing data flowing on the other src pad type while creating and signalling the new pad. This fixes a deadlock with RTPBin shutdown lock. These two locks would end up being taken in two different order, which caused a deadlock. More generally, we should not rely on a streamlock when handling out-of-band data, so as a side effect, we should not take a stream lock when iterating internal links. --- gst/rtpmanager/gstrtpssrcdemux.c | 96 ++++++++++++++++++++++--------------- tests/check/elements/rtpssrcdemux.c | 67 ++++++++++++++++++++++++++ 2 files changed, 125 insertions(+), 38 deletions(-) diff --git a/gst/rtpmanager/gstrtpssrcdemux.c b/gst/rtpmanager/gstrtpssrcdemux.c index 1661e79..880e8f2 100644 --- a/gst/rtpmanager/gstrtpssrcdemux.c +++ b/gst/rtpmanager/gstrtpssrcdemux.c @@ -145,6 +145,7 @@ struct _GstRtpSsrcDemuxPad }; /* find a src pad for a given SSRC, returns NULL if the SSRC was not found + * MUST be called with object lock */ static GstRtpSsrcDemuxPad * find_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc) @@ -160,6 +161,38 @@ find_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc) return NULL; } +/* returns a reference to the pad if found, %NULL otherwise */ +static GstPad * +get_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc, PadType padtype) +{ + GstRtpSsrcDemuxPad *demuxpad; + GstPad *retpad; + + GST_OBJECT_LOCK (demux); + + demuxpad = find_demux_pad_for_ssrc (demux, ssrc); + if (!demuxpad) { + GST_OBJECT_UNLOCK (demux); + return NULL; + } + + switch (padtype) { + case RTP_PAD: + retpad = gst_object_ref (demuxpad->rtp_pad); + break; + case RTCP_PAD: + retpad = gst_object_ref (demuxpad->rtcp_pad); + break; + default: + retpad = NULL; + g_assert_not_reached (); + } + + GST_OBJECT_UNLOCK (demux); + + return retpad; +} + static GstEvent * add_ssrc_and_ref (GstEvent * event, guint32 ssrc) { @@ -229,6 +262,7 @@ forward_initial_events (GstRtpSsrcDemux * demux, guint32 ssrc, GstPad * pad, gst_pad_sticky_events_foreach (sinkpad, forward_sticky_events, &fdata); } +/* MUST only be called from streaming thread */ static GstPad * find_or_create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc, PadType padtype) @@ -242,22 +276,9 @@ find_or_create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc, INTERNAL_STREAM_LOCK (demux); - demuxpad = find_demux_pad_for_ssrc (demux, ssrc); - if (demuxpad != NULL) { - switch (padtype) { - case RTP_PAD: - retpad = gst_object_ref (demuxpad->rtp_pad); - break; - case RTCP_PAD: - retpad = gst_object_ref (demuxpad->rtcp_pad); - break; - default: - retpad = NULL; - g_assert_not_reached (); - } - + retpad = get_demux_pad_for_ssrc (demux, ssrc, padtype); + if (retpad != NULL) { INTERNAL_STREAM_UNLOCK (demux); - return retpad; } @@ -283,7 +304,9 @@ find_or_create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc, gst_pad_set_element_private (rtp_pad, demuxpad); gst_pad_set_element_private (rtcp_pad, demuxpad); + GST_OBJECT_LOCK (demux); demux->srcpads = g_slist_prepend (demux->srcpads, demuxpad); + GST_OBJECT_UNLOCK (demux); gst_pad_set_query_function (rtp_pad, gst_rtp_ssrc_demux_src_query); gst_pad_set_iterate_internal_links_function (rtp_pad, @@ -316,17 +339,11 @@ find_or_create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc, g_assert_not_reached (); } - gst_object_ref (rtp_pad); - gst_object_ref (rtcp_pad); - g_signal_emit (G_OBJECT (demux), gst_rtp_ssrc_demux_signals[SIGNAL_NEW_SSRC_PAD], 0, ssrc, rtp_pad); INTERNAL_STREAM_UNLOCK (demux); - gst_object_unref (rtp_pad); - gst_object_unref (rtcp_pad); - return retpad; } @@ -486,17 +503,17 @@ gst_rtp_ssrc_demux_clear_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc) { GstRtpSsrcDemuxPad *dpad; - INTERNAL_STREAM_LOCK (demux); + GST_OBJECT_LOCK (demux); dpad = find_demux_pad_for_ssrc (demux, ssrc); if (dpad == NULL) { - INTERNAL_STREAM_UNLOCK (demux); + GST_OBJECT_UNLOCK (demux); goto unknown_pad; } GST_DEBUG_OBJECT (demux, "clearing pad for SSRC %08x", ssrc); demux->srcpads = g_slist_remove (demux->srcpads, dpad); - INTERNAL_STREAM_UNLOCK (demux); + GST_OBJECT_UNLOCK (demux); gst_pad_set_active (dpad->rtp_pad, FALSE); gst_pad_set_active (dpad->rtcp_pad, FALSE); @@ -535,7 +552,7 @@ forward_event (GstPad * pad, gpointer user_data) GSList *walk = NULL; GstEvent *newevent = NULL; - INTERNAL_STREAM_LOCK (fdata->demux); + GST_OBJECT_LOCK (fdata->demux); for (walk = fdata->demux->srcpads; walk; walk = walk->next) { GstRtpSsrcDemuxPad *dpad = (GstRtpSsrcDemuxPad *) walk->data; @@ -544,7 +561,7 @@ forward_event (GstPad * pad, gpointer user_data) break; } } - INTERNAL_STREAM_UNLOCK (fdata->demux); + GST_OBJECT_UNLOCK (fdata->demux); if (newevent) fdata->res &= gst_pad_push_event (pad, newevent); @@ -582,7 +599,6 @@ gst_rtp_ssrc_demux_chain (GstPad * pad, GstObject * parent, GstBuffer * buf) guint32 ssrc; GstRTPBuffer rtp = { NULL }; GstPad *srcpad; - GstRtpSsrcDemuxPad *dpad; demux = GST_RTP_SSRC_DEMUX (parent); @@ -602,14 +618,17 @@ gst_rtp_ssrc_demux_chain (GstPad * pad, GstObject * parent, GstBuffer * buf) ret = gst_pad_push (srcpad, buf); if (ret != GST_FLOW_OK) { + GstPad *active_pad; + /* check if the ssrc still there, may have been removed */ - INTERNAL_STREAM_LOCK (demux); - dpad = find_demux_pad_for_ssrc (demux, ssrc); - if (dpad == NULL || dpad->rtp_pad != srcpad) { + active_pad = get_demux_pad_for_ssrc (demux, ssrc, RTP_PAD); + + if (active_pad == NULL || active_pad != srcpad) { /* SSRC was removed during the push ... ignore the error */ ret = GST_FLOW_OK; } - INTERNAL_STREAM_UNLOCK (demux); + + g_clear_object (&active_pad); } gst_object_unref (srcpad); @@ -644,7 +663,6 @@ gst_rtp_ssrc_demux_rtcp_chain (GstPad * pad, GstObject * parent, GstRTCPPacket packet; GstRTCPBuffer rtcp = { NULL, }; GstPad *srcpad; - GstRtpSsrcDemuxPad *dpad; demux = GST_RTP_SSRC_DEMUX (parent); @@ -689,14 +707,16 @@ gst_rtp_ssrc_demux_rtcp_chain (GstPad * pad, GstObject * parent, ret = gst_pad_push (srcpad, buf); if (ret != GST_FLOW_OK) { + GstPad *active_pad; + /* check if the ssrc still there, may have been removed */ - INTERNAL_STREAM_LOCK (demux); - dpad = find_demux_pad_for_ssrc (demux, ssrc); - if (dpad == NULL || dpad->rtcp_pad != srcpad) { + active_pad = get_demux_pad_for_ssrc (demux, ssrc, RTCP_PAD); + if (active_pad == NULL || active_pad != srcpad) { /* SSRC was removed during the push ... ignore the error */ ret = GST_FLOW_OK; } - INTERNAL_STREAM_UNLOCK (demux); + + g_clear_object (&active_pad); } gst_object_unref (srcpad); @@ -786,7 +806,7 @@ gst_rtp_ssrc_demux_iterate_internal_links_src (GstPad * pad, GstObject * parent) demux = GST_RTP_SSRC_DEMUX (parent); - INTERNAL_STREAM_LOCK (demux); + GST_OBJECT_LOCK (demux); for (current = demux->srcpads; current; current = g_slist_next (current)) { GstRtpSsrcDemuxPad *dpad = (GstRtpSsrcDemuxPad *) current->data; @@ -807,7 +827,7 @@ gst_rtp_ssrc_demux_iterate_internal_links_src (GstPad * pad, GstObject * parent) g_value_unset (&val); } - INTERNAL_STREAM_UNLOCK (demux); + GST_OBJECT_UNLOCK (demux); return it; } diff --git a/tests/check/elements/rtpssrcdemux.c b/tests/check/elements/rtpssrcdemux.c index a8b093b..179d68e 100644 --- a/tests/check/elements/rtpssrcdemux.c +++ b/tests/check/elements/rtpssrcdemux.c @@ -174,6 +174,72 @@ GST_START_TEST (test_event_forwarding) GST_END_TEST; +typedef struct +{ + gint ready; + GMutex mutex; + GCond cond; +} LockTestContext; + +static void +new_ssrc_pad_cb (GstElement * element, guint ssrc, GstPad * pad, + LockTestContext * ctx) +{ + g_message ("Signalling ready"); + g_atomic_int_set (&ctx->ready, 1); + + g_message ("Waiting no more ready"); + while (g_atomic_int_get (&ctx->ready)) + g_usleep (G_USEC_PER_SEC / 100); + + g_mutex_lock (&ctx->mutex); + g_mutex_unlock (&ctx->mutex); +} + +static gpointer +push_buffer_func (gpointer user_data) +{ + GstHarness *h = user_data; + gst_harness_push (h, create_buffer (0, 0xdeadbeef)); + return NULL; +} + +GST_START_TEST (test_oob_event_locking) +{ + GstHarness *h = gst_harness_new_with_padnames ("rtpssrcdemux", "sink", NULL); + LockTestContext ctx = { FALSE, }; + GThread *thread; + + g_mutex_init (&ctx.mutex); + g_cond_init (&ctx.cond); + + gst_harness_set_src_caps_str (h, "application/x-rtp"); + g_signal_connect (h->element, + "new-ssrc-pad", G_CALLBACK (new_ssrc_pad_cb), &ctx); + + thread = g_thread_new ("streaming-thread", push_buffer_func, h); + + g_mutex_lock (&ctx.mutex); + + g_message ("Waiting for ready"); + while (!g_atomic_int_get (&ctx.ready)) + g_usleep (G_USEC_PER_SEC / 100); + g_message ("Signal no more ready"); + g_atomic_int_set (&ctx.ready, 0); + + gst_harness_push_event (h, + gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM_OOB, NULL)); + + g_mutex_unlock (&ctx.mutex); + + g_thread_join (thread); + g_mutex_clear (&ctx.mutex); + g_cond_clear (&ctx.cond); + gst_harness_teardown (h); +} + +GST_END_TEST; + static Suite * rtpssrcdemux_suite (void) { @@ -182,6 +248,7 @@ rtpssrcdemux_suite (void) suite_add_tcase (s, tc_chain); tcase_add_test (tc_chain, test_event_forwarding); + tcase_add_test (tc_chain, test_oob_event_locking); return s; } -- 2.7.4