rtpssrcdemux: Avoid taking streamlock out-of-band
authorNicolas Dufresne <nicolas.dufresne@collabora.com>
Tue, 21 May 2019 19:25:03 +0000 (15:25 -0400)
committerTim-Philipp Müller <tim@centricular.com>
Thu, 8 Aug 2019 08:50:53 +0000 (09:50 +0100)
In this change we now protect the internal srcpads list using the
stream lock and limit usage of the internal stream lock to
preventing data flowing on the other src pad type while creating
and signalling the new pad.

This fixes a deadlock with RTPBin shutdown lock. These two locks would
end up being taken in two different order, which caused a deadlock. More
generally, we should not rely on a streamlock when handling out-of-band
data, so as a side effect, we should not take a stream lock when
iterating internal links.

gst/rtpmanager/gstrtpssrcdemux.c
tests/check/elements/rtpssrcdemux.c

index 1661e79..880e8f2 100644 (file)
@@ -145,6 +145,7 @@ struct _GstRtpSsrcDemuxPad
 };
 
 /* find a src pad for a given SSRC, returns NULL if the SSRC was not found
+ * MUST be called with object lock
  */
 static GstRtpSsrcDemuxPad *
 find_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc)
@@ -160,6 +161,38 @@ find_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc)
   return NULL;
 }
 
+/* returns a reference to the pad if found, %NULL otherwise */
+static GstPad *
+get_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc, PadType padtype)
+{
+  GstRtpSsrcDemuxPad *demuxpad;
+  GstPad *retpad;
+
+  GST_OBJECT_LOCK (demux);
+
+  demuxpad = find_demux_pad_for_ssrc (demux, ssrc);
+  if (!demuxpad) {
+    GST_OBJECT_UNLOCK (demux);
+    return NULL;
+  }
+
+  switch (padtype) {
+    case RTP_PAD:
+      retpad = gst_object_ref (demuxpad->rtp_pad);
+      break;
+    case RTCP_PAD:
+      retpad = gst_object_ref (demuxpad->rtcp_pad);
+      break;
+    default:
+      retpad = NULL;
+      g_assert_not_reached ();
+  }
+
+  GST_OBJECT_UNLOCK (demux);
+
+  return retpad;
+}
+
 static GstEvent *
 add_ssrc_and_ref (GstEvent * event, guint32 ssrc)
 {
@@ -229,6 +262,7 @@ forward_initial_events (GstRtpSsrcDemux * demux, guint32 ssrc, GstPad * pad,
   gst_pad_sticky_events_foreach (sinkpad, forward_sticky_events, &fdata);
 }
 
+/* MUST only be called from streaming thread */
 static GstPad *
 find_or_create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc,
     PadType padtype)
@@ -242,22 +276,9 @@ find_or_create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc,
 
   INTERNAL_STREAM_LOCK (demux);
 
-  demuxpad = find_demux_pad_for_ssrc (demux, ssrc);
-  if (demuxpad != NULL) {
-    switch (padtype) {
-      case RTP_PAD:
-        retpad = gst_object_ref (demuxpad->rtp_pad);
-        break;
-      case RTCP_PAD:
-        retpad = gst_object_ref (demuxpad->rtcp_pad);
-        break;
-      default:
-        retpad = NULL;
-        g_assert_not_reached ();
-    }
-
+  retpad = get_demux_pad_for_ssrc (demux, ssrc, padtype);
+  if (retpad != NULL) {
     INTERNAL_STREAM_UNLOCK (demux);
-
     return retpad;
   }
 
@@ -283,7 +304,9 @@ find_or_create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc,
   gst_pad_set_element_private (rtp_pad, demuxpad);
   gst_pad_set_element_private (rtcp_pad, demuxpad);
 
+  GST_OBJECT_LOCK (demux);
   demux->srcpads = g_slist_prepend (demux->srcpads, demuxpad);
+  GST_OBJECT_UNLOCK (demux);
 
   gst_pad_set_query_function (rtp_pad, gst_rtp_ssrc_demux_src_query);
   gst_pad_set_iterate_internal_links_function (rtp_pad,
@@ -316,17 +339,11 @@ find_or_create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc,
       g_assert_not_reached ();
   }
 
-  gst_object_ref (rtp_pad);
-  gst_object_ref (rtcp_pad);
-
   g_signal_emit (G_OBJECT (demux),
       gst_rtp_ssrc_demux_signals[SIGNAL_NEW_SSRC_PAD], 0, ssrc, rtp_pad);
 
   INTERNAL_STREAM_UNLOCK (demux);
 
-  gst_object_unref (rtp_pad);
-  gst_object_unref (rtcp_pad);
-
   return retpad;
 }
 
