rtsp-stream: Always create multicast UDP elements if the protocol flag is set
authorSebastian Dröge <sebastian@centricular.com>
Mon, 5 Sep 2016 15:04:50 +0000 (18:04 +0300)
committerSebastian Dröge <sebastian@centricular.com>
Mon, 5 Sep 2016 15:09:22 +0000 (18:09 +0300)
Adding them later will cause deadlocks due to
1) pre-rolling and staying in PAUSED with the unicast/TCP sinks
2) adding the multicast sink
3) waiting for it to get data to preroll again

3) never happens because the queues after the tee are full.

gst/rtsp-server/rtsp-stream.c

index 1d5f58e9471a272fed93eedb5d94f41a8fa8706e..5773fa60c91ebe0d8a5c105a37b8f4a85d44c10b 100644 (file)
@@ -124,8 +124,6 @@ struct _GstRTSPStreamPrivate
   GstRTSPAddressPool *pool;
 
   /* unicast server addr/port */
-  GstRTSPRange server_port_v4;
-  GstRTSPRange server_port_v6;
   GstRTSPAddress *server_addr_v4;
   GstRTSPAddress *server_addr_v6;
 
@@ -915,21 +913,9 @@ gst_rtsp_stream_get_multicast_iface (GstRTSPStream * stream)
   return result;
 }
 
-/**
- * gst_rtsp_stream_get_multicast_address:
- * @stream: a #GstRTSPStream
- * @family: the #GSocketFamily
- *
- * Get the multicast address of @stream for @family. The original
- * #GstRTSPAddress is cached and copy is returned, so freeing the return value
- * won't release the address from the pool.
- *
- * Returns: (transfer full) (nullable): the #GstRTSPAddress of @stream
- * or %NULL when no address could be allocated. gst_rtsp_address_free()
- * after usage.
- */
-GstRTSPAddress *
-gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream,
+
+static GstRTSPAddress *
+gst_rtsp_stream_get_multicast_address_locked (GstRTSPStream * stream,
     GSocketFamily family)
 {
   GstRTSPStreamPrivate *priv;
@@ -937,8 +923,6 @@ gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream,
   GstRTSPAddress **addrp;
   GstRTSPAddressFlags flags;
 
-  g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
-
   priv = stream->priv;
 
   if (family == G_SOCKET_FAMILY_IPV6) {
@@ -949,7 +933,6 @@ gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream,
     addrp = &priv->mcast_addr_v4;
   }
 
-  g_mutex_lock (&priv->lock);
   if (*addrp == NULL) {
     if (priv->pool == NULL)
       goto no_pool;
@@ -967,7 +950,6 @@ gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream,
      * GST_RTSP_ADDRESS_FLAG_UNICAST are givent. */
   }
   result = gst_rtsp_address_copy (*addrp);
-  g_mutex_unlock (&priv->lock);
 
   return result;
 
@@ -975,17 +957,43 @@ gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream,
 no_pool:
   {
     GST_ERROR_OBJECT (stream, "no address pool specified");
-    g_mutex_unlock (&priv->lock);
     return NULL;
   }
 no_address:
   {
     GST_ERROR_OBJECT (stream, "failed to acquire address from pool");
-    g_mutex_unlock (&priv->lock);
     return NULL;
   }
 }
 
