From 61772cb32644a077750abffcd17c43b01fb93cfa Mon Sep 17 00:00:00 2001 From: =?utf8?q?Sebastian=20Dr=C3=B6ge?= Date: Thu, 19 Nov 2015 15:01:16 +0200 Subject: [PATCH] rtsp-stream: Only create RTP sending/receiving rtpbin pads if needed Adding them when not needed will start some logic inside rtpbin that might be problematic. Also if e.g. for a sender media we suddenly receive RTP data, we would start up a rtpjitterbuffer and behave in weird ways. We still set up the UDP sources for RTP receiving for a sender media to be able to receive any packets sent by the client for NAT traversal. They will all go to a fakesink though. Having an rtpjitterbuffer in the media pipeline will cause the pipeline to be NO_PREROLL, which will cause deadlocks when seeking the media as it will never receive ASYNC_DONE after a seek. https://bugzilla.gnome.org/show_bug.cgi?id=758319 --- gst/rtsp-server/rtsp-stream.c | 341 ++++++++++++++++++++++-------------------- 1 file changed, 180 insertions(+), 161 deletions(-) diff --git a/gst/rtsp-server/rtsp-stream.c b/gst/rtsp-server/rtsp-stream.c index 3ea6a82..399f33e 100644 --- a/gst/rtsp-server/rtsp-stream.c +++ b/gst/rtsp-server/rtsp-stream.c @@ -2117,32 +2117,33 @@ gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin, (GCallback) request_pt_map, stream); } - /* get a pad for sending RTP */ - name = g_strdup_printf ("send_rtp_sink_%u", idx); - priv->send_rtp_sink = gst_element_get_request_pad (rtpbin, name); - g_free (name); - + /* get pads from the RTP session element for sending and receiving + * RTP/RTCP*/ if (priv->srcpad) { + /* get a pad for sending RTP */ + name = g_strdup_printf ("send_rtp_sink_%u", idx); + priv->send_rtp_sink = gst_element_get_request_pad (rtpbin, name); + g_free (name); + /* link the RTP pad to the session manager, it should not really fail unless * this is not really an RTP pad */ ret = gst_pad_link (priv->srcpad, priv->send_rtp_sink); if (ret != GST_PAD_LINK_OK) goto link_failed; + + name = g_strdup_printf ("send_rtp_src_%u", idx); + priv->send_src[0] = gst_element_get_static_pad (rtpbin, name); + g_free (name); } else { /* Need to connect our sinkpad from here */ g_signal_connect (rtpbin, "pad-added", (GCallback) pad_added, stream); /* EOS */ g_signal_connect (rtpbin, "on-npt-stop", (GCallback) on_npt_stop, stream); - } - /* get pads from the RTP session element for sending and receiving - * RTP/RTCP*/ - name = g_strdup_printf ("send_rtp_src_%u", idx); - priv->send_src[0] = gst_element_get_static_pad (rtpbin, name); - g_free (name); - name = g_strdup_printf ("recv_rtp_sink_%u", idx); - priv->recv_sink[0] = gst_element_get_request_pad (rtpbin, name); - g_free (name); + name = g_strdup_printf ("recv_rtp_sink_%u", idx); + priv->recv_sink[0] = gst_element_get_request_pad (rtpbin, name); + g_free (name); + } name = g_strdup_printf ("send_rtcp_src_%u", idx); priv->send_src[1] = gst_element_get_request_pad (rtpbin, name); @@ -2193,157 +2194,170 @@ gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin, * When only UDP is allowed, we skip the tee, queue and appsink and link the * udpsink directly to the session. */ - /* add udpsink */ - gst_bin_add (bin, priv->udpsink[i]); - sinkpad = gst_element_get_static_pad (priv->udpsink[i], "sink"); - - if (priv->protocols & GST_RTSP_LOWER_TRANS_TCP) { - /* make tee for RTP/RTCP */ - priv->tee[i] = gst_element_factory_make ("tee", NULL); - gst_bin_add (bin, priv->tee[i]); - - /* and link to rtpbin send pad */ - pad = gst_element_get_static_pad (priv->tee[i], "sink"); - gst_pad_link (priv->send_src[i], pad); - gst_object_unref (pad); - - priv->udpqueue[i] = gst_element_factory_make ("queue", NULL); - g_object_set (priv->udpqueue[i], "max-size-buffers", - 1, "max-size-bytes", 0, "max-size-time", G_GINT64_CONSTANT (0), NULL); - gst_bin_add (bin, priv->udpqueue[i]); - /* link tee to udpqueue */ - teepad = gst_element_get_request_pad (priv->tee[i], "src_%u"); - pad = gst_element_get_static_pad (priv->udpqueue[i], "sink"); - gst_pad_link (teepad, pad); - gst_object_unref (pad); - gst_object_unref (teepad); - - /* link udpqueue to udpsink */ - queuepad = gst_element_get_static_pad (priv->udpqueue[i], "src"); - gst_pad_link (queuepad, sinkpad); - gst_object_unref (queuepad); - - /* make queue */ - priv->appqueue[i] = gst_element_factory_make ("queue", NULL); - g_object_set (priv->appqueue[i], "max-size-buffers", - 1, "max-size-bytes", 0, "max-size-time", G_GINT64_CONSTANT (0), NULL); - gst_bin_add (bin, priv->appqueue[i]); - /* and link to tee */ - teepad = gst_element_get_request_pad (priv->tee[i], "src_%u"); - pad = gst_element_get_static_pad (priv->appqueue[i], "sink"); - gst_pad_link (teepad, pad); - gst_object_unref (pad); - gst_object_unref (teepad); - - /* make appsink */ - priv->appsink[i] = gst_element_factory_make ("appsink", NULL); - g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL); - g_object_set (priv->appsink[i], "emit-signals", FALSE, NULL); - gst_bin_add (bin, priv->appsink[i]); - gst_app_sink_set_callbacks (GST_APP_SINK_CAST (priv->appsink[i]), - &sink_cb, stream, NULL); - /* and link to queue */ - queuepad = gst_element_get_static_pad (priv->appqueue[i], "src"); - pad = gst_element_get_static_pad (priv->appsink[i], "sink"); - gst_pad_link (queuepad, pad); - gst_object_unref (pad); - gst_object_unref (queuepad); - } else { - /* else only udpsink needed, link it to the session */ - gst_pad_link (priv->send_src[i], sinkpad); - } - gst_object_unref (sinkpad); - /* For the receiver we create this bit of pipeline for both - * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc - * and it is all funneled into the rtpbin receive pad. - * - * .--------. .--------. .--------. - * | udpsrc | | funnel | | rtpbin | - * | src->sink src->sink | - * '--------' | | '--------' - * .--------. | | - * | appsrc | | | - * | src->sink | - * '--------' '--------' - */ - /* make funnel for the RTP/RTCP receivers */ - priv->funnel[i] = gst_element_factory_make ("funnel", NULL); - gst_bin_add (bin, priv->funnel[i]); - - pad = gst_element_get_static_pad (priv->funnel[i], "src"); - gst_pad_link (pad, priv->recv_sink[i]); - gst_object_unref (pad); - - if (priv->udpsrc_v4[i]) { - if (priv->srcpad) { - /* we set and keep these to playing so that they don't cause NO_PREROLL return - * values. This is only relevant for PLAY pipelines */ - gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_PLAYING); - gst_element_set_locked_state (priv->udpsrc_v4[i], TRUE); + /* Only link the RTP send src if we're going to send RTP, link + * the RTCP send src always */ + if (priv->srcpad || i == 1) { + /* add udpsink */ + gst_bin_add (bin, priv->udpsink[i]); + sinkpad = gst_element_get_static_pad (priv->udpsink[i], "sink"); + + if (priv->protocols & GST_RTSP_LOWER_TRANS_TCP) { + /* make tee for RTP/RTCP */ + priv->tee[i] = gst_element_factory_make ("tee", NULL); + gst_bin_add (bin, priv->tee[i]); + + /* and link to rtpbin send pad */ + pad = gst_element_get_static_pad (priv->tee[i], "sink"); + gst_pad_link (priv->send_src[i], pad); + gst_object_unref (pad); + + priv->udpqueue[i] = gst_element_factory_make ("queue", NULL); + g_object_set (priv->udpqueue[i], "max-size-buffers", + 1, "max-size-bytes", 0, "max-size-time", G_GINT64_CONSTANT (0), + NULL); + gst_bin_add (bin, priv->udpqueue[i]); + /* link tee to udpqueue */ + teepad = gst_element_get_request_pad (priv->tee[i], "src_%u"); + pad = gst_element_get_static_pad (priv->udpqueue[i], "sink"); + gst_pad_link (teepad, pad); + gst_object_unref (pad); + gst_object_unref (teepad); + + /* link udpqueue to udpsink */ + queuepad = gst_element_get_static_pad (priv->udpqueue[i], "src"); + gst_pad_link (queuepad, sinkpad); + gst_object_unref (queuepad); + + /* make queue */ + priv->appqueue[i] = gst_element_factory_make ("queue", NULL); + g_object_set (priv->appqueue[i], "max-size-buffers", + 1, "max-size-bytes", 0, "max-size-time", G_GINT64_CONSTANT (0), + NULL); + gst_bin_add (bin, priv->appqueue[i]); + /* and link to tee */ + teepad = gst_element_get_request_pad (priv->tee[i], "src_%u"); + pad = gst_element_get_static_pad (priv->appqueue[i], "sink"); + gst_pad_link (teepad, pad); + gst_object_unref (pad); + gst_object_unref (teepad); + + /* make appsink */ + priv->appsink[i] = gst_element_factory_make ("appsink", NULL); + g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL); + g_object_set (priv->appsink[i], "emit-signals", FALSE, NULL); + gst_bin_add (bin, priv->appsink[i]); + gst_app_sink_set_callbacks (GST_APP_SINK_CAST (priv->appsink[i]), + &sink_cb, stream, NULL); + /* and link to queue */ + queuepad = gst_element_get_static_pad (priv->appqueue[i], "src"); + pad = gst_element_get_static_pad (priv->appsink[i], "sink"); + gst_pad_link (queuepad, pad); + gst_object_unref (pad); + gst_object_unref (queuepad); + } else { + /* else only udpsink needed, link it to the session */ + gst_pad_link (priv->send_src[i], sinkpad); } - /* add udpsrc */ - gst_bin_add (bin, priv->udpsrc_v4[i]); + gst_object_unref (sinkpad); + } - /* and link to the funnel v4 */ - selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u"); - pad = gst_element_get_static_pad (priv->udpsrc_v4[i], "src"); - gst_pad_link (pad, selpad); + /* Only connect recv RTP sink if we expect to receive RTP. Connect recv + * RTCP sink always */ + if (priv->sinkpad || i == 1) { + /* For the receiver we create this bit of pipeline for both + * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc + * and it is all funneled into the rtpbin receive pad. + * + * .--------. .--------. .--------. + * | udpsrc | | funnel | | rtpbin | + * | src->sink src->sink | + * '--------' | | '--------' + * .--------. | | + * | appsrc | | | + * | src->sink | + * '--------' '--------' + */ + /* make funnel for the RTP/RTCP receivers */ + priv->funnel[i] = gst_element_factory_make ("funnel", NULL); + gst_bin_add (bin, priv->funnel[i]); + + pad = gst_element_get_static_pad (priv->funnel[i], "src"); + gst_pad_link (pad, priv->recv_sink[i]); gst_object_unref (pad); - gst_object_unref (selpad); - } - if (priv->udpsrc_v6[i]) { - if (priv->srcpad) { - gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_PLAYING); - gst_element_set_locked_state (priv->udpsrc_v6[i], TRUE); + if (priv->udpsrc_v4[i]) { + if (priv->srcpad) { + /* we set and keep these to playing so that they don't cause NO_PREROLL return + * values. This is only relevant for PLAY pipelines */ + gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_PLAYING); + gst_element_set_locked_state (priv->udpsrc_v4[i], TRUE); + } + /* add udpsrc */ + gst_bin_add (bin, priv->udpsrc_v4[i]); + + /* and link to the funnel v4 */ + selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u"); + pad = gst_element_get_static_pad (priv->udpsrc_v4[i], "src"); + gst_pad_link (pad, selpad); + gst_object_unref (pad); + gst_object_unref (selpad); } - gst_bin_add (bin, priv->udpsrc_v6[i]); - /* and link to the funnel v6 */ - selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u"); - pad = gst_element_get_static_pad (priv->udpsrc_v6[i], "src"); - gst_pad_link (pad, selpad); - gst_object_unref (pad); - gst_object_unref (selpad); - } + if (priv->udpsrc_v6[i]) { + if (priv->srcpad) { + gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_PLAYING); + gst_element_set_locked_state (priv->udpsrc_v6[i], TRUE); + } + gst_bin_add (bin, priv->udpsrc_v6[i]); + + /* and link to the funnel v6 */ + selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u"); + pad = gst_element_get_static_pad (priv->udpsrc_v6[i], "src"); + gst_pad_link (pad, selpad); + gst_object_unref (pad); + gst_object_unref (selpad); + } - if (priv->protocols & GST_RTSP_LOWER_TRANS_TCP) { - /* make and add appsrc */ - priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL); - priv->appsrc_base_time[i] = -1; - g_object_set (priv->appsrc[i], "format", GST_FORMAT_TIME, NULL); - gst_bin_add (bin, priv->appsrc[i]); - /* and link to the funnel */ - selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u"); - pad = gst_element_get_static_pad (priv->appsrc[i], "src"); - gst_pad_link (pad, selpad); - gst_object_unref (pad); - gst_object_unref (selpad); + if (priv->protocols & GST_RTSP_LOWER_TRANS_TCP) { + /* make and add appsrc */ + priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL); + priv->appsrc_base_time[i] = -1; + g_object_set (priv->appsrc[i], "format", GST_FORMAT_TIME, NULL); + gst_bin_add (bin, priv->appsrc[i]); + /* and link to the funnel */ + selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u"); + pad = gst_element_get_static_pad (priv->appsrc[i], "src"); + gst_pad_link (pad, selpad); + gst_object_unref (pad); + gst_object_unref (selpad); + } } /* check if we need to set to a special state */ if (state != GST_STATE_NULL) { - if (priv->udpsink[i]) + if (priv->udpsink[i] && (priv->srcpad || i == 1)) gst_element_set_state (priv->udpsink[i], state); - if (priv->appsink[i]) + if (priv->appsink[i] && (priv->srcpad || i == 1)) gst_element_set_state (priv->appsink[i], state); - if (priv->appqueue[i]) + if (priv->appqueue[i] && (priv->srcpad || i == 1)) gst_element_set_state (priv->appqueue[i], state); - if (priv->udpqueue[i]) + if (priv->udpqueue[i] && (priv->srcpad || i == 1)) gst_element_set_state (priv->udpqueue[i], state); - if (priv->tee[i]) + if (priv->tee[i] && (priv->srcpad || i == 1)) gst_element_set_state (priv->tee[i], state); - if (priv->funnel[i]) + if (priv->funnel[i] && (priv->sinkpad || i == 1)) gst_element_set_state (priv->funnel[i], state); - if (priv->appsrc[i]) + if (priv->appsrc[i] && (priv->sinkpad || i == 1)) gst_element_set_state (priv->appsrc[i], state); } } - /* be notified of caps changes */ - priv->caps_sig = g_signal_connect (priv->send_src[0], "notify::caps", - (GCallback) caps_notify, stream); + if (priv->srcpad) { + /* be notified of caps changes */ + priv->caps_sig = g_signal_connect (priv->send_src[0], "notify::caps", + (GCallback) caps_notify, stream); + } priv->is_joined = TRUE; g_mutex_unlock (&priv->lock); @@ -2411,15 +2425,16 @@ gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin, if (priv->srcpad) { gst_pad_unlink (priv->srcpad, priv->send_rtp_sink); + + g_signal_handler_disconnect (priv->send_src[0], priv->caps_sig); + gst_element_release_request_pad (rtpbin, priv->send_rtp_sink); + gst_object_unref (priv->send_rtp_sink); + priv->send_rtp_sink = NULL; } else if (priv->recv_rtp_src) { gst_pad_unlink (priv->recv_rtp_src, priv->sinkpad); gst_object_unref (priv->recv_rtp_src); priv->recv_rtp_src = NULL; } - g_signal_handler_disconnect (priv->send_src[0], priv->caps_sig); - gst_element_release_request_pad (rtpbin, priv->send_rtp_sink); - gst_object_unref (priv->send_rtp_sink); - priv->send_rtp_sink = NULL; for (i = 0; i < 2; i++) { if (priv->udpsink[i]) @@ -2436,7 +2451,7 @@ gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin, gst_element_set_state (priv->funnel[i], GST_STATE_NULL); if (priv->appsrc[i]) gst_element_set_state (priv->appsrc[i], GST_STATE_NULL); - if (priv->udpsrc_v4[i]) { + if (priv->udpsrc_v4[i] && (priv->sinkpad || i == 1)) { /* and set udpsrc to NULL now before removing */ gst_element_set_locked_state (priv->udpsrc_v4[i], FALSE); gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_NULL); @@ -2444,7 +2459,7 @@ gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin, * pads when they finalize */ gst_bin_remove (bin, priv->udpsrc_v4[i]); } - if (priv->udpsrc_v6[i]) { + if (priv->udpsrc_v6[i] && (priv->sinkpad || i == 1)) { gst_element_set_locked_state (priv->udpsrc_v6[i], FALSE); gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_NULL); gst_bin_remove (bin, priv->udpsrc_v6[i]); @@ -2461,24 +2476,26 @@ gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin, gst_bin_remove (bin, s->udpsrc[i]); } - if (priv->udpsink[i]) + if (priv->udpsink[i] && (priv->srcpad || i == 1)) gst_bin_remove (bin, priv->udpsink[i]); - if (priv->appsrc[i]) + if (priv->appsrc[i] && (priv->sinkpad || i == 1)) gst_bin_remove (bin, priv->appsrc[i]); - if (priv->appsink[i]) + if (priv->appsink[i] && (priv->srcpad || i == 1)) gst_bin_remove (bin, priv->appsink[i]); - if (priv->appqueue[i]) + if (priv->appqueue[i] && (priv->srcpad || i == 1)) gst_bin_remove (bin, priv->appqueue[i]); - if (priv->udpqueue[i]) + if (priv->udpqueue[i] && (priv->srcpad || i == 1)) gst_bin_remove (bin, priv->udpqueue[i]); - if (priv->tee[i]) + if (priv->tee[i] && (priv->srcpad || i == 1)) gst_bin_remove (bin, priv->tee[i]); - if (priv->funnel[i]) + if (priv->funnel[i] && (priv->sinkpad || i == 1)) gst_bin_remove (bin, priv->funnel[i]); - gst_element_release_request_pad (rtpbin, priv->recv_sink[i]); - gst_object_unref (priv->recv_sink[i]); - priv->recv_sink[i] = NULL; + if (priv->sinkpad || i == 1) { + gst_element_release_request_pad (rtpbin, priv->recv_sink[i]); + gst_object_unref (priv->recv_sink[i]); + priv->recv_sink[i] = NULL; + } priv->udpsrc_v4[i] = NULL; priv->udpsrc_v6[i] = NULL; @@ -2498,8 +2515,10 @@ gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin, g_list_free (priv->transport_sources); priv->transport_sources = NULL; - gst_object_unref (priv->send_src[0]); - priv->send_src[0] = NULL; + if (priv->srcpad) { + gst_object_unref (priv->send_src[0]); + priv->send_src[0] = NULL; + } gst_element_release_request_pad (rtpbin, priv->send_src[1]); gst_object_unref (priv->send_src[1]); -- 2.7.4