@@ -486,17 +503,17 @@ gst_rtp_ssrc_demux_clear_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc)
 {
   GstRtpSsrcDemuxPad *dpad;
 
-  INTERNAL_STREAM_LOCK (demux);
+  GST_OBJECT_LOCK (demux);
   dpad = find_demux_pad_for_ssrc (demux, ssrc);
   if (dpad == NULL) {
-    INTERNAL_STREAM_UNLOCK (demux);
+    GST_OBJECT_UNLOCK (demux);
     goto unknown_pad;
   }
 
   GST_DEBUG_OBJECT (demux, "clearing pad for SSRC %08x", ssrc);
 
   demux->srcpads = g_slist_remove (demux->srcpads, dpad);
-  INTERNAL_STREAM_UNLOCK (demux);
+  GST_OBJECT_UNLOCK (demux);
 
   gst_pad_set_active (dpad->rtp_pad, FALSE);
   gst_pad_set_active (dpad->rtcp_pad, FALSE);
@@ -535,7 +552,7 @@ forward_event (GstPad * pad, gpointer user_data)
   GSList *walk = NULL;
   GstEvent *newevent = NULL;
 
-  INTERNAL_STREAM_LOCK (fdata->demux);
+  GST_OBJECT_LOCK (fdata->demux);
   for (walk = fdata->demux->srcpads; walk; walk = walk->next) {
     GstRtpSsrcDemuxPad *dpad = (GstRtpSsrcDemuxPad *) walk->data;
 
@@ -544,7 +561,7 @@ forward_event (GstPad * pad, gpointer user_data)
       break;
     }
   }
-  INTERNAL_STREAM_UNLOCK (fdata->demux);
+  GST_OBJECT_UNLOCK (fdata->demux);
 
   if (newevent)
     fdata->res &= gst_pad_push_event (pad, newevent);
@@ -582,7 +599,6 @@ gst_rtp_ssrc_demux_chain (GstPad * pad, GstObject * parent, GstBuffer * buf)
   guint32 ssrc;
   GstRTPBuffer rtp = { NULL };
   GstPad *srcpad;
-  GstRtpSsrcDemuxPad *dpad;
 
   demux = GST_RTP_SSRC_DEMUX (parent);
 