+/**
+ * gst_rtsp_stream_get_multicast_address:
+ * @stream: a #GstRTSPStream
+ * @family: the #GSocketFamily
+ *
+ * Get the multicast address of @stream for @family. The original
+ * #GstRTSPAddress is cached and copy is returned, so freeing the return value
+ * won't release the address from the pool.
+ *
+ * Returns: (transfer full) (nullable): the #GstRTSPAddress of @stream
+ * or %NULL when no address could be allocated. gst_rtsp_address_free()
+ * after usage.
+ */
+GstRTSPAddress *
+gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream,
+    GSocketFamily family)
+{
+  GstRTSPAddress *result;
+
+  g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
+
+  g_mutex_lock (&stream->priv->lock);
+  result = gst_rtsp_stream_get_multicast_address_locked (stream, family);
+  g_mutex_unlock (&stream->priv->lock);
+
+  return result;
+}
+
 /**
  * gst_rtsp_stream_reserve_address:
  * @stream: a #GstRTSPStream
@@ -1202,7 +1210,7 @@ error:
 static gboolean
 alloc_ports_one_family (GstRTSPStream * stream, GSocketFamily family,
     GstElement * udpsrc_out[2], GstElement * udpsink_out[2],
-    GstRTSPRange * server_port_out, GstRTSPAddress ** server_addr_out)
+    GstRTSPAddress ** server_addr_out, gboolean multicast)
 {
   GstRTSPStreamPrivate *priv = stream->priv;
   GSocket *rtp_socket = NULL;
@@ -1248,13 +1256,21 @@ again:
     g_socket_set_multicast_loopback (rtp_socket, FALSE);
   }
 
-  if (pool && gst_rtsp_address_pool_has_unicast_addresses (pool)) {
+  if ((pool && gst_rtsp_address_pool_has_unicast_addresses (pool)) || multicast) {
     GstRTSPAddressFlags flags;
 
     if (addr)
       rejected_addresses = g_list_prepend (rejected_addresses, addr);
 
-    flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_UNICAST;
+    if (!pool)
+      goto no_ports;
+
+    flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT;
+    if (multicast)
+      flags |= GST_RTSP_ADDRESS_FLAG_MULTICAST;
+    else
+      flags |= GST_RTSP_ADDRESS_FLAG_UNICAST;
+
     if (family == G_SOCKET_FAMILY_IPV6)
       flags |= GST_RTSP_ADDRESS_FLAG_IPV6;
     else
@@ -1316,21 +1332,20 @@ again:
   }
   g_object_unref (rtcp_sockaddr);
 
-  if (addr == NULL)
-    addr_str = g_inet_address_to_string (inetaddr);
-  else
-    addr_str = addr->address;
+  if (!addr) {
+    addr = g_slice_new0 (GstRTSPAddress);
+    addr->address = g_inet_address_to_string (inetaddr);
+    addr->port = tmp_rtp;
+    addr->n_ports = 2;
+  }
+
+  addr_str = addr->address;
   g_clear_object (&inetaddr);
 
   if (!create_and_configure_udpsources (udpsrc_out, rtp_socket, rtcp_socket)) {
-    if (addr == NULL)
-      g_free (addr_str);
     goto no_udp_protocol;
   }
 
-  if (addr == NULL)
-    g_free (addr_str);
-
   g_object_get (G_OBJECT (udpsrc_out[0]), "port", &rtpport, NULL);
   g_object_get (G_OBJECT (udpsrc_out[1]), "port", &rtcpport, NULL);
 
@@ -1338,18 +1353,26 @@ again:
   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
     goto port_error;
 
-  server_port_out->min = rtpport;
-  server_port_out->max = rtcpport;
-
   /* This function is called twice (for v4 and v6) but we create only one pair
    * of udpsinks. */
   if (!udpsink_out[0]
       && !create_and_configure_udpsinks (stream, udpsink_out))
     goto no_udp_protocol;
 
+  if (multicast) {
+    g_object_set (G_OBJECT (udpsink_out[0]), "multicast-iface",
+        priv->multicast_iface, NULL);
+    g_object_set (G_OBJECT (udpsink_out[1]), "multicast-iface",
+        priv->multicast_iface, NULL);
+
+    g_signal_emit_by_name (udpsink_out[0], "add", addr_str, rtpport, NULL);
+    g_signal_emit_by_name (udpsink_out[1], "add", addr_str, rtcpport, NULL);
+  }
+
   set_sockets_for_udpsinks (udpsink_out, rtp_socket, rtcp_socket, family);
 
   *server_addr_out = addr;
+
   g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free);
 
   g_object_unref (rtp_socket);
@@ -1464,15 +1487,21 @@ alloc_ports (GstRTSPStream * stream)
   GstRTSPStreamPrivate *priv = stream->priv;
   gboolean ret = TRUE;
 
-  if ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
-      (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST)) {
+  if (priv->protocols & GST_RTSP_LOWER_TRANS_UDP) {
     ret = alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4,
-        priv->udpsrc_v4, priv->udpsink,
-        &priv->server_port_v4, &priv->server_addr_v4);
+        priv->udpsrc_v4, priv->udpsink, &priv->server_addr_v4, FALSE);
 
     ret |= alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6,
