rtsp-stream: Only create RTP sending/receiving rtpbin pads if needed
authorSebastian Dröge <sebastian@centricular.com>
Thu, 19 Nov 2015 13:01:16 +0000 (15:01 +0200)
committerSebastian Dröge <sebastian@centricular.com>
Tue, 1 Dec 2015 13:32:45 +0000 (15:32 +0200)
Adding them when not needed will start some logic inside rtpbin that might be
problematic. Also if e.g. for a sender media we suddenly receive RTP data, we
would start up a rtpjitterbuffer and behave in weird ways.

We still set up the UDP sources for RTP receiving for a sender media to be
able to receive any packets sent by the client for NAT traversal. They will
all go to a fakesink though.

Having an rtpjitterbuffer in the media pipeline will cause the pipeline to be
NO_PREROLL, which will cause deadlocks when seeking the media as it will never
receive ASYNC_DONE after a seek.

https://bugzilla.gnome.org/show_bug.cgi?id=758319

gst/rtsp-server/rtsp-stream.c

index 3ea6a82..399f33e 100644 (file)
@@ -2117,32 +2117,33 @@ gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
         (GCallback) request_pt_map, stream);
   }
 
-  /* get a pad for sending RTP */
-  name = g_strdup_printf ("send_rtp_sink_%u", idx);
-  priv->send_rtp_sink = gst_element_get_request_pad (rtpbin, name);
-  g_free (name);
-
+  /* get pads from the RTP session element for sending and receiving
+   * RTP/RTCP*/
   if (priv->srcpad) {
+    /* get a pad for sending RTP */
+    name = g_strdup_printf ("send_rtp_sink_%u", idx);
+    priv->send_rtp_sink = gst_element_get_request_pad (rtpbin, name);
+    g_free (name);
+
     /* link the RTP pad to the session manager, it should not really fail unless
      * this is not really an RTP pad */
     ret = gst_pad_link (priv->srcpad, priv->send_rtp_sink);
     if (ret != GST_PAD_LINK_OK)
       goto link_failed;
+
+    name = g_strdup_printf ("send_rtp_src_%u", idx);
+    priv->send_src[0] = gst_element_get_static_pad (rtpbin, name);
+    g_free (name);
   } else {
     /* Need to connect our sinkpad from here */
     g_signal_connect (rtpbin, "pad-added", (GCallback) pad_added, stream);
     /* EOS */
     g_signal_connect (rtpbin, "on-npt-stop", (GCallback) on_npt_stop, stream);
-  }
 
-  /* get pads from the RTP session element for sending and receiving
-   * RTP/RTCP*/
-  name = g_strdup_printf ("send_rtp_src_%u", idx);
-  priv->send_src[0] = gst_element_get_static_pad (rtpbin, name);
-  g_free (name);
-  name = g_strdup_printf ("recv_rtp_sink_%u", idx);
-  priv->recv_sink[0] = gst_element_get_request_pad (rtpbin, name);
-  g_free (name);
+    name = g_strdup_printf ("recv_rtp_sink_%u", idx);
+    priv->recv_sink[0] = gst_element_get_request_pad (rtpbin, name);
+    g_free (name);
+  }
 
   name = g_strdup_printf ("send_rtcp_src_%u", idx);
   priv->send_src[1] = gst_element_get_request_pad (rtpbin, name);
@@ -2193,157 +2194,170 @@ gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
      * When only UDP is allowed, we skip the tee, queue and appsink and link the
      * udpsink directly to the session.
      */
