rtsp-stream: create stream pipeline based on transport
authorSrimanta Panda <srimanta@axis.com>
Fri, 4 Dec 2015 07:01:37 +0000 (08:01 +0100)
committerSebastian Dröge <sebastian@centricular.com>
Fri, 4 Dec 2015 12:13:10 +0000 (14:13 +0200)
Based on the protocol, create the rtsp stream pipeline. If only TCP or
only UDP is set as the transport protocol, it will not add the extra tee
or queue element to the pipeline. Both these elements will be added, if
it supports both TCP and UDP protocols. This improves the pipeline
performance when one protocol is present.

https://bugzilla.gnome.org/show_bug.cgi?id=758179

gst/rtsp-server/rtsp-stream.c

index 399f33e..b775689 100644 (file)
@@ -2075,8 +2075,9 @@ gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
   gint i;
   guint idx;
   gchar *name;
-  GstPad *pad, *sinkpad, *selpad;
+  GstPad *pad, *sinkpad = NULL, *selpad;
   GstPadLinkReturn ret;
+  gboolean is_tcp = FALSE, is_udp = FALSE;
 
   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
@@ -2093,7 +2094,12 @@ gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
 
   GST_INFO ("stream %p joining bin as session %u", stream, idx);
 
-  if (!alloc_ports (stream))
+  is_tcp = priv->protocols & GST_RTSP_LOWER_TRANS_TCP;
+
+  is_udp = ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
+      (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST));
+
+  if (is_udp && !alloc_ports (stream))
     goto no_ports;
 
   /* update the dscp qos field in the sinks */
@@ -2180,7 +2186,8 @@ gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
      * RTP and RTCP. Sync and preroll are enabled on udpsink so
      * we need to add a queue before appsink and udpsink to make
      * the pipeline not block. For the TCP case, we want to pump
-     * data to the client as fast as possible.
+     * client as fast as possible anyway. This pipeline is used
+     * when both TCP and UDP are present.
      *
      * .--------.      .-----.    .---------.    .---------.
      * | rtpbin |      | tee |    |  queue  |    | udpsink |
@@ -2191,18 +2198,32 @@ gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
      *                 |    src->sink      src->sink       |
      *                 '-----'    '---------'    '---------'
      *
-     * When only UDP is allowed, we skip the tee, queue and appsink and link the
-     * udpsink directly to the session.
+     * When only UDP or only TCP is allowed, we skip the tee and queue
+     * and link the udpsink (for UDP) or appsink (for TCP) directly to
+     * the session.
      */
 
     /* Only link the RTP send src if we're going to send RTP, link
      * the RTCP send src always */
     if (priv->srcpad || i == 1) {
-      /* add udpsink */
-      gst_bin_add (bin, priv->udpsink[i]);
-      sinkpad = gst_element_get_static_pad (priv->udpsink[i], "sink");
+      if (is_udp) {
+        /* add udpsink */
+        gst_bin_add (bin, priv->udpsink[i]);
+        sinkpad = gst_element_get_static_pad (priv->udpsink[i], "sink");
+      }
+
+      if (is_tcp) {
+        /* make appsink */
+        priv->appsink[i] = gst_element_factory_make ("appsink", NULL);
+        g_object_set (priv->appsink[i], "emit-signals", FALSE, NULL);
+        gst_bin_add (bin, priv->appsink[i]);
+        gst_app_sink_set_callbacks (GST_APP_SINK_CAST (priv->appsink[i]),
+            &sink_cb, stream, NULL);
+      }
+
+      if (is_udp && is_tcp) {
+        g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
 
-      if (priv->protocols & GST_RTSP_LOWER_TRANS_TCP) {
         /* make tee for RTP/RTCP */
         priv->tee[i] = gst_element_factory_make ("tee", NULL);
         gst_bin_add (bin, priv->tee[i]);
@@ -2228,38 +2249,43 @@ gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
         queuepad = gst_element_get_static_pad (priv->udpqueue[i], "src");
         gst_pad_link (queuepad, sinkpad);
         gst_object_unref (queuepad);
+        gst_object_unref (sinkpad);
 
-        /* make queue */
+        /* make appqueue */
         priv->appqueue[i] = gst_element_factory_make ("queue", NULL);
         g_object_set (priv->appqueue[i], "max-size-buffers",
             1, "max-size-bytes", 0, "max-size-time", G_GINT64_CONSTANT (0),
             NULL);
         gst_bin_add (bin, priv->appqueue[i]);
-        /* and link to tee */
+        /* and link tee to appqueue */
         teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
         pad = gst_element_get_static_pad (priv->appqueue[i], "sink");
         gst_pad_link (teepad, pad);
         gst_object_unref (pad);
         gst_object_unref (teepad);
 
-        /* make appsink */
-        priv->appsink[i] = gst_element_factory_make ("appsink", NULL);
-        g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
-        g_object_set (priv->appsink[i], "emit-signals", FALSE, NULL);
-        gst_bin_add (bin, priv->appsink[i]);
-        gst_app_sink_set_callbacks (GST_APP_SINK_CAST (priv->appsink[i]),
-            &sink_cb, stream, NULL);
-        /* and link to queue */
+        /* and link appqueue to appsink */
         queuepad = gst_element_get_static_pad (priv->appqueue[i], "src");
         pad = gst_element_get_static_pad (priv->appsink[i], "sink");
         gst_pad_link (queuepad, pad);
         gst_object_unref (pad);
         gst_object_unref (queuepad);
+      } else if (is_tcp) {
+        /* only appsink needed, link it to the session */
+        pad = gst_element_get_static_pad (priv->appsink[i], "sink");
+        gst_pad_link (priv->send_src[i], pad);
+        gst_object_unref (pad);
+
+        /* when its only TCP, we need to set sync and preroll to FALSE
+         * for the sink to avoid deadlock. And this is only needed for
+         * sink used for RTCP data, not the RTP data. */
+        if (i == 1)
+          g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
       } else {
         /* else only udpsink needed, link it to the session */
         gst_pad_link (priv->send_src[i], sinkpad);
+        gst_object_unref (sinkpad);
       }
-      gst_object_unref (sinkpad);
     }
 
     /* Only connect recv RTP sink if we expect to receive RTP. Connect recv
@@ -2319,7 +2345,7 @@ gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
         gst_object_unref (selpad);
       }
 
-      if (priv->protocols & GST_RTSP_LOWER_TRANS_TCP) {
+      if (is_tcp) {
         /* make and add appsrc */
         priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
         priv->appsrc_base_time[i] = -1;
@@ -2403,6 +2429,7 @@ gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
   GstRTSPStreamPrivate *priv;
   gint i;
   GList *l;
+  gboolean is_tcp, is_udp;
 
   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
@@ -2436,6 +2463,12 @@ gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
     priv->recv_rtp_src = NULL;
   }
 