-        priv->udpsrc_v6, priv->udpsink,
-        &priv->server_port_v6, &priv->server_addr_v6);
+        priv->udpsrc_v6, priv->udpsink, &priv->server_addr_v6, FALSE);
+  }
+
+  /* FIXME: Maybe actually consider the return values? */
+  if (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST) {
+    ret |= alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4,
+        priv->mcast_udpsrc_v4, priv->mcast_udpsink, &priv->mcast_addr_v4, TRUE);
+
+    ret |= alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6,
+        priv->mcast_udpsrc_v6, priv->mcast_udpsink, &priv->mcast_addr_v6, TRUE);
   }
 
   return ret;
@@ -1499,11 +1528,17 @@ gst_rtsp_stream_get_server_port (GstRTSPStream * stream,
 
   g_mutex_lock (&priv->lock);
   if (family == G_SOCKET_FAMILY_IPV4) {
-    if (server_port)
-      *server_port = priv->server_port_v4;
+    if (server_port) {
+      server_port->min = priv->server_addr_v4->port;
+      server_port->max =
+          priv->server_addr_v4->port + priv->server_addr_v4->n_ports - 1;
+    }
   } else {
-    if (server_port)
-      *server_port = priv->server_port_v6;
+    if (server_port) {
+      server_port->min = priv->server_addr_v6->port;
+      server_port->max =
+          priv->server_addr_v6->port + priv->server_addr_v6->n_ports - 1;
+    }
   }
   g_mutex_unlock (&priv->lock);
 }