-    /* add udpsink */
-    gst_bin_add (bin, priv->udpsink[i]);
-    sinkpad = gst_element_get_static_pad (priv->udpsink[i], "sink");
-
-    if (priv->protocols & GST_RTSP_LOWER_TRANS_TCP) {
-      /* make tee for RTP/RTCP */
-      priv->tee[i] = gst_element_factory_make ("tee", NULL);
-      gst_bin_add (bin, priv->tee[i]);
-
-      /* and link to rtpbin send pad */
-      pad = gst_element_get_static_pad (priv->tee[i], "sink");
-      gst_pad_link (priv->send_src[i], pad);
-      gst_object_unref (pad);
-
-      priv->udpqueue[i] = gst_element_factory_make ("queue", NULL);
-      g_object_set (priv->udpqueue[i], "max-size-buffers",
-          1, "max-size-bytes", 0, "max-size-time", G_GINT64_CONSTANT (0), NULL);
-      gst_bin_add (bin, priv->udpqueue[i]);
-      /* link tee to udpqueue */
-      teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
-      pad = gst_element_get_static_pad (priv->udpqueue[i], "sink");
-      gst_pad_link (teepad, pad);
-      gst_object_unref (pad);
-      gst_object_unref (teepad);
-
-      /* link udpqueue to udpsink */
-      queuepad = gst_element_get_static_pad (priv->udpqueue[i], "src");
-      gst_pad_link (queuepad, sinkpad);
-      gst_object_unref (queuepad);
-
-      /* make queue */
-      priv->appqueue[i] = gst_element_factory_make ("queue", NULL);
-      g_object_set (priv->appqueue[i], "max-size-buffers",
-          1, "max-size-bytes", 0, "max-size-time", G_GINT64_CONSTANT (0), NULL);
-      gst_bin_add (bin, priv->appqueue[i]);
-      /* and link to tee */
-      teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
-      pad = gst_element_get_static_pad (priv->appqueue[i], "sink");
-      gst_pad_link (teepad, pad);
-      gst_object_unref (pad);
-      gst_object_unref (teepad);
-
-      /* make appsink */
-      priv->appsink[i] = gst_element_factory_make ("appsink", NULL);
-      g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
-      g_object_set (priv->appsink[i], "emit-signals", FALSE, NULL);
-      gst_bin_add (bin, priv->appsink[i]);
-      gst_app_sink_set_callbacks (GST_APP_SINK_CAST (priv->appsink[i]),
-          &sink_cb, stream, NULL);
-      /* and link to queue */
-      queuepad = gst_element_get_static_pad (priv->appqueue[i], "src");
-      pad = gst_element_get_static_pad (priv->appsink[i], "sink");
-      gst_pad_link (queuepad, pad);
-      gst_object_unref (pad);
-      gst_object_unref (queuepad);
-    } else {
-      /* else only udpsink needed, link it to the session */
-      gst_pad_link (priv->send_src[i], sinkpad);
-    }
-    gst_object_unref (sinkpad);
 
