From: Xavier Claessens Date: Thu, 28 Jul 2016 19:33:05 +0000 (-0400) Subject: stream: revert back to create udpsrc/udpsink on DESCRIBE for unicast X-Git-Tag: 1.19.3~495^2~490 X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=8495c47a9d1fff390aaac1f3bb5c22f760786644;p=platform%2Fupstream%2Fgstreamer.git stream: revert back to create udpsrc/udpsink on DESCRIBE for unicast This is basically reverting changes introduced in commit f62a9a7, because it was introducing various regressions: - It introduces a leak of udpsrc elements that got wrongly fixed by adding an hash table in commit cba045e. We should have at most 4 udpsrc for unicast: ipv4/ipv6, rtp/rtcp. They can be reused for all unicast clients. - If a mcast client connects, it creates a new socket in SETUP to try to respect the destination/port given by the client in the transport, and overrides the socket already set on the udpsink element. That means that if we already had a client connected, the source address on the udp packets it receives suddenly changes. - If a 2nd mcast client connects, the destination/port in its transport is ignored but its transport wasn't updated. What this patch does: - Revert back to create udpsrc/udpsink for unicast clients on DESCRIBE. - Always have a tee+queue when udp is enabled. This could be optimized again in a later patch, but is more complicated. If no unicast clients connects then those elements are useless, this could be also optimized in a later patch. - When mcast transport is added, it creates a new set of udpsrc/udpsink, seperated from those for unicast clients. Since we already support only one mcast address, we also create only one set of elements. https://bugzilla.gnome.org/show_bug.cgi?id=766612 --- diff --git a/gst/rtsp-server/rtsp-client.c b/gst/rtsp-server/rtsp-client.c index 7af7701..956a9eb 100644 --- a/gst/rtsp-server/rtsp-client.c +++ b/gst/rtsp-server/rtsp-client.c @@ -1421,27 +1421,26 @@ default_configure_client_transport (GstRTSPClient * client, /* we have a valid transport now, set the destination of the client. */ if (ct->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) { gboolean use_client_settings; - GSocketFamily family; use_client_settings = gst_rtsp_auth_check (GST_RTSP_AUTH_CHECK_TRANSPORT_CLIENT_SETTINGS); if (ct->destination && use_client_settings) { - family = priv->is_ipv6 ? G_SOCKET_FAMILY_IPV6 : G_SOCKET_FAMILY_IPV4; + GstRTSPAddress *addr; - if (!gst_rtsp_stream_allocate_udp_sockets (ctx->stream, family, ct, TRUE)) - goto no_udp_protocol; + addr = gst_rtsp_stream_reserve_address (ctx->stream, ct->destination, + ct->port.min, ct->port.max - ct->port.min + 1, ct->ttl); + if (addr == NULL) + goto no_address; + + gst_rtsp_address_free (addr); } else { GstRTSPAddress *addr; + GSocketFamily family; family = priv->is_ipv6 ? G_SOCKET_FAMILY_IPV6 : G_SOCKET_FAMILY_IPV4; - if (!gst_rtsp_stream_allocate_udp_sockets (ctx->stream, family, ct, - FALSE)) - goto no_udp_protocol; - - gst_rtsp_stream_get_server_port (ctx->stream, &ct->port, family); addr = gst_rtsp_stream_get_multicast_address (ctx->stream, family); if (addr == NULL) goto no_address; @@ -1494,12 +1493,6 @@ default_configure_client_transport (GstRTSPClient * client, gst_rtsp_session_media_alloc_channels (ctx->sessmedia, &ct->interleaved); } - } else if (ct->lower_transport & GST_RTSP_LOWER_TRANS_UDP) { - GSocketFamily family; - family = priv->is_ipv6 ? G_SOCKET_FAMILY_IPV6 : G_SOCKET_FAMILY_IPV4; - if (!gst_rtsp_stream_allocate_udp_sockets (ctx->stream, family, ct, - FALSE)) - goto no_udp_protocol; } } return TRUE; @@ -1510,11 +1503,6 @@ no_address: GST_ERROR_OBJECT (client, "failed to acquire address for stream"); return FALSE; } -no_udp_protocol: - { - GST_ERROR_OBJECT (client, "failed to allocate udp ports"); - return FALSE; - } } static GstRTSPTransport * diff --git a/gst/rtsp-server/rtsp-stream.c b/gst/rtsp-server/rtsp-stream.c index 0088e21..e1fbc39 100644 --- a/gst/rtsp-server/rtsp-stream.c +++ b/gst/rtsp-server/rtsp-stream.c @@ -94,21 +94,18 @@ struct _GstRTSPStreamPrivate GstElement *srtpdec; GHashTable *keys; - /* sinks used for sending and receiving RTP and RTCP over ipv4, they share - * sockets */ + /* for UDP unicast */ GstElement *udpsrc_v4[2]; - /* UDP sources for UDP multicast transports */ - GstElement *udpsrc_mcast_v4[2]; - - /* sinks used for sending and receiving RTP and RTCP over ipv6, they share - * sockets */ GstElement *udpsrc_v6[2]; - /* UDP sources for UDP multicast transports */ - GstElement *udpsrc_mcast_v6[2]; - GstElement *udpqueue[2]; GstElement *udpsink[2]; + /* for UDP multicast */ + GstElement *mcast_udpsrc_v4[2]; + GstElement *mcast_udpsrc_v6[2]; + GstElement *mcast_udpqueue[2]; + GstElement *mcast_udpsink[2]; + /* for TCP transport */ GstElement *appsrc[2]; GstClockTime appsrc_base_time[2]; @@ -123,22 +120,18 @@ struct _GstRTSPStreamPrivate guint rtx_pt; GstClockTime rtx_time; - /* server ports for sending/receiving over ipv4 */ - GstRTSPRange server_port_v4; - GstRTSPAddress *server_addr_v4; - gboolean have_ipv4; + /* pool used to manage unicast and multicast addresses */ + GstRTSPAddressPool *pool; - /* server ports for sending/receiving over ipv6 */ + /* unicast server addr/port */ + GstRTSPRange server_port_v4; GstRTSPRange server_port_v6; + GstRTSPAddress *server_addr_v4; GstRTSPAddress *server_addr_v6; - gboolean have_ipv6; /* multicast addresses */ - GstRTSPAddressPool *pool; GstRTSPAddress *mcast_addr_v4; GstRTSPAddress *mcast_addr_v6; - gboolean have_ipv4_mcast; - gboolean have_ipv6_mcast; gchar *multicast_iface; @@ -155,7 +148,6 @@ struct _GstRTSPStreamPrivate guint tr_cache_cookie_rtp; guint tr_cache_cookie_rtcp; - gint dscp_qos; /* stream blocking */ @@ -595,22 +587,18 @@ gst_rtsp_stream_get_mtu (GstRTSPStream * stream) /* Update the dscp qos property on the udp sinks */ static void -update_dscp_qos (GstRTSPStream * stream) +update_dscp_qos (GstRTSPStream * stream, GstElement * udpsink[2]) { GstRTSPStreamPrivate *priv; - g_return_if_fail (GST_IS_RTSP_STREAM (stream)); - priv = stream->priv; - if (priv->udpsink[0]) { - g_object_set (G_OBJECT (priv->udpsink[0]), "qos-dscp", priv->dscp_qos, - NULL); + if (udpsink[0]) { + g_object_set (G_OBJECT (udpsink[0]), "qos-dscp", priv->dscp_qos, NULL); } - if (priv->udpsink[1]) { - g_object_set (G_OBJECT (priv->udpsink[1]), "qos-dscp", priv->dscp_qos, - NULL); + if (udpsink[1]) { + g_object_set (G_OBJECT (udpsink[1]), "qos-dscp", priv->dscp_qos, NULL); } } @@ -639,7 +627,7 @@ gst_rtsp_stream_set_dscp_qos (GstRTSPStream * stream, gint dscp_qos) priv->dscp_qos = dscp_qos; - update_dscp_qos (stream); + update_dscp_qos (stream, priv->udpsink); } /** @@ -971,6 +959,12 @@ gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream, *addrp = gst_rtsp_address_pool_acquire_address (priv->pool, flags, 2); if (*addrp == NULL) goto no_address; + + /* FIXME: Also reserve the same port with unicast ANY address, since that's + * where we are going to bind our socket. Probably loop until we find a port + * available in both mcast and unicast pools. Maybe GstRTSPAddressPool + * should do it for us when both GST_RTSP_ADDRESS_FLAG_MULTICAST and + * GST_RTSP_ADDRESS_FLAG_UNICAST are givent. */ } result = gst_rtsp_address_copy (*addrp); g_mutex_unlock (&priv->lock); @@ -1050,6 +1044,9 @@ gst_rtsp_stream_reserve_address (GstRTSPStream * stream, port, n_ports, ttl, addrp); if (res != GST_RTSP_ADDRESS_POOL_OK) goto no_address; + + /* FIXME: Also reserve the same port with unicast ANY address, since that's + * where we are going to bind our socket. */ } else { if (strcmp ((*addrp)->address, address) || (*addrp)->port != port || (*addrp)->n_ports != n_ports || @@ -1086,10 +1083,9 @@ different_address: /* must be called with lock */ static void -set_sockets_for_udpsinks (GstRTSPStream * stream, GSocket * rtp_socket, +set_sockets_for_udpsinks (GstElement * udpsink[2], GSocket * rtp_socket, GSocket * rtcp_socket, GSocketFamily family) { - GstRTSPStreamPrivate *priv = stream->priv; const gchar *multisink_socket; if (family == G_SOCKET_FAMILY_IPV6) @@ -1097,36 +1093,21 @@ set_sockets_for_udpsinks (GstRTSPStream * stream, GSocket * rtp_socket, else multisink_socket = "socket"; - g_object_set (G_OBJECT (priv->udpsink[0]), multisink_socket, rtp_socket, - NULL); - g_object_set (G_OBJECT (priv->udpsink[1]), multisink_socket, rtcp_socket, - NULL); + g_object_set (G_OBJECT (udpsink[0]), multisink_socket, rtp_socket, NULL); + g_object_set (G_OBJECT (udpsink[1]), multisink_socket, rtcp_socket, NULL); } -/* must be called with lock */ static gboolean -create_and_configure_udpsinks (GstRTSPStream * stream) +create_and_configure_udpsinks (GstRTSPStream * stream, GstElement * udpsink[2], + const gchar * multicast_iface) { GstRTSPStreamPrivate *priv = stream->priv; GstElement *udpsink0, *udpsink1; - udpsink0 = NULL; - udpsink1 = NULL; - - if (priv->udpsink[0]) - udpsink0 = priv->udpsink[0]; - else - udpsink0 = gst_element_factory_make ("multiudpsink", NULL); - - if (!udpsink0) - goto no_udp_protocol; - - if (priv->udpsink[1]) - udpsink1 = priv->udpsink[1]; - else - udpsink1 = gst_element_factory_make ("multiudpsink", NULL); + udpsink0 = gst_element_factory_make ("multiudpsink", NULL); + udpsink1 = gst_element_factory_make ("multiudpsink", NULL); - if (!udpsink1) + if (!udpsink0 || !udpsink1) goto no_udp_protocol; /* configure sinks */ @@ -1147,17 +1128,23 @@ create_and_configure_udpsinks (GstRTSPStream * stream) g_object_set (G_OBJECT (udpsink0), "async", FALSE, NULL); g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL); - g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL); - g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL); + /* join multicast group when adding clients, so we'll start receiving from it. + * We cannot rely on the udpsrc to join the group since its socket is always a + * local unicast one. */ + g_object_set (G_OBJECT (udpsink0), "auto-multicast", TRUE, NULL); + g_object_set (G_OBJECT (udpsink1), "auto-multicast", TRUE, NULL); + + g_object_set (G_OBJECT (udpsink0), "multicast-iface", multicast_iface, NULL); + g_object_set (G_OBJECT (udpsink1), "multicast-iface", multicast_iface, NULL); g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL); g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL); - /* update the dscp qos field in the sinks */ - update_dscp_qos (stream); + udpsink[0] = udpsink0; + udpsink[1] = udpsink1; - priv->udpsink[0] = udpsink0; - priv->udpsink[1] = udpsink1; + /* update the dscp qos field in the sinks */ + update_dscp_qos (stream, udpsink); return TRUE; @@ -1169,54 +1156,9 @@ no_udp_protocol: } /* must be called with lock */ -static void -play_udpsources_one_family (GstRTSPStream * stream, GstElement * udpsrc_out[2], - GSocketFamily family) -{ - GstRTSPStreamPrivate *priv; - GstPad *pad, *selpad; - guint i; - - priv = stream->priv; - - for (i = 0; i < 2; i++) { - if (!priv->sinkpad && i == 0) { - /* Only connect recv RTP sink if we expect to receive RTP. Connect recv - * RTCP sink always */ - continue; - } - - if (priv->srcpad) { - /* we set and keep these to playing so that they don't cause NO_PREROLL return - * values. This is only relevant for PLAY pipelines */ - gst_element_set_state (udpsrc_out[i], GST_STATE_PLAYING); - gst_element_set_locked_state (udpsrc_out[i], TRUE); - } - - /* add udpsrc */ - gst_bin_add (priv->joined_bin, udpsrc_out[i]); - - /* and link to the funnel */ - selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u"); - pad = gst_element_get_static_pad (udpsrc_out[i], "src"); - gst_pad_link (pad, selpad); - gst_object_unref (pad); - gst_object_unref (selpad); - - /* otherwise sync state with parent in case it's running already - * at this point */ - if (!priv->srcpad) { - gst_element_sync_state_with_parent (udpsrc_out[i]); - } - } -} - -/* must be called with lock */ static gboolean -create_and_configure_udpsources_one_family (GstElement * udpsrc_out[2], - GSocket * rtp_socket, GSocket * rtcp_socket, GSocketFamily family, - const gchar * address, gint rtpport, gint rtcpport, - const gchar * multicast_iface, GstRTSPLowerTrans transport) +create_and_configure_udpsources (GstElement * udpsrc_out[2], + GSocket * rtp_socket, GSocket * rtcp_socket) { GstStateChangeReturn ret; @@ -1226,22 +1168,17 @@ create_and_configure_udpsources_one_family (GstElement * udpsrc_out[2], if (udpsrc_out[0] == NULL || udpsrc_out[1] == NULL) goto error; - if (transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) { - g_object_set (G_OBJECT (udpsrc_out[0]), "address", address, NULL); - g_object_set (G_OBJECT (udpsrc_out[1]), "address", address, NULL); - g_object_set (G_OBJECT (udpsrc_out[0]), "port", rtpport, NULL); - g_object_set (G_OBJECT (udpsrc_out[1]), "port", rtcpport, NULL); - g_object_set (G_OBJECT (udpsrc_out[0]), "multicast-iface", multicast_iface, - NULL); - g_object_set (G_OBJECT (udpsrc_out[1]), "multicast-iface", multicast_iface, - NULL); - g_object_set (G_OBJECT (udpsrc_out[0]), "loop", FALSE, NULL); - g_object_set (G_OBJECT (udpsrc_out[1]), "loop", FALSE, NULL); - } - g_object_set (G_OBJECT (udpsrc_out[0]), "socket", rtp_socket, NULL); g_object_set (G_OBJECT (udpsrc_out[1]), "socket", rtcp_socket, NULL); + /* The udpsrc cannot do the join because its socket is always a local unicast + * one. The udpsink sharing the same socket will do it for us. */ + g_object_set (G_OBJECT (udpsrc_out[0]), "auto-multicast", FALSE, NULL); + g_object_set (G_OBJECT (udpsrc_out[1]), "auto-multicast", FALSE, NULL); + + g_object_set (G_OBJECT (udpsrc_out[0]), "loop", FALSE, NULL); + g_object_set (G_OBJECT (udpsrc_out[1]), "loop", FALSE, NULL); + ret = gst_element_set_state (udpsrc_out[0], GST_STATE_READY); if (ret == GST_STATE_CHANGE_FAILURE) goto error; @@ -1268,9 +1205,8 @@ error: static gboolean alloc_ports_one_family (GstRTSPStream * stream, GSocketFamily family, - GstElement * udpsrc_out[2], GstRTSPRange * server_port_out, - GstRTSPTransport * ct, GstRTSPAddress ** server_addr_out, - gboolean use_client_settings) + GstElement * udpsrc_out[2], GstElement * udpsink_out[2], + GstRTSPRange * server_port_out, GstRTSPAddress ** server_addr_out) { GstRTSPStreamPrivate *priv = stream->priv; GSocket *rtp_socket = NULL; @@ -1285,12 +1221,15 @@ alloc_ports_one_family (GstRTSPStream * stream, GSocketFamily family, GSocketAddress *rtp_sockaddr = NULL; GSocketAddress *rtcp_sockaddr = NULL; GstRTSPAddressPool *pool; - GstRTSPLowerTrans transport; - const gchar *multicast_iface = priv->multicast_iface; + + g_assert (!udpsrc_out[0]); + g_assert (!udpsrc_out[1]); + g_assert ((!udpsink_out[0] && !udpsink_out[1]) || + (udpsink_out[0] && udpsink_out[1])); + g_assert (*server_addr_out == NULL); pool = priv->pool; count = 0; - transport = ct->lower_transport; /* Start with random port */ tmp_rtp = 0; @@ -1301,9 +1240,6 @@ alloc_ports_one_family (GstRTSPStream * stream, GSocketFamily family, goto no_udp_protocol; g_socket_set_multicast_loopback (rtcp_socket, FALSE); - if (*server_addr_out) - gst_rtsp_address_free (*server_addr_out); - /* try to allocate 2 UDP ports, the RTP port should be an even * number and the RTCP port should be the next (uneven) port */ again: @@ -1316,30 +1252,19 @@ again: g_socket_set_multicast_loopback (rtp_socket, FALSE); } - if (pool && ((transport == GST_RTSP_LOWER_TRANS_UDP && - gst_rtsp_address_pool_has_unicast_addresses (pool)) - || transport == GST_RTSP_LOWER_TRANS_UDP_MCAST)) { - GstRTSPAddressFlags flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT; - - if (transport == GST_RTSP_LOWER_TRANS_UDP) - flags |= GST_RTSP_ADDRESS_FLAG_UNICAST; - else - flags |= GST_RTSP_ADDRESS_FLAG_MULTICAST; + if (pool && gst_rtsp_address_pool_has_unicast_addresses (pool)) { + GstRTSPAddressFlags flags; if (addr) rejected_addresses = g_list_prepend (rejected_addresses, addr); + flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_UNICAST; if (family == G_SOCKET_FAMILY_IPV6) flags |= GST_RTSP_ADDRESS_FLAG_IPV6; else flags |= GST_RTSP_ADDRESS_FLAG_IPV4; - if (ct->destination && transport == GST_RTSP_LOWER_TRANS_UDP_MCAST - && use_client_settings) - gst_rtsp_address_pool_reserve_address (pool, ct->destination, - ct->port.min, 2, ct->ttl, &addr); - else - addr = gst_rtsp_address_pool_acquire_address (pool, flags, 2); + addr = gst_rtsp_address_pool_acquire_address (pool, flags, 2); if (addr == NULL) goto no_ports; @@ -1348,14 +1273,6 @@ again: g_clear_object (&inetaddr); inetaddr = g_inet_address_new_from_string (addr->address); - - /* If we're supposed to bind to a multicast address, instead bind - * to ANY and let udpsrc later join the relevant multicast group - */ - if (g_inet_address_get_is_multicast (inetaddr)) { - g_object_unref (inetaddr); - inetaddr = g_inet_address_new_any (family); - } } else { if (tmp_rtp != 0) { tmp_rtp += 2; @@ -1409,9 +1326,7 @@ again: addr_str = addr->address; g_clear_object (&inetaddr); - if (!create_and_configure_udpsources_one_family (udpsrc_out, rtp_socket, - rtcp_socket, family, addr_str, tmp_rtp, tmp_rtcp, multicast_iface, - transport)) { + if (!create_and_configure_udpsources (udpsrc_out, rtp_socket, rtcp_socket)) { if (addr == NULL) g_free (addr_str); goto no_udp_protocol; @@ -1420,8 +1335,6 @@ again: if (addr == NULL) g_free (addr_str); - play_udpsources_one_family (stream, udpsrc_out, family); - g_object_get (G_OBJECT (udpsrc_out[0]), "port", &rtpport, NULL); g_object_get (G_OBJECT (udpsrc_out[1]), "port", &rtcpport, NULL); @@ -1429,12 +1342,17 @@ again: if (rtpport != tmp_rtp || rtcpport != tmp_rtcp) goto port_error; - /* set RTP and RTCP sockets */ - set_sockets_for_udpsinks (stream, rtp_socket, rtcp_socket, family); - server_port_out->min = rtpport; server_port_out->max = rtcpport; + /* This function is called twice (for v4 and v6) but we create only one pair + * of udpsinks. */ + if (!udpsink_out[0] + && !create_and_configure_udpsinks (stream, udpsink_out, NULL)) + goto no_udp_protocol; + + set_sockets_for_udpsinks (udpsink_out, rtp_socket, rtcp_socket, family); + *server_addr_out = addr; g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free); @@ -1485,60 +1403,14 @@ cleanup: * Allocates RTP and RTCP ports. * * Returns: %TRUE if the RTP and RTCP sockets have been succeccully allocated. + * Deprecated: This function shouldn't have been made public */ gboolean gst_rtsp_stream_allocate_udp_sockets (GstRTSPStream * stream, GSocketFamily family, GstRTSPTransport * ct, gboolean use_client_settings) { - GstRTSPStreamPrivate *priv; - gboolean result = FALSE; - GstRTSPLowerTrans transport = ct->lower_transport; - - g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE); - priv = stream->priv; - g_return_val_if_fail (priv->joined_bin != NULL, FALSE); - - g_mutex_lock (&priv->lock); - - if (family == G_SOCKET_FAMILY_IPV4) { - if (transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) { - if (priv->have_ipv4_mcast) - goto done; - priv->have_ipv4_mcast = - alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4, - priv->udpsrc_mcast_v4, &priv->server_port_v4, ct, - &priv->mcast_addr_v4, use_client_settings); - } else { - priv->have_ipv4 = - alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4, priv->udpsrc_v4, - &priv->server_port_v4, ct, &priv->server_addr_v4, - use_client_settings); - } - } else { - if (transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) { - if (priv->have_ipv6_mcast) - goto done; - priv->have_ipv6_mcast = - alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6, - priv->udpsrc_mcast_v6, &priv->server_port_v6, ct, - &priv->mcast_addr_v6, use_client_settings); - } else { - if (priv->have_ipv6) - goto done; - priv->have_ipv6 = - alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6, priv->udpsrc_v6, - &priv->server_port_v6, ct, &priv->server_addr_v6, - use_client_settings); - } - } - -done: - result = priv->have_ipv4 || priv->have_ipv4_mcast || priv->have_ipv6 || - priv->have_ipv6_mcast; - - g_mutex_unlock (&priv->lock); - - return result; + g_warn_if_reached (); + return FALSE; } /** @@ -1589,6 +1461,27 @@ gst_rtsp_stream_is_client_side (GstRTSPStream * stream) return ret; } +/* must be called with lock */ +static gboolean +alloc_ports (GstRTSPStream * stream) +{ + GstRTSPStreamPrivate *priv = stream->priv; + gboolean ret = TRUE; + + if ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) || + (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST)) { + ret = alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4, + priv->udpsrc_v4, priv->udpsink, + &priv->server_port_v4, &priv->server_addr_v4); + + ret |= alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6, + priv->udpsrc_v6, priv->udpsink, + &priv->server_port_v6, &priv->server_addr_v6); + } + + return ret; +} + /** * gst_rtsp_stream_get_server_port: * @stream: a #GstRTSPStream @@ -2463,7 +2356,7 @@ plug_sink (GstBin * bin, GstElement * tee, GstElement * sink, } /* must be called with lock */ -static gboolean +static void create_sender_part (GstRTSPStream * stream, GstBin * bin, GstState state) { GstRTSPStreamPrivate *priv; @@ -2477,9 +2370,6 @@ create_sender_part (GstRTSPStream * stream, GstBin * bin, GstState state) is_udp = ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) || (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST)); - if (is_udp && !create_and_configure_udpsinks (stream)) - goto no_udp_protocol; - for (i = 0; i < 2; i++) { /* For the sender we create this bit of pipeline for both * RTP and RTCP. Sync and preroll are enabled on udpsink so @@ -2515,9 +2405,10 @@ create_sender_part (GstRTSPStream * stream, GstBin * bin, GstState state) &sink_cb, stream, NULL); } - if (is_udp && is_tcp) { - g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL); - + /* If we have udp always use a tee because we could have mcast clients + * requesting different ports, in which case we'll have to plug more + * udpsinks. */ + if (is_udp) { /* make tee for RTP/RTCP */ priv->tee[i] = gst_element_factory_make ("tee", NULL); gst_bin_add (bin, priv->tee[i]); @@ -2528,7 +2419,11 @@ create_sender_part (GstRTSPStream * stream, GstBin * bin, GstState state) gst_object_unref (pad); plug_sink (bin, priv->tee[i], priv->udpsink[i], &priv->udpqueue[i]); - plug_sink (bin, priv->tee[i], priv->appsink[i], &priv->appqueue[i]); + + if (is_tcp) { + g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL); + plug_sink (bin, priv->tee[i], priv->appsink[i], &priv->appqueue[i]); + } } else if (is_tcp) { /* only appsink needed, link it to the session */ pad = gst_element_get_static_pad (priv->appsink[i], "sink"); @@ -2540,11 +2435,6 @@ create_sender_part (GstRTSPStream * stream, GstBin * bin, GstState state) * sink used for RTCP data, not the RTP data. */ if (i == 1) g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL); - } else { - /* else only udpsink needed, link it to the session */ - pad = gst_element_get_static_pad (priv->udpsink[i], "sink"); - gst_pad_link (priv->send_src[i], pad); - gst_object_unref (pad); } /* check if we need to set to a special state */ @@ -2561,14 +2451,6 @@ create_sender_part (GstRTSPStream * stream, GstBin * bin, GstState state) gst_element_set_state (priv->tee[i], state); } } - - return TRUE; - - /* ERRORS */ -no_udp_protocol: - { - return FALSE; - } } /* must be called with lock */ @@ -2663,6 +2545,153 @@ create_receiver_part (GstRTSPStream * stream, GstBin * bin, GstState state) } } +static gboolean +create_mcast_part_for_transport (GstRTSPStream * stream, + const GstRTSPTransport * tr) +{ + GstRTSPStreamPrivate *priv = stream->priv; + GInetAddress *inetaddr; + GSocketFamily family; + GstRTSPAddress *mcast_addr; + GstElement **mcast_udpsrc; + GSocket *rtp_socket = NULL; + GSocket *rtcp_socket = NULL; + GSocketAddress *rtp_sockaddr = NULL; + GSocketAddress *rtcp_sockaddr = NULL; + GError *error = NULL; + const gchar *multicast_iface = priv->multicast_iface; + + /* Check if it's a ipv4 or ipv6 transport */ + inetaddr = g_inet_address_new_from_string (tr->destination); + family = g_inet_address_get_family (inetaddr); + g_object_unref (inetaddr); + + /* Select fields corresponding to the family */ + if (family == G_SOCKET_FAMILY_IPV4) { + mcast_addr = priv->mcast_addr_v4; + mcast_udpsrc = priv->mcast_udpsrc_v4; + } else { + mcast_addr = priv->mcast_addr_v6; + mcast_udpsrc = priv->mcast_udpsrc_v6; + } + + /* We support only one mcast group per family, make sure this transport + * matches it. */ + if (!mcast_addr) + goto no_addr; + + if (!g_str_equal (tr->destination, mcast_addr->address) || + tr->port.min != mcast_addr->port || + tr->port.max != mcast_addr->port + mcast_addr->n_ports - 1 || + tr->ttl != mcast_addr->ttl) + goto wrong_addr; + + if (mcast_udpsrc[0]) { + /* We already created elements for this family. Since we support only one + * mcast group per family, there is nothing more to do here. */ + g_assert (mcast_udpsrc[1]); + g_assert (priv->mcast_udpqueue[0]); + g_assert (priv->mcast_udpqueue[1]); + g_assert (priv->mcast_udpsink[0]); + g_assert (priv->mcast_udpsink[1]); + return TRUE; + } + + g_assert (!mcast_udpsrc[1]); + + /* Create RTP/RTCP sockets and bind them on ANY with mcast ports */ + rtp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM, + G_SOCKET_PROTOCOL_UDP, &error); + if (!rtp_socket) + goto socket_error; + g_socket_set_multicast_loopback (rtp_socket, FALSE); + + rtcp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM, + G_SOCKET_PROTOCOL_UDP, &error); + if (!rtcp_socket) + goto socket_error; + g_socket_set_multicast_loopback (rtcp_socket, FALSE); + + inetaddr = g_inet_address_new_any (family); + rtp_sockaddr = g_inet_socket_address_new (inetaddr, mcast_addr->port); + rtcp_sockaddr = g_inet_socket_address_new (inetaddr, mcast_addr->port + 1); + g_object_unref (inetaddr); + + if (!g_socket_bind (rtp_socket, rtp_sockaddr, FALSE, &error)) + goto socket_error; + + if (!g_socket_bind (rtcp_socket, rtcp_sockaddr, FALSE, &error)) + goto socket_error; + + g_object_unref (rtp_sockaddr); + g_object_unref (rtcp_sockaddr); + + /* Add receiver part */ + create_and_configure_udpsources (mcast_udpsrc, rtp_socket, rtcp_socket); + if (priv->srcpad) { + plug_src (stream, priv->joined_bin, mcast_udpsrc[0], priv->funnel[0]); + gst_element_sync_state_with_parent (mcast_udpsrc[0]); + } + plug_src (stream, priv->joined_bin, mcast_udpsrc[1], priv->funnel[1]); + gst_element_sync_state_with_parent (mcast_udpsrc[1]); + + /* Add sender part, could already have been created for the other family. */ + if (!priv->mcast_udpsink[0]) { + g_assert (!priv->mcast_udpsink[1]); + g_assert (!priv->mcast_udpqueue[0]); + g_assert (!priv->mcast_udpqueue[1]); + + create_and_configure_udpsinks (stream, priv->mcast_udpsink, + multicast_iface); + set_sockets_for_udpsinks (priv->mcast_udpsink, rtp_socket, rtcp_socket, + family); + + if (priv->sinkpad) { + plug_sink (priv->joined_bin, priv->tee[0], priv->mcast_udpsink[0], + &priv->mcast_udpqueue[0]); + gst_element_sync_state_with_parent (priv->mcast_udpsink[0]); + gst_element_sync_state_with_parent (priv->mcast_udpqueue[0]); + } + plug_sink (priv->joined_bin, priv->tee[1], priv->mcast_udpsink[1], + &priv->mcast_udpqueue[1]); + gst_element_sync_state_with_parent (priv->mcast_udpsink[1]); + gst_element_sync_state_with_parent (priv->mcast_udpqueue[1]); + } else { + g_assert (priv->mcast_udpsink[1]); + g_assert (priv->mcast_udpqueue[0]); + g_assert (priv->mcast_udpqueue[1]); + + set_sockets_for_udpsinks (priv->mcast_udpsink, rtp_socket, rtcp_socket, + family); + } + + return TRUE; + +no_addr: + { + GST_WARNING_OBJECT (stream, "Adding mcast transport, but no mcast address " + "has been reserved"); + return FALSE; + } +wrong_addr: + { + GST_WARNING_OBJECT (stream, "Adding mcast transport, but it doesn't match " + "the reserved address"); + return FALSE; + } +socket_error: + { + GST_ERROR_OBJECT (stream, "Error creating and binding mcast socket: %s", + error->message); + g_clear_object (&rtp_socket); + g_clear_object (&rtcp_socket); + g_clear_object (&rtp_sockaddr); + g_clear_object (&rtcp_sockaddr); + g_clear_error (&error); + return FALSE; + } +} + /** * gst_rtsp_stream_join_bin: * @stream: a #GstRTSPStream @@ -2701,6 +2730,9 @@ gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin, GST_INFO ("stream %p joining bin as session %u", stream, idx); + if (!alloc_ports (stream)) + goto no_ports; + if (priv->profiles & GST_RTSP_PROFILE_SAVP || priv->profiles & GST_RTSP_PROFILE_SAVPF) { /* For SRTP */ @@ -2776,9 +2808,7 @@ gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin, g_signal_connect (priv->session, "on-sender-ssrc-active", (GCallback) on_sender_ssrc_active, stream); - if (!create_sender_part (stream, bin, state)) - goto no_udp_protocol; - + create_sender_part (stream, bin, state); create_receiver_part (stream, bin, state); if (priv->srcpad) { @@ -2798,71 +2828,17 @@ was_joined: g_mutex_unlock (&priv->lock); return TRUE; } -link_failed: +no_ports: { - GST_WARNING ("failed to link stream %u", idx); - gst_object_unref (priv->send_rtp_sink); - priv->send_rtp_sink = NULL; g_mutex_unlock (&priv->lock); + GST_WARNING ("failed to allocate ports %u", idx); return FALSE; } -no_udp_protocol: +link_failed: { - GST_WARNING ("failed to allocate ports %u", idx); + GST_WARNING ("failed to link stream %u", idx); gst_object_unref (priv->send_rtp_sink); priv->send_rtp_sink = NULL; - gst_object_unref (priv->send_src[0]); - priv->send_src[0] = NULL; - gst_object_unref (priv->send_src[1]); - priv->send_src[1] = NULL; - gst_object_unref (priv->recv_sink[0]); - priv->recv_sink[0] = NULL; - gst_object_unref (priv->recv_sink[1]); - priv->recv_sink[1] = NULL; - if (priv->udpsink[0]) - gst_element_set_state (priv->udpsink[0], GST_STATE_NULL); - if (priv->udpsink[1]) - gst_element_set_state (priv->udpsink[1], GST_STATE_NULL); - if (priv->udpsrc_v4[0]) { - gst_element_set_state (priv->udpsrc_v4[0], GST_STATE_NULL); - gst_object_unref (priv->udpsrc_v4[0]); - priv->udpsrc_v4[0] = NULL; - } - if (priv->udpsrc_v4[1]) { - gst_element_set_state (priv->udpsrc_v4[1], GST_STATE_NULL); - gst_object_unref (priv->udpsrc_v4[1]); - priv->udpsrc_v4[1] = NULL; - } - if (priv->udpsrc_mcast_v4[0]) { - gst_element_set_state (priv->udpsrc_mcast_v4[0], GST_STATE_NULL); - gst_object_unref (priv->udpsrc_mcast_v4[0]); - priv->udpsrc_mcast_v4[0] = NULL; - } - if (priv->udpsrc_mcast_v4[1]) { - gst_element_set_state (priv->udpsrc_mcast_v4[1], GST_STATE_NULL); - gst_object_unref (priv->udpsrc_mcast_v4[1]); - priv->udpsrc_mcast_v4[1] = NULL; - } - if (priv->udpsrc_v6[0]) { - gst_element_set_state (priv->udpsrc_v6[0], GST_STATE_NULL); - gst_object_unref (priv->udpsrc_v6[0]); - priv->udpsrc_v6[0] = NULL; - } - if (priv->udpsrc_v6[1]) { - gst_element_set_state (priv->udpsrc_v6[1], GST_STATE_NULL); - gst_object_unref (priv->udpsrc_v6[1]); - priv->udpsrc_v6[1] = NULL; - } - if (priv->udpsrc_mcast_v6[0]) { - gst_element_set_state (priv->udpsrc_mcast_v6[0], GST_STATE_NULL); - gst_object_unref (priv->udpsrc_mcast_v6[0]); - priv->udpsrc_mcast_v6[0] = NULL; - } - if (priv->udpsrc_mcast_v6[1]) { - gst_element_set_state (priv->udpsrc_mcast_v6[1], GST_STATE_NULL); - gst_object_unref (priv->udpsrc_mcast_v6[1]); - priv->udpsrc_mcast_v6[1] = NULL; - } g_mutex_unlock (&priv->lock); return FALSE; } @@ -2936,17 +2912,22 @@ gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin, } for (i = 0; i < 2; i++) { + clear_element (bin, &priv->udpsrc_v4[i]); + clear_element (bin, &priv->udpsrc_v6[i]); + clear_element (bin, &priv->udpqueue[i]); clear_element (bin, &priv->udpsink[i]); - clear_element (bin, &priv->appsink[i]); + + clear_element (bin, &priv->mcast_udpsrc_v4[i]); + clear_element (bin, &priv->mcast_udpsrc_v6[i]); + clear_element (bin, &priv->mcast_udpqueue[i]); + clear_element (bin, &priv->mcast_udpsink[i]); + + clear_element (bin, &priv->appsrc[i]); clear_element (bin, &priv->appqueue[i]); - clear_element (bin, &priv->udpqueue[i]); + clear_element (bin, &priv->appsink[i]); + clear_element (bin, &priv->tee[i]); clear_element (bin, &priv->funnel[i]); - clear_element (bin, &priv->appsrc[i]); - clear_element (bin, &priv->udpsrc_v4[i]); - clear_element (bin, &priv->udpsrc_v6[i]); - clear_element (bin, &priv->udpsrc_mcast_v4[i]); - clear_element (bin, &priv->udpsrc_mcast_v6[i]); if (priv->sinkpad || i == 1) { gst_element_release_request_pad (rtpbin, priv->recv_sink[i]); @@ -3332,6 +3313,18 @@ update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans, switch (tr->lower_transport) { case GST_RTSP_LOWER_TRANS_UDP_MCAST: + { + if (add) { + if (!create_mcast_part_for_transport (stream, tr)) + goto mcast_error; + priv->transports = g_list_prepend (priv->transports, trans); + } else { + priv->transports = g_list_remove (priv->transports, trans); + /* FIXME: Check if there are remaining mcast transports, and destroy + * mcast part if its now unused */ + } + break; + } case GST_RTSP_LOWER_TRANS_UDP: { gchar *dest; @@ -3393,6 +3386,10 @@ unknown_transport: GST_INFO ("Unknown transport %d", tr->lower_transport); return FALSE; } +mcast_error: + { + return FALSE; + } } diff --git a/tests/check/gst/stream.c b/tests/check/gst/stream.c index 26c2915..5a21ba4 100644 --- a/tests/check/gst/stream.c +++ b/tests/check/gst/stream.c @@ -33,7 +33,6 @@ GST_START_TEST (test_get_sockets) GSocket *socket; gboolean have_ipv4; gboolean have_ipv6; - GstRTSPTransport *tr; srcpad = gst_pad_new ("testsrcpad", GST_PAD_SRC); fail_unless (srcpad != NULL); @@ -62,11 +61,6 @@ GST_START_TEST (test_get_sockets) fail_unless (gst_rtsp_stream_join_bin (stream, bin, rtpbin, GST_STATE_NULL)); - gst_rtsp_transport_new (&tr); - tr->lower_transport = GST_RTSP_LOWER_TRANS_UDP; - fail_unless (gst_rtsp_stream_allocate_udp_sockets (stream, - G_SOCKET_FAMILY_IPV4, tr, FALSE)); - socket = gst_rtsp_stream_get_rtp_socket (stream, G_SOCKET_FAMILY_IPV4); have_ipv4 = (socket != NULL); if (have_ipv4) { @@ -102,7 +96,6 @@ GST_START_TEST (test_get_sockets) /* check that at least one family is available */ fail_unless (have_ipv4 || have_ipv6); - gst_rtsp_transport_free (tr); g_object_unref (pool); fail_unless (gst_rtsp_stream_leave_bin (stream, bin, rtpbin)); @@ -121,7 +114,6 @@ GST_START_TEST (test_allocate_udp_ports_fail) GstBin *bin; GstElement *rtpbin; GstRTSPAddressPool *pool; - GstRTSPTransport *tr; srcpad = gst_pad_new ("testsrcpad", GST_PAD_SRC); fail_unless (srcpad != NULL); @@ -143,14 +135,8 @@ GST_START_TEST (test_allocate_udp_ports_fail) "192.168.1.1", 6000, 6001, 0)); gst_rtsp_stream_set_address_pool (stream, pool); - fail_unless (gst_rtsp_stream_join_bin (stream, bin, rtpbin, GST_STATE_NULL)); + fail_if (gst_rtsp_stream_join_bin (stream, bin, rtpbin, GST_STATE_NULL)); - gst_rtsp_transport_new (&tr); - tr->lower_transport = GST_RTSP_LOWER_TRANS_UDP; - fail_if (gst_rtsp_stream_allocate_udp_sockets (stream, G_SOCKET_FAMILY_IPV4, - tr, FALSE)); - - gst_rtsp_transport_free (tr); g_object_unref (pool); fail_unless (gst_rtsp_stream_leave_bin (stream, bin, rtpbin)); gst_object_unref (bin); @@ -257,13 +243,6 @@ GST_START_TEST (test_multicast_address_and_unicast_udp) fail_unless (gst_rtsp_stream_join_bin (stream, bin, rtpbin, GST_STATE_NULL)); - gst_rtsp_transport_new (&tr); - /* unicast udp */ - tr->lower_transport = GST_RTSP_LOWER_TRANS_UDP; - fail_unless (gst_rtsp_stream_allocate_udp_sockets (stream, - G_SOCKET_FAMILY_IPV4, tr, FALSE)); - - gst_rtsp_transport_free (tr); g_object_unref (pool); fail_unless (gst_rtsp_stream_leave_bin (stream, bin, rtpbin)); gst_object_unref (bin); @@ -280,7 +259,6 @@ GST_START_TEST (test_allocate_udp_ports_multicast) GstBin *bin; GstElement *rtpbin; GstRTSPAddressPool *pool; - GstRTSPTransport *tr; GstRTSPAddress *addr; srcpad = gst_pad_new ("testsrcpad", GST_PAD_SRC); @@ -308,12 +286,6 @@ GST_START_TEST (test_allocate_udp_ports_multicast) fail_unless (gst_rtsp_stream_join_bin (stream, bin, rtpbin, GST_STATE_NULL)); - /* allocate udp multicast ports for IPv4 */ - gst_rtsp_transport_new (&tr); - tr->lower_transport = GST_RTSP_LOWER_TRANS_UDP_MCAST; - fail_unless (gst_rtsp_stream_allocate_udp_sockets (stream, - G_SOCKET_FAMILY_IPV4, tr, FALSE)); - /* check the multicast address and ports for IPv4 */ addr = gst_rtsp_stream_get_multicast_address (stream, G_SOCKET_FAMILY_IPV4); fail_unless (addr != NULL); @@ -322,10 +294,6 @@ GST_START_TEST (test_allocate_udp_ports_multicast) fail_unless_equals_int (addr->n_ports, 2); gst_rtsp_address_free (addr); - /* allocate udp multicast ports for IPv6 */ - fail_unless (gst_rtsp_stream_allocate_udp_sockets (stream, - G_SOCKET_FAMILY_IPV6, tr, FALSE)); - /* check the multicast address and ports for IPv6 */ addr = gst_rtsp_stream_get_multicast_address (stream, G_SOCKET_FAMILY_IPV6); fail_unless (addr != NULL); @@ -334,7 +302,6 @@ GST_START_TEST (test_allocate_udp_ports_multicast) fail_unless_equals_int (addr->n_ports, 2); gst_rtsp_address_free (addr); - gst_rtsp_transport_free (tr); g_object_unref (pool); fail_unless (gst_rtsp_stream_leave_bin (stream, bin, rtpbin)); gst_object_unref (bin); @@ -351,7 +318,6 @@ GST_START_TEST (test_allocate_udp_ports_client_settings) GstBin *bin; GstElement *rtpbin; GstRTSPAddressPool *pool; - GstRTSPTransport *tr; GstRTSPAddress *addr; srcpad = gst_pad_new ("testsrcpad", GST_PAD_SRC); @@ -384,14 +350,10 @@ GST_START_TEST (test_allocate_udp_ports_client_settings) fail_unless (gst_rtsp_stream_join_bin (stream, bin, rtpbin, GST_STATE_NULL)); - /* client transport settings for IPv4 */ - gst_rtsp_transport_new (&tr); - tr->destination = g_strdup ("233.252.0.2"); - tr->port.min = 6002; - tr->port.max = 6003; - tr->lower_transport = GST_RTSP_LOWER_TRANS_UDP_MCAST; - fail_unless (gst_rtsp_stream_allocate_udp_sockets (stream, - G_SOCKET_FAMILY_IPV4, tr, FALSE)); + /* Reserve IPV4 mcast address */ + addr = gst_rtsp_stream_reserve_address (stream, "233.252.0.2", 6002, 2, 1); + fail_unless (addr != NULL); + gst_rtsp_address_free (addr); /* verify that the multicast address and ports correspond to the requested client * transport information for IPv4 */ @@ -402,13 +364,10 @@ GST_START_TEST (test_allocate_udp_ports_client_settings) fail_unless_equals_int (addr->n_ports, 2); gst_rtsp_address_free (addr); - /* client transport settings for IPv6 */ - g_free (tr->destination); - tr->destination = g_strdup ("FF11:DB8::1"); - tr->port.min = 6006; - tr->port.max = 6007; - fail_unless (gst_rtsp_stream_allocate_udp_sockets (stream, - G_SOCKET_FAMILY_IPV6, tr, FALSE)); + /* Reserve IPV6 mcast address */ + addr = gst_rtsp_stream_reserve_address (stream, "FF11:DB8::1", 6006, 2, 1); + fail_unless (addr != NULL); + gst_rtsp_address_free (addr); /* verify that the multicast address and ports correspond to the requested client * transport information for IPv6 */ @@ -419,7 +378,6 @@ GST_START_TEST (test_allocate_udp_ports_client_settings) fail_unless_equals_int (addr->n_ports, 2); gst_rtsp_address_free (addr); - gst_rtsp_transport_free (tr); g_object_unref (pool); fail_unless (gst_rtsp_stream_leave_bin (stream, bin, rtpbin)); gst_object_unref (bin);