@@ -2357,7 +2392,7 @@ create_sender_part (GstRTSPStream * stream, GstBin * bin, GstState state)
 {
   GstRTSPStreamPrivate *priv;
   GstPad *pad;
-  gboolean is_tcp = FALSE, is_udp = FALSE;
+  gboolean is_tcp, is_udp;
   gint i;
 
   priv = stream->priv;
@@ -2414,7 +2449,12 @@ create_sender_part (GstRTSPStream * stream, GstBin * bin, GstState state)
       gst_pad_link (priv->send_src[i], pad);
       gst_object_unref (pad);
 
-      plug_sink (bin, priv->tee[i], priv->udpsink[i], &priv->udpqueue[i]);
+      if (priv->udpsink[i])
+        plug_sink (bin, priv->tee[i], priv->udpsink[i], &priv->udpqueue[i]);
+
+      if (priv->mcast_udpsink[i])
+        plug_sink (bin, priv->tee[i], priv->mcast_udpsink[i],
+            &priv->mcast_udpqueue[i]);
 
       if (is_tcp) {
         g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
@@ -2437,12 +2477,16 @@ create_sender_part (GstRTSPStream * stream, GstBin * bin, GstState state)
     if (state != GST_STATE_NULL) {
       if (priv->udpsink[i])
         gst_element_set_state (priv->udpsink[i], state);
+      if (priv->mcast_udpsink[i])
+        gst_element_set_state (priv->mcast_udpsink[i], state);
       if (priv->appsink[i])
         gst_element_set_state (priv->appsink[i], state);
       if (priv->appqueue[i])
         gst_element_set_state (priv->appqueue[i], state);
       if (priv->udpqueue[i])
         gst_element_set_state (priv->udpqueue[i], state);
+      if (priv->mcast_udpqueue[i])
+        gst_element_set_state (priv->mcast_udpqueue[i], state);
       if (priv->tee[i])
         gst_element_set_state (priv->tee[i], state);
     }
@@ -2525,6 +2569,12 @@ create_receiver_part (GstRTSPStream * stream, GstBin * bin, GstState state)
     if (priv->udpsrc_v6[i])
       plug_src (stream, bin, priv->udpsrc_v6[i], priv->funnel[i]);
 
+    if (priv->mcast_udpsrc_v4[i])
+      plug_src (stream, bin, priv->mcast_udpsrc_v4[i], priv->funnel[i]);
+
+    if (priv->mcast_udpsrc_v6[i])
+      plug_src (stream, bin, priv->mcast_udpsrc_v6[i], priv->funnel[i]);
+
     if (is_tcp) {
       /* make and add appsrc */
       priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
@@ -2542,20 +2592,13 @@ create_receiver_part (GstRTSPStream * stream, GstBin * bin, GstState state)
 }
 
 static gboolean
-create_mcast_part_for_transport (GstRTSPStream * stream,
+check_mcast_part_for_transport (GstRTSPStream * stream,
     const GstRTSPTransport * tr)
 {
   GstRTSPStreamPrivate *priv = stream->priv;
   GInetAddress *inetaddr;
   GSocketFamily family;
   GstRTSPAddress *mcast_addr;
-  GstElement **mcast_udpsrc;
-  GSocket *rtp_socket = NULL;
-  GSocket *rtcp_socket = NULL;
-  GSocketAddress *rtp_sockaddr = NULL;
-  GSocketAddress *rtcp_sockaddr = NULL;
-  GError *error = NULL;
-  const gchar *multicast_iface = priv->multicast_iface;
 
   /* Check if it's a ipv4 or ipv6 transport */
   inetaddr = g_inet_address_new_from_string (tr->destination);
@@ -2565,10 +2608,8 @@ create_mcast_part_for_transport (GstRTSPStream * stream,
   /* Select fields corresponding to the family */
   if (family == G_SOCKET_FAMILY_IPV4) {
     mcast_addr = priv->mcast_addr_v4;
-    mcast_udpsrc = priv->mcast_udpsrc_v4;
   } else {
     mcast_addr = priv->mcast_addr_v6;
-    mcast_udpsrc = priv->mcast_udpsrc_v6;
   }
 
   /* We support only one mcast group per family, make sure this transport
@@ -2582,95 +2623,6 @@ create_mcast_part_for_transport (GstRTSPStream * stream,
       tr->ttl != mcast_addr->ttl)
     goto wrong_addr;
 
-  if (mcast_udpsrc[0]) {
-    /* We already created elements for this family. Since we support only one
-     * mcast group per family, there is nothing more to do here. */
-    g_assert (mcast_udpsrc[1]);
-    g_assert (priv->mcast_udpqueue[0]);
-    g_assert (priv->mcast_udpqueue[1]);
-    g_assert (priv->mcast_udpsink[0]);
-    g_assert (priv->mcast_udpsink[1]);
-    return TRUE;
-  }
-
-  g_assert (!mcast_udpsrc[1]);
-
-  /* Create RTP/RTCP sockets and bind them on ANY with mcast ports */
-  rtp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
-      G_SOCKET_PROTOCOL_UDP, &error);
-  if (!rtp_socket)
-    goto socket_error;
-  g_socket_set_multicast_loopback (rtp_socket, FALSE);
-
-  rtcp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
-      G_SOCKET_PROTOCOL_UDP, &error);
-  if (!rtcp_socket)
-    goto socket_error;
-  g_socket_set_multicast_loopback (rtcp_socket, FALSE);
-
-  inetaddr = g_inet_address_new_any (family);
-  rtp_sockaddr = g_inet_socket_address_new (inetaddr, mcast_addr->port);
-  rtcp_sockaddr = g_inet_socket_address_new (inetaddr, mcast_addr->port + 1);
-  g_object_unref (inetaddr);
-
-  if (!g_socket_bind (rtp_socket, rtp_sockaddr, FALSE, &error))
-    goto socket_error;
-
-  if (!g_socket_bind (rtcp_socket, rtcp_sockaddr, FALSE, &error))
-    goto socket_error;
-
-  g_object_unref (rtp_sockaddr);
-  g_object_unref (rtcp_sockaddr);
-
-  /* Add receiver part */
-  create_and_configure_udpsources (mcast_udpsrc, rtp_socket, rtcp_socket);
-  if (priv->sinkpad) {
-    plug_src (stream, priv->joined_bin, mcast_udpsrc[0], priv->funnel[0]);
-    gst_element_sync_state_with_parent (mcast_udpsrc[0]);
-  }
-  plug_src (stream, priv->joined_bin, mcast_udpsrc[1], priv->funnel[1]);
-  gst_element_sync_state_with_parent (mcast_udpsrc[1]);
-
-  /* Add sender part, could already have been created for the other family. */
-  if (!priv->mcast_udpsink[0]) {
-    g_assert (!priv->mcast_udpsink[1]);
-    g_assert (!priv->mcast_udpqueue[0]);
-    g_assert (!priv->mcast_udpqueue[1]);
-
-    create_and_configure_udpsinks (stream, priv->mcast_udpsink);
-
-    g_object_set (G_OBJECT (priv->mcast_udpsink[0]), "multicast-iface",
-        multicast_iface, NULL);
-    g_object_set (G_OBJECT (priv->mcast_udpsink[1]), "multicast-iface",
-        multicast_iface, NULL);
-
-    g_signal_emit_by_name (priv->mcast_udpsink[0], "add", mcast_addr->address,
-        mcast_addr->port, NULL);
-    g_signal_emit_by_name (priv->mcast_udpsink[1], "add", mcast_addr->address,
-        mcast_addr->port + 1, NULL);
-
-    set_sockets_for_udpsinks (priv->mcast_udpsink, rtp_socket, rtcp_socket,
-        family);
-
-    if (priv->srcpad) {
-      plug_sink (priv->joined_bin, priv->tee[0], priv->mcast_udpsink[0],
-          &priv->mcast_udpqueue[0]);
-      gst_element_sync_state_with_parent (priv->mcast_udpsink[0]);
-      gst_element_sync_state_with_parent (priv->mcast_udpqueue[0]);
-    }
-    plug_sink (priv->joined_bin, priv->tee[1], priv->mcast_udpsink[1],
-        &priv->mcast_udpqueue[1]);
-    gst_element_sync_state_with_parent (priv->mcast_udpsink[1]);
-    gst_element_sync_state_with_parent (priv->mcast_udpqueue[1]);
-  } else {
-    g_assert (priv->mcast_udpsink[1]);
-    g_assert (priv->mcast_udpqueue[0]);
-    g_assert (priv->mcast_udpqueue[1]);
-
-    set_sockets_for_udpsinks (priv->mcast_udpsink, rtp_socket, rtcp_socket,
-        family);
-  }
-
   return TRUE;
 
 no_addr:
@@ -2685,17 +2637,6 @@ wrong_addr:
         "the reserved address");
     return FALSE;
   }