-    /* For the receiver we create this bit of pipeline for both
-     * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc
-     * and it is all funneled into the rtpbin receive pad.
-     *
-     * .--------.     .--------.    .--------.
-     * | udpsrc |     | funnel |    | rtpbin |
-     * |       src->sink      src->sink      |
-     * '--------'     |        |    '--------'
-     * .--------.     |        |
-     * | appsrc |     |        |
-     * |       src->sink       |
-     * '--------'     '--------'
-     */
-    /* make funnel for the RTP/RTCP receivers */
-    priv->funnel[i] = gst_element_factory_make ("funnel", NULL);
-    gst_bin_add (bin, priv->funnel[i]);
-
-    pad = gst_element_get_static_pad (priv->funnel[i], "src");
-    gst_pad_link (pad, priv->recv_sink[i]);
-    gst_object_unref (pad);
-
-    if (priv->udpsrc_v4[i]) {
-      if (priv->srcpad) {
-        /* we set and keep these to playing so that they don't cause NO_PREROLL return
-         * values. This is only relevant for PLAY pipelines */
-        gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_PLAYING);
-        gst_element_set_locked_state (priv->udpsrc_v4[i], TRUE);
+    /* Only link the RTP send src if we're going to send RTP, link
+     * the RTCP send src always */
+    if (priv->srcpad || i == 1) {
+      /* add udpsink */
+      gst_bin_add (bin, priv->udpsink[i]);
+      sinkpad = gst_element_get_static_pad (priv->udpsink[i], "sink");
+
+      if (priv->protocols & GST_RTSP_LOWER_TRANS_TCP) {
+        /* make tee for RTP/RTCP */
+        priv->tee[i] = gst_element_factory_make ("tee", NULL);
+        gst_bin_add (bin, priv->tee[i]);
+
+        /* and link to rtpbin send pad */
+        pad = gst_element_get_static_pad (priv->tee[i], "sink");
+        gst_pad_link (priv->send_src[i], pad);
+        gst_object_unref (pad);
+
+        priv->udpqueue[i] = gst_element_factory_make ("queue", NULL);
+        g_object_set (priv->udpqueue[i], "max-size-buffers",
+            1, "max-size-bytes", 0, "max-size-time", G_GINT64_CONSTANT (0),
+            NULL);
+        gst_bin_add (bin, priv->udpqueue[i]);
+        /* link tee to udpqueue */
+        teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
+        pad = gst_element_get_static_pad (priv->udpqueue[i], "sink");
+        gst_pad_link (teepad, pad);
+        gst_object_unref (pad);
+        gst_object_unref (teepad);
+
+        /* link udpqueue to udpsink */
+        queuepad = gst_element_get_static_pad (priv->udpqueue[i], "src");
+        gst_pad_link (queuepad, sinkpad);
+        gst_object_unref (queuepad);
+
+        /* make queue */
+        priv->appqueue[i] = gst_element_factory_make ("queue", NULL);
+        g_object_set (priv->appqueue[i], "max-size-buffers",
+            1, "max-size-bytes", 0, "max-size-time", G_GINT64_CONSTANT (0),
+            NULL);
+        gst_bin_add (bin, priv->appqueue[i]);
+        /* and link to tee */
+        teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
+        pad = gst_element_get_static_pad (priv->appqueue[i], "sink");
+        gst_pad_link (teepad, pad);
+        gst_object_unref (pad);
+        gst_object_unref (teepad);
+
+        /* make appsink */
+        priv->appsink[i] = gst_element_factory_make ("appsink", NULL);
+        g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
+        g_object_set (priv->appsink[i], "emit-signals", FALSE, NULL);
+        gst_bin_add (bin, priv->appsink[i]);
+        gst_app_sink_set_callbacks (GST_APP_SINK_CAST (priv->appsink[i]),
+            &sink_cb, stream, NULL);
+        /* and link to queue */
+        queuepad = gst_element_get_static_pad (priv->appqueue[i], "src");
+        pad = gst_element_get_static_pad (priv->appsink[i], "sink");
+        gst_pad_link (queuepad, pad);
+        gst_object_unref (pad);
+        gst_object_unref (queuepad);
+      } else {
+        /* else only udpsink needed, link it to the session */
+        gst_pad_link (priv->send_src[i], sinkpad);
       }
-      /* add udpsrc */
-      gst_bin_add (bin, priv->udpsrc_v4[i]);
+      gst_object_unref (sinkpad);
+    }
 
-      /* and link to the funnel v4 */
-      selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
-      pad = gst_element_get_static_pad (priv->udpsrc_v4[i], "src");
-      gst_pad_link (pad, selpad);
+    /* Only connect recv RTP sink if we expect to receive RTP. Connect recv
+     * RTCP sink always */
+    if (priv->sinkpad || i == 1) {
+      /* For the receiver we create this bit of pipeline for both
+       * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc
+       * and it is all funneled into the rtpbin receive pad.
+       *
+       * .--------.     .--------.    .--------.
+       * | udpsrc |     | funnel |    | rtpbin |
+       * |       src->sink      src->sink      |
+       * '--------'     |        |    '--------'
+       * .--------.     |        |
+       * | appsrc |     |        |
+       * |       src->sink       |
+       * '--------'     '--------'
+       */
+      /* make funnel for the RTP/RTCP receivers */
+      priv->funnel[i] = gst_element_factory_make ("funnel", NULL);
+      gst_bin_add (bin, priv->funnel[i]);
+
+      pad = gst_element_get_static_pad (priv->funnel[i], "src");
+      gst_pad_link (pad, priv->recv_sink[i]);
       gst_object_unref (pad);
-      gst_object_unref (selpad);
-    }
 
-    if (priv->udpsrc_v6[i]) {
-      if (priv->srcpad) {
-        gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_PLAYING);
-        gst_element_set_locked_state (priv->udpsrc_v6[i], TRUE);
+      if (priv->udpsrc_v4[i]) {
+        if (priv->srcpad) {
+          /* we set and keep these to playing so that they don't cause NO_PREROLL return
+           * values. This is only relevant for PLAY pipelines */
+          gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_PLAYING);
+          gst_element_set_locked_state (priv->udpsrc_v4[i], TRUE);
+        }
+        /* add udpsrc */
+        gst_bin_add (bin, priv->udpsrc_v4[i]);
+
+        /* and link to the funnel v4 */
+        selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
+        pad = gst_element_get_static_pad (priv->udpsrc_v4[i], "src");
+        gst_pad_link (pad, selpad);
+        gst_object_unref (pad);
+        gst_object_unref (selpad);
       }
-      gst_bin_add (bin, priv->udpsrc_v6[i]);
 
-      /* and link to the funnel v6 */
-      selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
-      pad = gst_element_get_static_pad (priv->udpsrc_v6[i], "src");
-      gst_pad_link (pad, selpad);
-      gst_object_unref (pad);
-      gst_object_unref (selpad);
-    }
+      if (priv->udpsrc_v6[i]) {
+        if (priv->srcpad) {
+          gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_PLAYING);
+          gst_element_set_locked_state (priv->udpsrc_v6[i], TRUE);
+        }
+        gst_bin_add (bin, priv->udpsrc_v6[i]);
+
+        /* and link to the funnel v6 */
+        selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
+        pad = gst_element_get_static_pad (priv->udpsrc_v6[i], "src");
+        gst_pad_link (pad, selpad);
+        gst_object_unref (pad);
+        gst_object_unref (selpad);
+      }
 