@@ -602,14 +618,17 @@ gst_rtp_ssrc_demux_chain (GstPad * pad, GstObject * parent, GstBuffer * buf)
   ret = gst_pad_push (srcpad, buf);
 
   if (ret != GST_FLOW_OK) {
+    GstPad *active_pad;
+
     /* check if the ssrc still there, may have been removed */
-    INTERNAL_STREAM_LOCK (demux);
-    dpad = find_demux_pad_for_ssrc (demux, ssrc);
-    if (dpad == NULL || dpad->rtp_pad != srcpad) {
+    active_pad = get_demux_pad_for_ssrc (demux, ssrc, RTP_PAD);
+
+    if (active_pad == NULL || active_pad != srcpad) {
       /* SSRC was removed during the push ... ignore the error */
       ret = GST_FLOW_OK;
     }
-    INTERNAL_STREAM_UNLOCK (demux);
+
+    g_clear_object (&active_pad);
   }
 
   gst_object_unref (srcpad);
@@ -644,7 +663,6 @@ gst_rtp_ssrc_demux_rtcp_chain (GstPad * pad, GstObject * parent,
   GstRTCPPacket packet;
   GstRTCPBuffer rtcp = { NULL, };
   GstPad *srcpad;
-  GstRtpSsrcDemuxPad *dpad;
 
   demux = GST_RTP_SSRC_DEMUX (parent);
 
@@ -689,14 +707,16 @@ gst_rtp_ssrc_demux_rtcp_chain (GstPad * pad, GstObject * parent,
   ret = gst_pad_push (srcpad, buf);
 
   if (ret != GST_FLOW_OK) {
+    GstPad *active_pad;
+
     /* check if the ssrc still there, may have been removed */
-    INTERNAL_STREAM_LOCK (demux);
-    dpad = find_demux_pad_for_ssrc (demux, ssrc);
-    if (dpad == NULL || dpad->rtcp_pad != srcpad) {
+    active_pad = get_demux_pad_for_ssrc (demux, ssrc, RTCP_PAD);
+    if (active_pad == NULL || active_pad != srcpad) {
       /* SSRC was removed during the push ... ignore the error */
       ret = GST_FLOW_OK;
     }
-    INTERNAL_STREAM_UNLOCK (demux);
+
+    g_clear_object (&active_pad);
   }
 
   gst_object_unref (srcpad);
@@ -786,7 +806,7 @@ gst_rtp_ssrc_demux_iterate_internal_links_src (GstPad * pad, GstObject * parent)
 
   demux = GST_RTP_SSRC_DEMUX (parent);
 
-  INTERNAL_STREAM_LOCK (demux);
+  GST_OBJECT_LOCK (demux);
   for (current = demux->srcpads; current; current = g_slist_next (current)) {
     GstRtpSsrcDemuxPad *dpad = (GstRtpSsrcDemuxPad *) current->data;
 
@@ -807,7 +827,7 @@ gst_rtp_ssrc_demux_iterate_internal_links_src (GstPad * pad, GstObject * parent)
     g_value_unset (&val);
 
   }
-  INTERNAL_STREAM_UNLOCK (demux);
+  GST_OBJECT_UNLOCK (demux);
 
   return it;
 }
index a8b093b..179d68e 100644 (file)
@@ -174,6 +174,72 @@ GST_START_TEST (test_event_forwarding)
 
 GST_END_TEST;
 
+typedef struct
+{
+  gint ready;
+  GMutex mutex;
+  GCond cond;
+} LockTestContext;
+
+static void
+new_ssrc_pad_cb (GstElement * element, guint ssrc, GstPad * pad,
+    LockTestContext * ctx)
+{
+  g_message ("Signalling ready");
+  g_atomic_int_set (&ctx->ready, 1);
+
+  g_message ("Waiting no more ready");
+  while (g_atomic_int_get (&ctx->ready))
+    g_usleep (G_USEC_PER_SEC / 100);
+
+  g_mutex_lock (&ctx->mutex);
+  g_mutex_unlock (&ctx->mutex);
+}
+
+static gpointer
+push_buffer_func (gpointer user_data)
+{
+  GstHarness *h = user_data;
+  gst_harness_push (h, create_buffer (0, 0xdeadbeef));
+  return NULL;
+}
+
+GST_START_TEST (test_oob_event_locking)
+{
+  GstHarness *h = gst_harness_new_with_padnames ("rtpssrcdemux", "sink", NULL);
+  LockTestContext ctx = { FALSE, };
+  GThread *thread;
+
+  g_mutex_init (&ctx.mutex);
+  g_cond_init (&ctx.cond);
+
+  gst_harness_set_src_caps_str (h, "application/x-rtp");
+  g_signal_connect (h->element,
+      "new-ssrc-pad", G_CALLBACK (new_ssrc_pad_cb), &ctx);
+
+  thread = g_thread_new ("streaming-thread", push_buffer_func, h);
+
+  g_mutex_lock (&ctx.mutex);
+
+  g_message ("Waiting for ready");
+  while (!g_atomic_int_get (&ctx.ready))
+    g_usleep (G_USEC_PER_SEC / 100);
+  g_message ("Signal no more ready");
+  g_atomic_int_set (&ctx.ready, 0);
+
+  gst_harness_push_event (h,
+      gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM_OOB, NULL));
+
+  g_mutex_unlock (&ctx.mutex);
+
+  g_thread_join (thread);
+  g_mutex_clear (&ctx.mutex);
+  g_cond_clear (&ctx.cond);
+  gst_harness_teardown (h);
+}
+
+GST_END_TEST;
+
 static Suite *
 rtpssrcdemux_suite (void)
 {
@@ -182,6 +248,7 @@ rtpssrcdemux_suite (void)
 
   suite_add_tcase (s, tc_chain);
   tcase_add_test (tc_chain, test_event_forwarding);
+  tcase_add_test (tc_chain, test_oob_event_locking);
 
   return s;
 }