-socket_error:
-  {
-    GST_ERROR_OBJECT (stream, "Error creating and binding mcast socket: %s",
-        error->message);
-    g_clear_object (&rtp_socket);
-    g_clear_object (&rtcp_socket);
-    g_clear_object (&rtp_sockaddr);
-    g_clear_object (&rtcp_sockaddr);
-    g_clear_error (&error);
-    return FALSE;
-  }
 }
 
 /**
@@ -2962,6 +2903,19 @@ gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
   if (priv->srtpdec)
     gst_object_unref (priv->srtpdec);
 
+  if (priv->mcast_addr_v4)
+    gst_rtsp_address_free (priv->mcast_addr_v4);
+  priv->mcast_addr_v4 = NULL;
+  if (priv->mcast_addr_v6)
+    gst_rtsp_address_free (priv->mcast_addr_v6);
+  priv->mcast_addr_v6 = NULL;
+  if (priv->server_addr_v4)
+    gst_rtsp_address_free (priv->server_addr_v4);
+  priv->server_addr_v4 = NULL;
+  if (priv->server_addr_v6)
+    gst_rtsp_address_free (priv->server_addr_v6);
+  priv->server_addr_v6 = NULL;
+
   g_clear_object (&priv->joined_bin);
   g_mutex_unlock (&priv->lock);
 
@@ -3321,13 +3275,11 @@ update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
     case GST_RTSP_LOWER_TRANS_UDP_MCAST:
     {
       if (add) {
-        if (!create_mcast_part_for_transport (stream, tr))
+        if (!check_mcast_part_for_transport (stream, tr))
           goto mcast_error;
         priv->transports = g_list_prepend (priv->transports, trans);
       } else {
         priv->transports = g_list_remove (priv->transports, trans);
-        /* FIXME: Check if there are remaining mcast transports, and destroy
-         * mcast part if its now unused */
       }
       break;
     }