-    if (priv->protocols & GST_RTSP_LOWER_TRANS_TCP) {
-      /* make and add appsrc */
-      priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
-      priv->appsrc_base_time[i] = -1;
-      g_object_set (priv->appsrc[i], "format", GST_FORMAT_TIME, NULL);
-      gst_bin_add (bin, priv->appsrc[i]);
-      /* and link to the funnel */
-      selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
-      pad = gst_element_get_static_pad (priv->appsrc[i], "src");
-      gst_pad_link (pad, selpad);
-      gst_object_unref (pad);
-      gst_object_unref (selpad);
+      if (priv->protocols & GST_RTSP_LOWER_TRANS_TCP) {
+        /* make and add appsrc */
+        priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
+        priv->appsrc_base_time[i] = -1;
+        g_object_set (priv->appsrc[i], "format", GST_FORMAT_TIME, NULL);
+        gst_bin_add (bin, priv->appsrc[i]);
+        /* and link to the funnel */
+        selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
+        pad = gst_element_get_static_pad (priv->appsrc[i], "src");
+        gst_pad_link (pad, selpad);
+        gst_object_unref (pad);
+        gst_object_unref (selpad);
+      }
     }
 
     /* check if we need to set to a special state */
     if (state != GST_STATE_NULL) {
-      if (priv->udpsink[i])
+      if (priv->udpsink[i] && (priv->srcpad || i == 1))
         gst_element_set_state (priv->udpsink[i], state);
-      if (priv->appsink[i])
+      if (priv->appsink[i] && (priv->srcpad || i == 1))
         gst_element_set_state (priv->appsink[i], state);
-      if (priv->appqueue[i])
+      if (priv->appqueue[i] && (priv->srcpad || i == 1))
         gst_element_set_state (priv->appqueue[i], state);
-      if (priv->udpqueue[i])
+      if (priv->udpqueue[i] && (priv->srcpad || i == 1))
         gst_element_set_state (priv->udpqueue[i], state);
-      if (priv->tee[i])
+      if (priv->tee[i] && (priv->srcpad || i == 1))
         gst_element_set_state (priv->tee[i], state);
-      if (priv->funnel[i])
+      if (priv->funnel[i] && (priv->sinkpad || i == 1))
         gst_element_set_state (priv->funnel[i], state);
-      if (priv->appsrc[i])
+      if (priv->appsrc[i] && (priv->sinkpad || i == 1))
         gst_element_set_state (priv->appsrc[i], state);
     }
   }
 
-  /* be notified of caps changes */
-  priv->caps_sig = g_signal_connect (priv->send_src[0], "notify::caps",
-      (GCallback) caps_notify, stream);
+  if (priv->srcpad) {
+    /* be notified of caps changes */
+    priv->caps_sig = g_signal_connect (priv->send_src[0], "notify::caps",
+        (GCallback) caps_notify, stream);
+  }
 
   priv->is_joined = TRUE;
   g_mutex_unlock (&priv->lock);
