rtpbin: separate out the two fec decoder locations
authorMatthew Waters <matthew@centricular.com>
Tue, 9 Nov 2021 04:10:06 +0000 (15:10 +1100)
committerGStreamer Marge Bot <gitlab-merge-bot@gstreamer-foundation.org>
Wed, 10 Nov 2021 10:38:26 +0000 (10:38 +0000)
The pipeline flow for receiving looks like this:

rtpsession ! rtpssrcdemux ! session_fec_decoder ! rtpjitterbuffer ! \
  rtpptdemux ! stream_fec_decoder ! ...

There are two places where a fec decoder could be placed.
1. As requested from the 'request-fec-decoder' signal: after rtpptdemux
   for each ssrc/pt produced
2. after rtpssrcdemux but before rtpjitterbuffer: added for the
   rtpst2022-1-fecenc/dec elements,

However, there was some cross-contamination of the elements involved and
the request-fec-decoder signal was also being used to request the fec
decoder for the session_fec_decoder which would then be cached and
re-used for subsequent fec decoder requests.  This would cause the same
element to be attempted to be linked to multiple elements in different
places in the pipeline.  This would fail and cause all kinds of havoc
usually resulting in a not-linked error being returned upstream and an
error message being posted by the source.

Fix by not using the request-fec-decoder signal for requesting the
session_fec_decoder and instead solely rely on the added properties for
that case.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/1300>

subprojects/gst-plugins-good/gst/rtpmanager/gstrtpbin.c

index 01e8dff..1ea02f7 100644 (file)
@@ -528,7 +528,10 @@ struct _GstRtpBinSession
 
   GSList *recv_fec_sinks;
   GSList *recv_fec_sink_ghosts;
-  GstElement *fec_decoder;
+  /* fec decoder placed before the rtpjitterbuffer but after the rtpssrcdemux.
+   * XXX: This does not yet support multiple ssrc's in the same rtp session
+   */
+  GstElement *early_fec_decoder;
 
   GSList *send_fec_src_ghosts;
 };
@@ -2488,7 +2491,9 @@ gst_rtp_bin_class_init (GstRtpBinClass * klass)
    * @session: the session index
    *
    * Request a FEC decoder element for the given @session. The element
-   * will be added to the bin after the pt demuxer.
+   * will be added to the bin after the pt demuxer.  If there are multiple
+   * ssrc's and pt's in @session, this signal may be called multiple times for
+   * the same @session each corresponding to a newly discovered ssrc.
    *
    * If no handler is connected, no FEC decoder will be used.
    *
@@ -3558,12 +3563,12 @@ copy_sticky_events (GstPad * pad, GstEvent ** event, gpointer user_data)
 }
 
 static gboolean
-ensure_fec_decoder (GstRtpBin * rtpbin, GstRtpBinSession * session)
+ensure_early_fec_decoder (GstRtpBin * rtpbin, GstRtpBinSession * session)
 {
   const gchar *factory;
   gchar *sess_id_str;
 
-  if (session->fec_decoder)
+  if (session->early_fec_decoder)
     goto done;
 
   sess_id_str = g_strdup_printf ("%u", session->id);
@@ -3574,29 +3579,27 @@ ensure_fec_decoder (GstRtpBin * rtpbin, GstRtpBinSession * session)
   if (factory) {
     GError *err = NULL;
 
-    session->fec_decoder =
+    session->early_fec_decoder =
         gst_parse_bin_from_description_full (factory, TRUE, NULL,
         GST_PARSE_FLAG_NO_SINGLE_ELEMENT_BINS | GST_PARSE_FLAG_FATAL_ERRORS,
         &err);
-    if (!session->fec_decoder) {
+    if (!session->early_fec_decoder) {
       GST_ERROR_OBJECT (rtpbin, "Failed to build decoder from factory: %s",
           err->message);
     }
 
-    bin_manage_element (session->bin, session->fec_decoder);
+    bin_manage_element (session->bin, session->early_fec_decoder);
     session->elements =
-        g_slist_prepend (session->elements, session->fec_decoder);
+        g_slist_prepend (session->elements, session->early_fec_decoder);
     GST_INFO_OBJECT (rtpbin, "Built FEC decoder: %" GST_PTR_FORMAT
-        " for session %u", session->fec_decoder, session->id);
+        " for session %u", session->early_fec_decoder, session->id);
   }
 
-  /* Fallback to the signal */
-  if (!session->fec_decoder)
-    session->fec_decoder =
-        session_request_element (session, SIGNAL_REQUEST_FEC_DECODER);
+  /* Do not fallback to the signal as the signal expects a fec decoder to
+   * be placed at a different place in the pipeline */
 
 done:
-  return session->fec_decoder != NULL;
+  return session->early_fec_decoder != NULL;
 }
 
 static void
@@ -3610,9 +3613,11 @@ expose_recv_src_pad (GstRtpBin * rtpbin, GstPad * pad, GstRtpBinStream * stream,
 
   gst_object_ref (pad);
 
-  if (stream->session->storage && !stream->session->fec_decoder) {
-    if (ensure_fec_decoder (rtpbin, stream->session)) {
-      GstElement *fec_decoder = stream->session->fec_decoder;
+  if (stream->session->storage) {
+    GstElement *fec_decoder =
+        session_request_element (stream->session, SIGNAL_REQUEST_FEC_DECODER);
+
+    if (fec_decoder) {
       GstPad *sinkpad, *srcpad;
       GstPadLinkReturn ret;
 
@@ -3849,12 +3854,13 @@ new_ssrc_pad_found (GstElement * element, guint ssrc, GstPad * pad,
   srcpad = gst_element_get_static_pad (element, padname);
   g_free (padname);
 
-  if (session->fec_decoder) {
-    sinkpad = gst_element_get_static_pad (session->fec_decoder, "sink");
+  if (session->early_fec_decoder) {
+    GST_DEBUG_OBJECT (rtpbin, "linking fec decoder");
+    sinkpad = gst_element_get_static_pad (session->early_fec_decoder, "sink");
     gst_pad_link_full (srcpad, sinkpad, GST_PAD_LINK_CHECK_NOTHING);
     gst_object_unref (sinkpad);
     gst_object_unref (srcpad);
-    srcpad = gst_element_get_static_pad (session->fec_decoder, "src");
+    srcpad = gst_element_get_static_pad (session->early_fec_decoder, "src");
   }
 
   sinkpad = gst_element_get_static_pad (stream->buffer, "sink");
@@ -4203,12 +4209,12 @@ complete_session_fec (GstRtpBin * rtpbin, GstRtpBinSession * session,
   gchar *padname;
   GstPad *ret;
 
-  if (!ensure_fec_decoder (rtpbin, session))
+  if (!ensure_early_fec_decoder (rtpbin, session))
     goto no_decoder;
 
   GST_DEBUG_OBJECT (rtpbin, "getting FEC sink pad");
   padname = g_strdup_printf ("fec_%u", fec_idx);
-  ret = gst_element_request_pad_simple (session->fec_decoder, padname);
+  ret = gst_element_request_pad_simple (session->early_fec_decoder, padname);
   g_free (padname);
 
   if (ret == NULL)
@@ -4467,7 +4473,7 @@ remove_recv_fec_for_pad (GstRtpBin * rtpbin, GstRtpBinSession * session,
   if (target) {
     item = g_slist_find (session->recv_fec_sinks, target);
     if (item) {
-      gst_element_release_request_pad (session->fec_decoder, item->data);
+      gst_element_release_request_pad (session->early_fec_decoder, item->data);
       session->recv_fec_sinks =
           g_slist_delete_link (session->recv_fec_sinks, item);
     }