+  is_tcp = priv->protocols & GST_RTSP_LOWER_TRANS_TCP;
+
+  is_udp = ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
+      (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST));
+
+
   for (i = 0; i < 2; i++) {
     if (priv->udpsink[i])
       gst_element_set_state (priv->udpsink[i], GST_STATE_NULL);
@@ -2476,17 +2509,17 @@ gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
       gst_bin_remove (bin, s->udpsrc[i]);
     }
 
-    if (priv->udpsink[i] && (priv->srcpad || i == 1))
+    if (priv->udpsink[i] && is_udp && (priv->srcpad || i == 1))
       gst_bin_remove (bin, priv->udpsink[i]);
     if (priv->appsrc[i] && (priv->sinkpad || i == 1))
       gst_bin_remove (bin, priv->appsrc[i]);
-    if (priv->appsink[i] && (priv->srcpad || i == 1))
+    if (priv->appsink[i] && is_tcp && (priv->srcpad || i == 1))
       gst_bin_remove (bin, priv->appsink[i]);
-    if (priv->appqueue[i] && (priv->srcpad || i == 1))
+    if (priv->appqueue[i] && is_tcp && is_udp && (priv->srcpad || i == 1))
       gst_bin_remove (bin, priv->appqueue[i]);
-    if (priv->udpqueue[i] && (priv->srcpad || i == 1))
+    if (priv->udpqueue[i] && is_tcp && is_udp && (priv->srcpad || i == 1))
       gst_bin_remove (bin, priv->udpqueue[i]);
-    if (priv->tee[i] && (priv->srcpad || i == 1))
+    if (priv->tee[i] && is_tcp && is_udp && (priv->srcpad || i == 1))
       gst_bin_remove (bin, priv->tee[i]);
     if (priv->funnel[i] && (priv->sinkpad || i == 1))
       gst_bin_remove (bin, priv->funnel[i]);
@@ -3409,7 +3442,14 @@ gst_rtsp_stream_query_position (GstRTSPStream * stream, gint64 * position)
   priv = stream->priv;
 
   g_mutex_lock (&priv->lock);
-  if ((sink = priv->udpsink[0]))
+  /* depending on the transport type, it should query corresponding sink */
+  if ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
+      (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST))
+    sink = priv->udpsink[0];
+  else
+    sink = priv->appsink[0];
+
+  if (sink)
     gst_object_ref (sink);
   g_mutex_unlock (&priv->lock);
 
@@ -3444,7 +3484,14 @@ gst_rtsp_stream_query_stop (GstRTSPStream * stream, gint64 * stop)
   priv = stream->priv;
 
   g_mutex_lock (&priv->lock);
-  if ((sink = priv->udpsink[0]))
+  /* depending on the transport type, it should query corresponding sink */
+  if ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
+      (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST))
+    sink = priv->udpsink[0];
+  else
+    sink = priv->appsink[0];
+
+  if (sink)
     gst_object_ref (sink);
   g_mutex_unlock (&priv->lock);