@@ -2411,15 +2425,16 @@ gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
 
   if (priv->srcpad) {
     gst_pad_unlink (priv->srcpad, priv->send_rtp_sink);
+
+    g_signal_handler_disconnect (priv->send_src[0], priv->caps_sig);
+    gst_element_release_request_pad (rtpbin, priv->send_rtp_sink);
+    gst_object_unref (priv->send_rtp_sink);
+    priv->send_rtp_sink = NULL;
   } else if (priv->recv_rtp_src) {
     gst_pad_unlink (priv->recv_rtp_src, priv->sinkpad);
     gst_object_unref (priv->recv_rtp_src);
     priv->recv_rtp_src = NULL;
   }
-  g_signal_handler_disconnect (priv->send_src[0], priv->caps_sig);
-  gst_element_release_request_pad (rtpbin, priv->send_rtp_sink);
-  gst_object_unref (priv->send_rtp_sink);
-  priv->send_rtp_sink = NULL;
 
   for (i = 0; i < 2; i++) {
     if (priv->udpsink[i])
@@ -2436,7 +2451,7 @@ gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
       gst_element_set_state (priv->funnel[i], GST_STATE_NULL);
     if (priv->appsrc[i])
       gst_element_set_state (priv->appsrc[i], GST_STATE_NULL);
-    if (priv->udpsrc_v4[i]) {
+    if (priv->udpsrc_v4[i] && (priv->sinkpad || i == 1)) {
       /* and set udpsrc to NULL now before removing */
       gst_element_set_locked_state (priv->udpsrc_v4[i], FALSE);
       gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_NULL);
@@ -2444,7 +2459,7 @@ gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
        * pads when they finalize */
       gst_bin_remove (bin, priv->udpsrc_v4[i]);
     }
-    if (priv->udpsrc_v6[i]) {
+    if (priv->udpsrc_v6[i] && (priv->sinkpad || i == 1)) {
       gst_element_set_locked_state (priv->udpsrc_v6[i], FALSE);
       gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_NULL);
       gst_bin_remove (bin, priv->udpsrc_v6[i]);
@@ -2461,24 +2476,26 @@ gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
       gst_bin_remove (bin, s->udpsrc[i]);
     }
 
-    if (priv->udpsink[i])
+    if (priv->udpsink[i] && (priv->srcpad || i == 1))
       gst_bin_remove (bin, priv->udpsink[i]);
-    if (priv->appsrc[i])
+    if (priv->appsrc[i] && (priv->sinkpad || i == 1))
       gst_bin_remove (bin, priv->appsrc[i]);
-    if (priv->appsink[i])
+    if (priv->appsink[i] && (priv->srcpad || i == 1))
       gst_bin_remove (bin, priv->appsink[i]);
-    if (priv->appqueue[i])
+    if (priv->appqueue[i] && (priv->srcpad || i == 1))
       gst_bin_remove (bin, priv->appqueue[i]);
-    if (priv->udpqueue[i])
+    if (priv->udpqueue[i] && (priv->srcpad || i == 1))
       gst_bin_remove (bin, priv->udpqueue[i]);
-    if (priv->tee[i])
+    if (priv->tee[i] && (priv->srcpad || i == 1))
       gst_bin_remove (bin, priv->tee[i]);
-    if (priv->funnel[i])
+    if (priv->funnel[i] && (priv->sinkpad || i == 1))
       gst_bin_remove (bin, priv->funnel[i]);
 
-    gst_element_release_request_pad (rtpbin, priv->recv_sink[i]);
-    gst_object_unref (priv->recv_sink[i]);
-    priv->recv_sink[i] = NULL;
+    if (priv->sinkpad || i == 1) {
+      gst_element_release_request_pad (rtpbin, priv->recv_sink[i]);
+      gst_object_unref (priv->recv_sink[i]);
+      priv->recv_sink[i] = NULL;
+    }
 
     priv->udpsrc_v4[i] = NULL;
     priv->udpsrc_v6[i] = NULL;
@@ -2498,8 +2515,10 @@ gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
   g_list_free (priv->transport_sources);
   priv->transport_sources = NULL;
 
-  gst_object_unref (priv->send_src[0]);
-  priv->send_src[0] = NULL;
+  if (priv->srcpad) {
+    gst_object_unref (priv->send_src[0]);
+    priv->send_src[0] = NULL;
+  }
 
   gst_element_release_request_pad (rtpbin, priv->send_src[1]);
   gst_object_unref (priv->send_src[1]);