X-Git-Url: http://review.tizen.org/git/?a=blobdiff_plain;f=gst%2Frtsp-server%2Frtsp-stream.c;h=5773fa60c91ebe0d8a5c105a37b8f4a85d44c10b;hb=ca855abae1a38c4278f85b93f84bf2b0c6a8a4d9;hp=8b255eb7852121b8f2c6b80311e2f96c3bf8189f;hpb=957a4a65c649293717634602725e21ffc2fe84ea;p=platform%2Fupstream%2Fgstreamer.git diff --git a/gst/rtsp-server/rtsp-stream.c b/gst/rtsp-server/rtsp-stream.c index 8b255eb..5773fa6 100644 --- a/gst/rtsp-server/rtsp-stream.c +++ b/gst/rtsp-server/rtsp-stream.c @@ -1,5 +1,7 @@ /* GStreamer * Copyright (C) 2008 Wim Taymans + * Copyright (C) 2015 Centricular Ltd + * Author: Sebastian Dröge * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Library General Public @@ -53,6 +55,8 @@ #include #include +#include + #include "rtsp-stream.h" #define GST_RTSP_STREAM_GET_PRIVATE(obj) \ @@ -62,10 +66,15 @@ struct _GstRTSPStreamPrivate { GMutex lock; guint idx; - GstPad *srcpad; + /* Only one pad is ever set */ + GstPad *srcpad, *sinkpad; GstElement *payloader; guint buffer_size; - gboolean is_joined; + GstBin *joined_bin; + + /* TRUE if this stream is running on + * the client side of an RTSP link (for RECORD) */ + gboolean client_side; gchar *control; GstRTSPProfile profiles; @@ -73,44 +82,56 @@ struct _GstRTSPStreamPrivate /* pads on the rtpbin */ GstPad *send_rtp_sink; + GstPad *recv_rtp_src; GstPad *recv_sink[2]; GstPad *send_src[2]; /* the RTPSession object */ GObject *session; - /* sinks used for sending and receiving RTP and RTCP over ipv4, they share - * sockets */ - GstElement *udpsrc_v4[2]; + /* SRTP encoder/decoder */ + GstElement *srtpenc; + GstElement *srtpdec; + GHashTable *keys; - /* sinks used for sending and receiving RTP and RTCP over ipv6, they share - * sockets */ + /* for UDP unicast */ + GstElement *udpsrc_v4[2]; GstElement *udpsrc_v6[2]; - + GstElement *udpqueue[2]; GstElement *udpsink[2]; + /* for UDP multicast */ + GstElement *mcast_udpsrc_v4[2]; + GstElement *mcast_udpsrc_v6[2]; + GstElement *mcast_udpqueue[2]; + GstElement *mcast_udpsink[2]; + /* for TCP transport */ GstElement *appsrc[2]; + GstClockTime appsrc_base_time[2]; GstElement *appqueue[2]; GstElement *appsink[2]; GstElement *tee[2]; GstElement *funnel[2]; - /* server ports for sending/receiving over ipv4 */ - GstRTSPRange server_port_v4; - GstRTSPAddress *server_addr_v4; - gboolean have_ipv4; + /* retransmission */ + GstElement *rtxsend; + guint rtx_pt; + GstClockTime rtx_time; + + /* pool used to manage unicast and multicast addresses */ + GstRTSPAddressPool *pool; - /* server ports for sending/receiving over ipv6 */ - GstRTSPRange server_port_v6; + /* unicast server addr/port */ + GstRTSPAddress *server_addr_v4; GstRTSPAddress *server_addr_v6; - gboolean have_ipv6; /* multicast addresses */ - GstRTSPAddressPool *pool; - GstRTSPAddress *addr_v4; - GstRTSPAddress *addr_v6; + GstRTSPAddress *mcast_addr_v4; + GstRTSPAddress *mcast_addr_v6; + + gchar *multicast_iface; /* the caps of the stream */ gulong caps_sig; @@ -119,12 +140,22 @@ struct _GstRTSPStreamPrivate /* transports we stream to */ guint n_active; GList *transports; + guint transports_cookie; + GList *tr_cache_rtp; + GList *tr_cache_rtcp; + guint tr_cache_cookie_rtp; + guint tr_cache_cookie_rtcp; gint dscp_qos; /* stream blocking */ gulong blocked_id; gboolean blocking; + + /* pt->caps map for RECORD streams */ + GHashTable *ptmap; + + GstRTSPPublishClockMode publish_clock_mode; }; #define DEFAULT_CONTROL NULL @@ -141,6 +172,13 @@ enum PROP_LAST }; +enum +{ + SIGNAL_NEW_RTP_ENCODER, + SIGNAL_NEW_RTCP_ENCODER, + SIGNAL_LAST +}; + GST_DEBUG_CATEGORY_STATIC (rtsp_stream_debug); #define GST_CAT_DEFAULT rtsp_stream_debug @@ -153,6 +191,8 @@ static void gst_rtsp_stream_set_property (GObject * object, guint propid, static void gst_rtsp_stream_finalize (GObject * obj); +static guint gst_rtsp_stream_signals[SIGNAL_LAST] = { 0 }; + G_DEFINE_TYPE (GstRTSPStream, gst_rtsp_stream, G_TYPE_OBJECT); static void @@ -183,6 +223,16 @@ gst_rtsp_stream_class_init (GstRTSPStreamClass * klass) "Allowed lower transport protocols", GST_TYPE_RTSP_LOWER_TRANS, DEFAULT_PROTOCOLS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER] = + g_signal_new ("new-rtp-encoder", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic, + G_TYPE_NONE, 1, GST_TYPE_ELEMENT); + + gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER] = + g_signal_new ("new-rtcp-encoder", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic, + G_TYPE_NONE, 1, GST_TYPE_ELEMENT); + GST_DEBUG_CATEGORY_INIT (rtsp_stream_debug, "rtspstream", 0, "GstRTSPStream"); ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream"); @@ -201,8 +251,14 @@ gst_rtsp_stream_init (GstRTSPStream * stream) priv->control = g_strdup (DEFAULT_CONTROL); priv->profiles = DEFAULT_PROFILES; priv->protocols = DEFAULT_PROTOCOLS; + priv->publish_clock_mode = GST_RTSP_PUBLISH_CLOCK_MODE_CLOCK; g_mutex_init (&priv->lock); + + priv->keys = g_hash_table_new_full (g_direct_hash, g_direct_equal, + NULL, (GDestroyNotify) gst_caps_unref); + priv->ptmap = g_hash_table_new_full (NULL, NULL, NULL, + (GDestroyNotify) gst_caps_unref); } static void @@ -217,23 +273,34 @@ gst_rtsp_stream_finalize (GObject * obj) GST_DEBUG ("finalize stream %p", stream); /* we really need to be unjoined now */ - g_return_if_fail (!priv->is_joined); + g_return_if_fail (priv->joined_bin == NULL); - if (priv->addr_v4) - gst_rtsp_address_free (priv->addr_v4); - if (priv->addr_v6) - gst_rtsp_address_free (priv->addr_v6); + if (priv->mcast_addr_v4) + gst_rtsp_address_free (priv->mcast_addr_v4); + if (priv->mcast_addr_v6) + gst_rtsp_address_free (priv->mcast_addr_v6); if (priv->server_addr_v4) gst_rtsp_address_free (priv->server_addr_v4); if (priv->server_addr_v6) gst_rtsp_address_free (priv->server_addr_v6); if (priv->pool) g_object_unref (priv->pool); + if (priv->rtxsend) + g_object_unref (priv->rtxsend); + + g_free (priv->multicast_iface); + gst_object_unref (priv->payloader); - gst_object_unref (priv->srcpad); + if (priv->srcpad) + gst_object_unref (priv->srcpad); + if (priv->sinkpad) + gst_object_unref (priv->sinkpad); g_free (priv->control); g_mutex_clear (&priv->lock); + g_hash_table_unref (priv->keys); + g_hash_table_destroy (priv->ptmap); + G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj); } @@ -282,29 +349,32 @@ gst_rtsp_stream_set_property (GObject * object, guint propid, /** * gst_rtsp_stream_new: * @idx: an index - * @srcpad: a #GstPad + * @pad: a #GstPad * @payloader: a #GstElement * * Create a new media stream with index @idx that handles RTP data on - * @srcpad and has a payloader element @payloader. + * @pad and has a payloader element @payloader if @pad is a source pad + * or a depayloader element @payloader if @pad is a sink pad. * - * Returns: a new #GstRTSPStream + * Returns: (transfer full): a new #GstRTSPStream */ GstRTSPStream * -gst_rtsp_stream_new (guint idx, GstElement * payloader, GstPad * srcpad) +gst_rtsp_stream_new (guint idx, GstElement * payloader, GstPad * pad) { GstRTSPStreamPrivate *priv; GstRTSPStream *stream; g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL); - g_return_val_if_fail (GST_IS_PAD (srcpad), NULL); - g_return_val_if_fail (GST_PAD_IS_SRC (srcpad), NULL); + g_return_val_if_fail (GST_IS_PAD (pad), NULL); stream = g_object_new (GST_TYPE_RTSP_STREAM, NULL); priv = stream->priv; priv->idx = idx; priv->payloader = gst_object_ref (payloader); - priv->srcpad = gst_object_ref (srcpad); + if (GST_PAD_IS_SRC (pad)) + priv->srcpad = gst_object_ref (pad); + else + priv->sinkpad = gst_object_ref (pad); return stream; } @@ -361,16 +431,38 @@ gst_rtsp_stream_get_srcpad (GstRTSPStream * stream) { g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL); + if (!stream->priv->srcpad) + return NULL; + return gst_object_ref (stream->priv->srcpad); } /** + * gst_rtsp_stream_get_sinkpad: + * @stream: a #GstRTSPStream + * + * Get the sinkpad associated with @stream. + * + * Returns: (transfer full): the sinkpad. Unref after usage. + */ +GstPad * +gst_rtsp_stream_get_sinkpad (GstRTSPStream * stream) +{ + g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL); + + if (!stream->priv->sinkpad) + return NULL; + + return gst_object_ref (stream->priv->sinkpad); +} + +/** * gst_rtsp_stream_get_control: * @stream: a #GstRTSPStream * * Get the control string to identify this stream. * - * Returns: (transfer full): the control string. free after usage. + * Returns: (transfer full): the control string. g_free() after usage. */ gchar * gst_rtsp_stream_get_control (GstRTSPStream * stream) @@ -493,22 +585,18 @@ gst_rtsp_stream_get_mtu (GstRTSPStream * stream) /* Update the dscp qos property on the udp sinks */ static void -update_dscp_qos (GstRTSPStream * stream) +update_dscp_qos (GstRTSPStream * stream, GstElement * udpsink[2]) { GstRTSPStreamPrivate *priv; - g_return_if_fail (GST_IS_RTSP_STREAM (stream)); - priv = stream->priv; - if (priv->udpsink[0]) { - g_object_set (G_OBJECT (priv->udpsink[0]), "qos-dscp", priv->dscp_qos, - NULL); + if (udpsink[0]) { + g_object_set (G_OBJECT (udpsink[0]), "qos-dscp", priv->dscp_qos, NULL); } - if (priv->udpsink[1]) { - g_object_set (G_OBJECT (priv->udpsink[1]), "qos-dscp", priv->dscp_qos, - NULL); + if (udpsink[1]) { + g_object_set (G_OBJECT (udpsink[1]), "qos-dscp", priv->dscp_qos, NULL); } } @@ -537,7 +625,7 @@ gst_rtsp_stream_set_dscp_qos (GstRTSPStream * stream, gint dscp_qos) priv->dscp_qos = dscp_qos; - update_dscp_qos (stream); + update_dscp_qos (stream, priv->udpsink); } /** @@ -563,7 +651,7 @@ gst_rtsp_stream_get_dscp_qos (GstRTSPStream * stream) /** * gst_rtsp_stream_is_transport_supported: * @stream: a #GstRTSPStream - * @transport: a #GstRTSPTransport + * @transport: (transfer none): a #GstRTSPTransport * * Check if @transport can be handled by stream * @@ -597,16 +685,19 @@ gst_rtsp_stream_is_transport_supported (GstRTSPStream * stream, unsupported_transmode: { GST_DEBUG ("unsupported transport mode %d", transport->trans); + g_mutex_unlock (&priv->lock); return FALSE; } unsupported_profile: { GST_DEBUG ("unsupported profile %d", transport->profile); + g_mutex_unlock (&priv->lock); return FALSE; } unsupported_ltrans: { GST_DEBUG ("unsupported lower transport %d", transport->lower_transport); + g_mutex_unlock (&priv->lock); return FALSE; } } @@ -708,7 +799,7 @@ gst_rtsp_stream_get_protocols (GstRTSPStream * stream) /** * gst_rtsp_stream_set_address_pool: * @stream: a #GstRTSPStream - * @pool: a #GstRTSPAddressPool + * @pool: (transfer none): a #GstRTSPAddressPool * * configure @pool to be used as the address pool of @stream. */ @@ -764,17 +855,67 @@ gst_rtsp_stream_get_address_pool (GstRTSPStream * stream) } /** - * gst_rtsp_stream_get_multicast_address: + * gst_rtsp_stream_set_multicast_iface: + * @stream: a #GstRTSPStream + * @multicast_iface: (transfer none): a multicast interface + * + * configure @multicast_iface to be used for @stream. + */ +void +gst_rtsp_stream_set_multicast_iface (GstRTSPStream * stream, + const gchar * multicast_iface) +{ + GstRTSPStreamPrivate *priv; + gchar *old; + + g_return_if_fail (GST_IS_RTSP_STREAM (stream)); + + priv = stream->priv; + + GST_LOG_OBJECT (stream, "set multicast iface %s", + GST_STR_NULL (multicast_iface)); + + g_mutex_lock (&priv->lock); + if ((old = priv->multicast_iface) != multicast_iface) + priv->multicast_iface = multicast_iface ? g_strdup (multicast_iface) : NULL; + else + old = NULL; + g_mutex_unlock (&priv->lock); + + if (old) + g_free (old); +} + +/** + * gst_rtsp_stream_get_multicast_iface: * @stream: a #GstRTSPStream - * @family: the #GSocketFamily * - * Get the multicast address of @stream for @family. + * Get the multicast interface used for @stream. * - * Returns: the #GstRTSPAddress of @stream or %NULL when no address could be - * allocated. gst_rtsp_address_free() after usage. + * Returns: (transfer full): the multicast interface for @stream. g_free() after + * usage. */ -GstRTSPAddress * -gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream, +gchar * +gst_rtsp_stream_get_multicast_iface (GstRTSPStream * stream) +{ + GstRTSPStreamPrivate *priv; + gchar *result; + + g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL); + + priv = stream->priv; + + g_mutex_lock (&priv->lock); + if ((result = priv->multicast_iface)) + result = g_strdup (result); + g_mutex_unlock (&priv->lock); + + return result; +} + + +static GstRTSPAddress * +gst_rtsp_stream_get_multicast_address_locked (GstRTSPStream * stream, GSocketFamily family) { GstRTSPStreamPrivate *priv; @@ -782,19 +923,16 @@ gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream, GstRTSPAddress **addrp; GstRTSPAddressFlags flags; - g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL); - priv = stream->priv; if (family == G_SOCKET_FAMILY_IPV6) { flags = GST_RTSP_ADDRESS_FLAG_IPV6; - addrp = &priv->addr_v4; + addrp = &priv->mcast_addr_v6; } else { flags = GST_RTSP_ADDRESS_FLAG_IPV4; - addrp = &priv->addr_v6; + addrp = &priv->mcast_addr_v4; } - g_mutex_lock (&priv->lock); if (*addrp == NULL) { if (priv->pool == NULL) goto no_pool; @@ -804,9 +942,14 @@ gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream, *addrp = gst_rtsp_address_pool_acquire_address (priv->pool, flags, 2); if (*addrp == NULL) goto no_address; + + /* FIXME: Also reserve the same port with unicast ANY address, since that's + * where we are going to bind our socket. Probably loop until we find a port + * available in both mcast and unicast pools. Maybe GstRTSPAddressPool + * should do it for us when both GST_RTSP_ADDRESS_FLAG_MULTICAST and + * GST_RTSP_ADDRESS_FLAG_UNICAST are givent. */ } result = gst_rtsp_address_copy (*addrp); - g_mutex_unlock (&priv->lock); return result; @@ -814,18 +957,44 @@ gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream, no_pool: { GST_ERROR_OBJECT (stream, "no address pool specified"); - g_mutex_unlock (&priv->lock); return NULL; } no_address: { GST_ERROR_OBJECT (stream, "failed to acquire address from pool"); - g_mutex_unlock (&priv->lock); return NULL; } } /** + * gst_rtsp_stream_get_multicast_address: + * @stream: a #GstRTSPStream + * @family: the #GSocketFamily + * + * Get the multicast address of @stream for @family. The original + * #GstRTSPAddress is cached and copy is returned, so freeing the return value + * won't release the address from the pool. + * + * Returns: (transfer full) (nullable): the #GstRTSPAddress of @stream + * or %NULL when no address could be allocated. gst_rtsp_address_free() + * after usage. + */ +GstRTSPAddress * +gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream, + GSocketFamily family) +{ + GstRTSPAddress *result; + + g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL); + + g_mutex_lock (&stream->priv->lock); + result = gst_rtsp_stream_get_multicast_address_locked (stream, family); + g_mutex_unlock (&stream->priv->lock); + + return result; +} + +/** * gst_rtsp_stream_reserve_address: * @stream: a #GstRTSPStream * @address: an address @@ -833,10 +1002,12 @@ no_address: * @n_ports: n_ports * @ttl: a TTL * - * Reserve @address and @port as the address and port of @stream. + * Reserve @address and @port as the address and port of @stream. The original + * #GstRTSPAddress is cached and copy is returned, so freeing the return value + * won't release the address from the pool. * - * Returns: the #GstRTSPAddress of @stream or %NULL when the address could be - * reserved. gst_rtsp_address_free() after usage. + * Returns: (nullable): the #GstRTSPAddress of @stream or %NULL when + * the address could be reserved. gst_rtsp_address_free() after usage. */ GstRTSPAddress * gst_rtsp_stream_reserve_address (GstRTSPStream * stream, @@ -866,9 +1037,9 @@ gst_rtsp_stream_reserve_address (GstRTSPStream * stream, } if (family == G_SOCKET_FAMILY_IPV6) - addrp = &priv->addr_v4; + addrp = &priv->mcast_addr_v6; else - addrp = &priv->addr_v6; + addrp = &priv->mcast_addr_v4; g_mutex_lock (&priv->lock); if (*addrp == NULL) { @@ -881,6 +1052,9 @@ gst_rtsp_stream_reserve_address (GstRTSPStream * stream, port, n_ports, ttl, addrp); if (res != GST_RTSP_ADDRESS_POOL_OK) goto no_address; + + /* FIXME: Also reserve the same port with unicast ANY address, since that's + * where we are going to bind our socket. */ } else { if (strcmp ((*addrp)->address, address) || (*addrp)->port != port || (*addrp)->n_ports != n_ports || @@ -915,15 +1089,130 @@ different_address: } } +/* must be called with lock */ +static void +set_sockets_for_udpsinks (GstElement * udpsink[2], GSocket * rtp_socket, + GSocket * rtcp_socket, GSocketFamily family) +{ + const gchar *multisink_socket; + + if (family == G_SOCKET_FAMILY_IPV6) + multisink_socket = "socket-v6"; + else + multisink_socket = "socket"; + + g_object_set (G_OBJECT (udpsink[0]), multisink_socket, rtp_socket, NULL); + g_object_set (G_OBJECT (udpsink[1]), multisink_socket, rtcp_socket, NULL); +} + static gboolean -alloc_ports_one_family (GstRTSPAddressPool * pool, gint buffer_size, - GSocketFamily family, GstElement * udpsrc_out[2], - GstElement * udpsink_out[2], GstRTSPRange * server_port_out, - GstRTSPAddress ** server_addr_out) +create_and_configure_udpsinks (GstRTSPStream * stream, GstElement * udpsink[2]) { - GstStateChangeReturn ret; - GstElement *udpsrc0, *udpsrc1; + GstRTSPStreamPrivate *priv = stream->priv; GstElement *udpsink0, *udpsink1; + + udpsink0 = gst_element_factory_make ("multiudpsink", NULL); + udpsink1 = gst_element_factory_make ("multiudpsink", NULL); + + if (!udpsink0 || !udpsink1) + goto no_udp_protocol; + + /* configure sinks */ + + g_object_set (G_OBJECT (udpsink0), "close-socket", FALSE, NULL); + g_object_set (G_OBJECT (udpsink1), "close-socket", FALSE, NULL); + + g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL); + g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL); + + g_object_set (G_OBJECT (udpsink0), "buffer-size", priv->buffer_size, NULL); + + g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL); + /* Needs to be async for RECORD streams, otherwise we will never go to + * PLAYING because the sinks will wait for data while the udpsrc can't + * provide data with timestamps in PAUSED. */ + if (priv->sinkpad) + g_object_set (G_OBJECT (udpsink0), "async", FALSE, NULL); + g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL); + + /* join multicast group when adding clients, so we'll start receiving from it. + * We cannot rely on the udpsrc to join the group since its socket is always a + * local unicast one. */ + g_object_set (G_OBJECT (udpsink0), "auto-multicast", TRUE, NULL); + g_object_set (G_OBJECT (udpsink1), "auto-multicast", TRUE, NULL); + + g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL); + g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL); + + udpsink[0] = udpsink0; + udpsink[1] = udpsink1; + + /* update the dscp qos field in the sinks */ + update_dscp_qos (stream, udpsink); + + return TRUE; + + /* ERRORS */ +no_udp_protocol: + { + return FALSE; + } +} + +/* must be called with lock */ +static gboolean +create_and_configure_udpsources (GstElement * udpsrc_out[2], + GSocket * rtp_socket, GSocket * rtcp_socket) +{ + GstStateChangeReturn ret; + + udpsrc_out[0] = gst_element_factory_make ("udpsrc", NULL); + udpsrc_out[1] = gst_element_factory_make ("udpsrc", NULL); + + if (udpsrc_out[0] == NULL || udpsrc_out[1] == NULL) + goto error; + + g_object_set (G_OBJECT (udpsrc_out[0]), "socket", rtp_socket, NULL); + g_object_set (G_OBJECT (udpsrc_out[1]), "socket", rtcp_socket, NULL); + + /* The udpsrc cannot do the join because its socket is always a local unicast + * one. The udpsink sharing the same socket will do it for us. */ + g_object_set (G_OBJECT (udpsrc_out[0]), "auto-multicast", FALSE, NULL); + g_object_set (G_OBJECT (udpsrc_out[1]), "auto-multicast", FALSE, NULL); + + g_object_set (G_OBJECT (udpsrc_out[0]), "loop", FALSE, NULL); + g_object_set (G_OBJECT (udpsrc_out[1]), "loop", FALSE, NULL); + + ret = gst_element_set_state (udpsrc_out[0], GST_STATE_READY); + if (ret == GST_STATE_CHANGE_FAILURE) + goto error; + ret = gst_element_set_state (udpsrc_out[1], GST_STATE_READY); + if (ret == GST_STATE_CHANGE_FAILURE) + goto error; + + return TRUE; + + /* ERRORS */ +error: + { + if (udpsrc_out[0]) { + gst_element_set_state (udpsrc_out[0], GST_STATE_NULL); + g_clear_object (&udpsrc_out[0]); + } + if (udpsrc_out[1]) { + gst_element_set_state (udpsrc_out[1], GST_STATE_NULL); + g_clear_object (&udpsrc_out[1]); + } + return FALSE; + } +} + +static gboolean +alloc_ports_one_family (GstRTSPStream * stream, GSocketFamily family, + GstElement * udpsrc_out[2], GstElement * udpsink_out[2], + GstRTSPAddress ** server_addr_out, gboolean multicast) +{ + GstRTSPStreamPrivate *priv = stream->priv; GSocket *rtp_socket = NULL; GSocket *rtcp_socket; gint tmp_rtp, tmp_rtcp; @@ -932,19 +1221,18 @@ alloc_ports_one_family (GstRTSPAddressPool * pool, gint buffer_size, GList *rejected_addresses = NULL; GstRTSPAddress *addr = NULL; GInetAddress *inetaddr = NULL; + gchar *addr_str; GSocketAddress *rtp_sockaddr = NULL; GSocketAddress *rtcp_sockaddr = NULL; - const gchar *multisink_socket; + GstRTSPAddressPool *pool; - if (family == G_SOCKET_FAMILY_IPV6) - multisink_socket = "socket-v6"; - else - multisink_socket = "socket"; + g_assert (!udpsrc_out[0]); + g_assert (!udpsrc_out[1]); + g_assert ((!udpsink_out[0] && !udpsink_out[1]) || + (udpsink_out[0] && udpsink_out[1])); + g_assert (*server_addr_out == NULL); - udpsrc0 = NULL; - udpsrc1 = NULL; - udpsink0 = NULL; - udpsink1 = NULL; + pool = priv->pool; count = 0; /* Start with random port */ @@ -954,9 +1242,7 @@ alloc_ports_one_family (GstRTSPAddressPool * pool, gint buffer_size, G_SOCKET_PROTOCOL_UDP, NULL); if (!rtcp_socket) goto no_udp_protocol; - - if (*server_addr_out) - gst_rtsp_address_free (*server_addr_out); + g_socket_set_multicast_loopback (rtcp_socket, FALSE); /* try to allocate 2 UDP ports, the RTP port should be an even * number and the RTCP port should be the next (uneven) port */ @@ -967,15 +1253,24 @@ again: G_SOCKET_PROTOCOL_UDP, NULL); if (!rtp_socket) goto no_udp_protocol; + g_socket_set_multicast_loopback (rtp_socket, FALSE); } - if (pool && gst_rtsp_address_pool_has_unicast_addresses (pool)) { + if ((pool && gst_rtsp_address_pool_has_unicast_addresses (pool)) || multicast) { GstRTSPAddressFlags flags; if (addr) rejected_addresses = g_list_prepend (rejected_addresses, addr); - flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_UNICAST; + if (!pool) + goto no_ports; + + flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT; + if (multicast) + flags |= GST_RTSP_ADDRESS_FLAG_MULTICAST; + else + flags |= GST_RTSP_ADDRESS_FLAG_UNICAST; + if (family == G_SOCKET_FAMILY_IPV6) flags |= GST_RTSP_ADDRESS_FLAG_IPV6; else @@ -1037,74 +1332,47 @@ again: } g_object_unref (rtcp_sockaddr); - g_clear_object (&inetaddr); + if (!addr) { + addr = g_slice_new0 (GstRTSPAddress); + addr->address = g_inet_address_to_string (inetaddr); + addr->port = tmp_rtp; + addr->n_ports = 2; + } - udpsrc0 = gst_element_factory_make ("udpsrc", NULL); - udpsrc1 = gst_element_factory_make ("udpsrc", NULL); + addr_str = addr->address; + g_clear_object (&inetaddr); - if (udpsrc0 == NULL || udpsrc1 == NULL) + if (!create_and_configure_udpsources (udpsrc_out, rtp_socket, rtcp_socket)) { goto no_udp_protocol; + } - g_object_set (G_OBJECT (udpsrc0), "socket", rtp_socket, NULL); - g_object_set (G_OBJECT (udpsrc1), "socket", rtcp_socket, NULL); - - ret = gst_element_set_state (udpsrc0, GST_STATE_PAUSED); - if (ret == GST_STATE_CHANGE_FAILURE) - goto element_error; - ret = gst_element_set_state (udpsrc1, GST_STATE_PAUSED); - if (ret == GST_STATE_CHANGE_FAILURE) - goto element_error; - - /* all fine, do port check */ - g_object_get (G_OBJECT (udpsrc0), "port", &rtpport, NULL); - g_object_get (G_OBJECT (udpsrc1), "port", &rtcpport, NULL); + g_object_get (G_OBJECT (udpsrc_out[0]), "port", &rtpport, NULL); + g_object_get (G_OBJECT (udpsrc_out[1]), "port", &rtcpport, NULL); /* this should not happen... */ if (rtpport != tmp_rtp || rtcpport != tmp_rtcp) goto port_error; - if (udpsink_out[0]) - udpsink0 = udpsink_out[0]; - else - udpsink0 = gst_element_factory_make ("multiudpsink", NULL); - - if (!udpsink0) + /* This function is called twice (for v4 and v6) but we create only one pair + * of udpsinks. */ + if (!udpsink_out[0] + && !create_and_configure_udpsinks (stream, udpsink_out)) goto no_udp_protocol; - g_object_set (G_OBJECT (udpsink0), "close-socket", FALSE, NULL); - g_object_set (G_OBJECT (udpsink0), multisink_socket, rtp_socket, NULL); - - if (udpsink_out[1]) - udpsink1 = udpsink_out[1]; - else - udpsink1 = gst_element_factory_make ("multiudpsink", NULL); - - if (!udpsink1) - goto no_udp_protocol; - - g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL); - g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL); - g_object_set (G_OBJECT (udpsink0), "buffer-size", buffer_size, NULL); + if (multicast) { + g_object_set (G_OBJECT (udpsink_out[0]), "multicast-iface", + priv->multicast_iface, NULL); + g_object_set (G_OBJECT (udpsink_out[1]), "multicast-iface", + priv->multicast_iface, NULL); - g_object_set (G_OBJECT (udpsink1), "close-socket", FALSE, NULL); - g_object_set (G_OBJECT (udpsink1), multisink_socket, rtcp_socket, NULL); - g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL); - g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL); - g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL); - g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL); - g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL); - g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL); + g_signal_emit_by_name (udpsink_out[0], "add", addr_str, rtpport, NULL); + g_signal_emit_by_name (udpsink_out[1], "add", addr_str, rtcpport, NULL); + } - /* we keep these elements, we will further configure them when the - * client told us to really use the UDP ports. */ - udpsrc_out[0] = udpsrc0; - udpsrc_out[1] = udpsrc1; - udpsink_out[0] = udpsink0; - udpsink_out[1] = udpsink1; - server_port_out->min = rtpport; - server_port_out->max = rtcpport; + set_sockets_for_udpsinks (udpsink_out, rtp_socket, rtcp_socket, family); *server_addr_out = addr; + g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free); g_object_unref (rtp_socket); @@ -1129,24 +1397,8 @@ socket_error: { goto cleanup; } -element_error: - { - goto cleanup; - } cleanup: { - if (udpsrc0) { - gst_element_set_state (udpsrc0, GST_STATE_NULL); - gst_object_unref (udpsrc0); - } - if (udpsrc1) { - gst_element_set_state (udpsrc1, GST_STATE_NULL); - gst_object_unref (udpsrc1); - } - if (udpsink0) { - gst_element_set_state (udpsink0, GST_STATE_NULL); - gst_object_unref (udpsink0); - } if (inetaddr) g_object_unref (inetaddr); g_list_free_full (rejected_addresses, @@ -1161,58 +1413,141 @@ cleanup: } } -/* must be called with lock */ -static gboolean -alloc_ports (GstRTSPStream * stream) +/** + * gst_rtsp_stream_allocate_udp_sockets: + * @stream: a #GstRTSPStream + * @family: protocol family + * @transport_method: transport method + * + * Allocates RTP and RTCP ports. + * + * Returns: %TRUE if the RTP and RTCP sockets have been succeccully allocated. + * Deprecated: This function shouldn't have been made public + */ +gboolean +gst_rtsp_stream_allocate_udp_sockets (GstRTSPStream * stream, + GSocketFamily family, GstRTSPTransport * ct, gboolean use_client_settings) { - GstRTSPStreamPrivate *priv = stream->priv; - - priv->have_ipv4 = alloc_ports_one_family (priv->pool, priv->buffer_size, - G_SOCKET_FAMILY_IPV4, priv->udpsrc_v4, priv->udpsink, - &priv->server_port_v4, &priv->server_addr_v4); - - priv->have_ipv6 = alloc_ports_one_family (priv->pool, priv->buffer_size, - G_SOCKET_FAMILY_IPV6, priv->udpsrc_v6, priv->udpsink, - &priv->server_port_v6, &priv->server_addr_v6); - - return priv->have_ipv4 || priv->have_ipv6; + g_warn_if_reached (); + return FALSE; } /** - * gst_rtsp_stream_get_server_port: + * gst_rtsp_stream_set_client_side: * @stream: a #GstRTSPStream - * @server_port: (out): result server port - * @family: the port family to get - * - * Fill @server_port with the port pair used by the server. This function can - * only be called when @stream has been joined. + * @client_side: TRUE if this #GstRTSPStream is running on the 'client' side of + * an RTSP connection. + * + * Sets the #GstRTSPStream as a 'client side' stream - used for sending + * streams to an RTSP server via RECORD. This has the practical effect + * of changing which UDP port numbers are used when setting up the local + * side of the stream sending to be either the 'server' or 'client' pair + * of a configured UDP transport. */ void -gst_rtsp_stream_get_server_port (GstRTSPStream * stream, - GstRTSPRange * server_port, GSocketFamily family) +gst_rtsp_stream_set_client_side (GstRTSPStream * stream, gboolean client_side) { GstRTSPStreamPrivate *priv; g_return_if_fail (GST_IS_RTSP_STREAM (stream)); priv = stream->priv; - g_return_if_fail (priv->is_joined); - g_mutex_lock (&priv->lock); - if (family == G_SOCKET_FAMILY_IPV4) { - if (server_port) - *server_port = priv->server_port_v4; - } else { - if (server_port) - *server_port = priv->server_port_v6; - } + priv->client_side = client_side; g_mutex_unlock (&priv->lock); } /** - * gst_rtsp_stream_get_rtpsession: + * gst_rtsp_stream_is_client_side: * @stream: a #GstRTSPStream * - * Get the RTP session of this stream. + * See gst_rtsp_stream_set_client_side() + * + * Returns: TRUE if this #GstRTSPStream is client-side. + */ +gboolean +gst_rtsp_stream_is_client_side (GstRTSPStream * stream) +{ + GstRTSPStreamPrivate *priv; + gboolean ret; + + g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE); + + priv = stream->priv; + g_mutex_lock (&priv->lock); + ret = priv->client_side; + g_mutex_unlock (&priv->lock); + + return ret; +} + +/* must be called with lock */ +static gboolean +alloc_ports (GstRTSPStream * stream) +{ + GstRTSPStreamPrivate *priv = stream->priv; + gboolean ret = TRUE; + + if (priv->protocols & GST_RTSP_LOWER_TRANS_UDP) { + ret = alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4, + priv->udpsrc_v4, priv->udpsink, &priv->server_addr_v4, FALSE); + + ret |= alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6, + priv->udpsrc_v6, priv->udpsink, &priv->server_addr_v6, FALSE); + } + + /* FIXME: Maybe actually consider the return values? */ + if (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST) { + ret |= alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4, + priv->mcast_udpsrc_v4, priv->mcast_udpsink, &priv->mcast_addr_v4, TRUE); + + ret |= alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6, + priv->mcast_udpsrc_v6, priv->mcast_udpsink, &priv->mcast_addr_v6, TRUE); + } + + return ret; +} + +/** + * gst_rtsp_stream_get_server_port: + * @stream: a #GstRTSPStream + * @server_port: (out): result server port + * @family: the port family to get + * + * Fill @server_port with the port pair used by the server. This function can + * only be called when @stream has been joined. + */ +void +gst_rtsp_stream_get_server_port (GstRTSPStream * stream, + GstRTSPRange * server_port, GSocketFamily family) +{ + GstRTSPStreamPrivate *priv; + + g_return_if_fail (GST_IS_RTSP_STREAM (stream)); + priv = stream->priv; + g_return_if_fail (priv->joined_bin != NULL); + + g_mutex_lock (&priv->lock); + if (family == G_SOCKET_FAMILY_IPV4) { + if (server_port) { + server_port->min = priv->server_addr_v4->port; + server_port->max = + priv->server_addr_v4->port + priv->server_addr_v4->n_ports - 1; + } + } else { + if (server_port) { + server_port->min = priv->server_addr_v6->port; + server_port->max = + priv->server_addr_v6->port + priv->server_addr_v6->n_ports - 1; + } + } + g_mutex_unlock (&priv->lock); +} + +/** + * gst_rtsp_stream_get_rtpsession: + * @stream: a #GstRTSPStream + * + * Get the RTP session of this stream. * * Returns: (transfer full): The RTP session of this stream. Unref after usage. */ @@ -1235,6 +1570,32 @@ gst_rtsp_stream_get_rtpsession (GstRTSPStream * stream) } /** + * gst_rtsp_stream_get_encoder: + * @stream: a #GstRTSPStream + * + * Get the SRTP encoder for this stream. + * + * Returns: (transfer full): The SRTP encoder for this stream. Unref after usage. + */ +GstElement * +gst_rtsp_stream_get_srtp_encoder (GstRTSPStream * stream) +{ + GstRTSPStreamPrivate *priv; + GstElement *encoder; + + g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL); + + priv = stream->priv; + + g_mutex_lock (&priv->lock); + if ((encoder = priv->srtpenc)) + g_object_ref (encoder); + g_mutex_unlock (&priv->lock); + + return encoder; +} + +/** * gst_rtsp_stream_get_ssrc: * @stream: a #GstRTSPStream * @ssrc: (out): result ssrc @@ -1249,7 +1610,7 @@ gst_rtsp_stream_get_ssrc (GstRTSPStream * stream, guint * ssrc) g_return_if_fail (GST_IS_RTSP_STREAM (stream)); priv = stream->priv; - g_return_if_fail (priv->is_joined); + g_return_if_fail (priv->joined_bin != NULL); g_mutex_lock (&priv->lock); if (ssrc && priv->session) @@ -1257,6 +1618,139 @@ gst_rtsp_stream_get_ssrc (GstRTSPStream * stream, guint * ssrc) g_mutex_unlock (&priv->lock); } +/** + * gst_rtsp_stream_set_retransmission_time: + * @stream: a #GstRTSPStream + * @time: a #GstClockTime + * + * Set the amount of time to store retransmission packets. + */ +void +gst_rtsp_stream_set_retransmission_time (GstRTSPStream * stream, + GstClockTime time) +{ + GST_DEBUG_OBJECT (stream, "set retransmission time %" G_GUINT64_FORMAT, time); + + g_mutex_lock (&stream->priv->lock); + stream->priv->rtx_time = time; + if (stream->priv->rtxsend) + g_object_set (stream->priv->rtxsend, "max-size-time", + GST_TIME_AS_MSECONDS (time), NULL); + g_mutex_unlock (&stream->priv->lock); +} + +/** + * gst_rtsp_stream_get_retransmission_time: + * @stream: a #GstRTSPStream + * + * Get the amount of time to store retransmission data. + * + * Returns: the amount of time to store retransmission data. + */ +GstClockTime +gst_rtsp_stream_get_retransmission_time (GstRTSPStream * stream) +{ + GstClockTime ret; + + g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0); + + g_mutex_lock (&stream->priv->lock); + ret = stream->priv->rtx_time; + g_mutex_unlock (&stream->priv->lock); + + return ret; +} + +/** + * gst_rtsp_stream_set_retransmission_pt: + * @stream: a #GstRTSPStream + * @rtx_pt: a #guint + * + * Set the payload type (pt) for retransmission of this stream. + */ +void +gst_rtsp_stream_set_retransmission_pt (GstRTSPStream * stream, guint rtx_pt) +{ + g_return_if_fail (GST_IS_RTSP_STREAM (stream)); + + GST_DEBUG_OBJECT (stream, "set retransmission pt %u", rtx_pt); + + g_mutex_lock (&stream->priv->lock); + stream->priv->rtx_pt = rtx_pt; + if (stream->priv->rtxsend) { + guint pt = gst_rtsp_stream_get_pt (stream); + gchar *pt_s = g_strdup_printf ("%d", pt); + GstStructure *rtx_pt_map = gst_structure_new ("application/x-rtp-pt-map", + pt_s, G_TYPE_UINT, rtx_pt, NULL); + g_object_set (stream->priv->rtxsend, "payload-type-map", rtx_pt_map, NULL); + g_free (pt_s); + gst_structure_free (rtx_pt_map); + } + g_mutex_unlock (&stream->priv->lock); +} + +/** + * gst_rtsp_stream_get_retransmission_pt: + * @stream: a #GstRTSPStream + * + * Get the payload-type used for retransmission of this stream + * + * Returns: The retransmission PT. + */ +guint +gst_rtsp_stream_get_retransmission_pt (GstRTSPStream * stream) +{ + guint rtx_pt; + + g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0); + + g_mutex_lock (&stream->priv->lock); + rtx_pt = stream->priv->rtx_pt; + g_mutex_unlock (&stream->priv->lock); + + return rtx_pt; +} + +/** + * gst_rtsp_stream_set_buffer_size: + * @stream: a #GstRTSPStream + * @size: the buffer size + * + * Set the size of the UDP transmission buffer (in bytes) + * Needs to be set before the stream is joined to a bin. + * + * Since: 1.6 + */ +void +gst_rtsp_stream_set_buffer_size (GstRTSPStream * stream, guint size) +{ + g_mutex_lock (&stream->priv->lock); + stream->priv->buffer_size = size; + g_mutex_unlock (&stream->priv->lock); +} + +/** + * gst_rtsp_stream_get_buffer_size: + * @stream: a #GstRTSPStream + * + * Get the size of the UDP transmission buffer (in bytes) + * + * Returns: the size of the UDP TX buffer + * + * Since: 1.6 + */ +guint +gst_rtsp_stream_get_buffer_size (GstRTSPStream * stream) +{ + guint buffer_size; + + g_mutex_lock (&stream->priv->lock); + buffer_size = stream->priv->buffer_size; + g_mutex_unlock (&stream->priv->lock); + + return buffer_size; +} + /* executed from streaming thread */ static void caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream) @@ -1319,8 +1813,15 @@ find_transport (GstRTSPStream * stream, const gchar * rtcp_from) tr = gst_rtsp_stream_transport_get_transport (trans); - min = tr->client_port.min; - max = tr->client_port.max; + if (priv->client_side) { + /* In client side mode the 'destination' is the RTSP server, so send + * to those ports */ + min = tr->server_port.min; + max = tr->server_port.max; + } else { + min = tr->client_port.min; + max = tr->client_port.max; + } if ((strcmp (tr->destination, dest) == 0) && (min == port || max == port)) { result = trans; @@ -1439,6 +1940,52 @@ on_timeout (GObject * session, GObject * source, GstRTSPStream * stream) } } +static void +on_new_sender_ssrc (GObject * session, GObject * source, GstRTSPStream * stream) +{ + GST_INFO ("%p: new sender source %p", stream, source); +#ifndef DUMP_STATS + { + GstStructure *stats; + g_object_get (source, "stats", &stats, NULL); + if (stats) { + dump_structure (stats); + gst_structure_free (stats); + } + } +#endif +} + +static void +on_sender_ssrc_active (GObject * session, GObject * source, + GstRTSPStream * stream) +{ +#ifndef DUMP_STATS + { + GstStructure *stats; + g_object_get (source, "stats", &stats, NULL); + if (stats) { + dump_structure (stats); + gst_structure_free (stats); + } + } +#endif +} + +static void +clear_tr_cache (GstRTSPStreamPrivate * priv, gboolean is_rtp) +{ + if (is_rtp) { + g_list_foreach (priv->tr_cache_rtp, (GFunc) g_object_unref, NULL); + g_list_free (priv->tr_cache_rtp); + priv->tr_cache_rtp = NULL; + } else { + g_list_foreach (priv->tr_cache_rtcp, (GFunc) g_object_unref, NULL); + g_list_free (priv->tr_cache_rtcp); + priv->tr_cache_rtcp = NULL; + } +} + static GstFlowReturn handle_new_sample (GstAppSink * sink, gpointer user_data) { @@ -1447,6 +1994,7 @@ handle_new_sample (GstAppSink * sink, gpointer user_data) GstSample *sample; GstBuffer *buffer; GstRTSPStream *stream; + gboolean is_rtp; sample = gst_app_sink_pull_sample (sink); if (!sample) @@ -1456,18 +2004,43 @@ handle_new_sample (GstAppSink * sink, gpointer user_data) priv = stream->priv; buffer = gst_sample_get_buffer (sample); + is_rtp = GST_ELEMENT_CAST (sink) == priv->appsink[0]; + g_mutex_lock (&priv->lock); - for (walk = priv->transports; walk; walk = g_list_next (walk)) { - GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data; + if (is_rtp) { + if (priv->tr_cache_cookie_rtp != priv->transports_cookie) { + clear_tr_cache (priv, is_rtp); + for (walk = priv->transports; walk; walk = g_list_next (walk)) { + GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data; + priv->tr_cache_rtp = + g_list_prepend (priv->tr_cache_rtp, g_object_ref (tr)); + } + priv->tr_cache_cookie_rtp = priv->transports_cookie; + } + } else { + if (priv->tr_cache_cookie_rtcp != priv->transports_cookie) { + clear_tr_cache (priv, is_rtp); + for (walk = priv->transports; walk; walk = g_list_next (walk)) { + GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data; + priv->tr_cache_rtcp = + g_list_prepend (priv->tr_cache_rtcp, g_object_ref (tr)); + } + priv->tr_cache_cookie_rtcp = priv->transports_cookie; + } + } + g_mutex_unlock (&priv->lock); - if (GST_ELEMENT_CAST (sink) == priv->appsink[0]) { + if (is_rtp) { + for (walk = priv->tr_cache_rtp; walk; walk = g_list_next (walk)) { + GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data; gst_rtsp_stream_transport_send_rtp (tr, buffer); - } else { + } + } else { + for (walk = priv->tr_cache_rtcp; walk; walk = g_list_next (walk)) { + GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data; gst_rtsp_stream_transport_send_rtcp (tr, buffer); } } - g_mutex_unlock (&priv->lock); - gst_sample_unref (sample); return GST_FLOW_OK; @@ -1479,118 +2052,394 @@ static GstAppSinkCallbacks sink_cb = { handle_new_sample, }; +static GstElement * +get_rtp_encoder (GstRTSPStream * stream, guint session) +{ + GstRTSPStreamPrivate *priv = stream->priv; + + if (priv->srtpenc == NULL) { + gchar *name; + + name = g_strdup_printf ("srtpenc_%u", session); + priv->srtpenc = gst_element_factory_make ("srtpenc", name); + g_free (name); + + g_object_set (priv->srtpenc, "random-key", TRUE, NULL); + } + return gst_object_ref (priv->srtpenc); +} + +static GstElement * +request_rtp_encoder (GstElement * rtpbin, guint session, GstRTSPStream * stream) +{ + GstRTSPStreamPrivate *priv = stream->priv; + GstElement *oldenc, *enc; + GstPad *pad; + gchar *name; + + if (priv->idx != session) + return NULL; + + GST_DEBUG_OBJECT (stream, "make RTP encoder for session %u", session); + + oldenc = priv->srtpenc; + enc = get_rtp_encoder (stream, session); + name = g_strdup_printf ("rtp_sink_%d", session); + pad = gst_element_get_request_pad (enc, name); + g_free (name); + gst_object_unref (pad); + + if (oldenc == NULL) + g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER], 0, + enc); + + return enc; +} + +static GstElement * +request_rtcp_encoder (GstElement * rtpbin, guint session, + GstRTSPStream * stream) +{ + GstRTSPStreamPrivate *priv = stream->priv; + GstElement *oldenc, *enc; + GstPad *pad; + gchar *name; + + if (priv->idx != session) + return NULL; + + GST_DEBUG_OBJECT (stream, "make RTCP encoder for session %u", session); + + oldenc = priv->srtpenc; + enc = get_rtp_encoder (stream, session); + name = g_strdup_printf ("rtcp_sink_%d", session); + pad = gst_element_get_request_pad (enc, name); + g_free (name); + gst_object_unref (pad); + + if (oldenc == NULL) + g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER], 0, + enc); + + return enc; +} + +static GstCaps * +request_key (GstElement * srtpdec, guint ssrc, GstRTSPStream * stream) +{ + GstRTSPStreamPrivate *priv = stream->priv; + GstCaps *caps; + + GST_DEBUG ("request key %08x", ssrc); + + g_mutex_lock (&priv->lock); + if ((caps = g_hash_table_lookup (priv->keys, GINT_TO_POINTER (ssrc)))) + gst_caps_ref (caps); + g_mutex_unlock (&priv->lock); + + return caps; +} + +static GstElement * +request_rtp_rtcp_decoder (GstElement * rtpbin, guint session, + GstRTSPStream * stream) +{ + GstRTSPStreamPrivate *priv = stream->priv; + + if (priv->idx != session) + return NULL; + + if (priv->srtpdec == NULL) { + gchar *name; + + name = g_strdup_printf ("srtpdec_%u", session); + priv->srtpdec = gst_element_factory_make ("srtpdec", name); + g_free (name); + + g_signal_connect (priv->srtpdec, "request-key", + (GCallback) request_key, stream); + } + return gst_object_ref (priv->srtpdec); +} + /** - * gst_rtsp_stream_join_bin: + * gst_rtsp_stream_request_aux_sender: * @stream: a #GstRTSPStream - * @bin: a #GstBin to join - * @rtpbin: a rtpbin element in @bin - * @state: the target state of the new elements + * @sessid: the session id * - * Join the #GstBin @bin that contains the element @rtpbin. + * Creating a rtxsend bin * - * @stream will link to @rtpbin, which must be inside @bin. The elements - * added to @bin will be set to the state given in @state. + * Returns: (transfer full): a #GstElement. * - * Returns: %TRUE on success. + * Since: 1.6 */ -gboolean -gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin, - GstElement * rtpbin, GstState state) +GstElement * +gst_rtsp_stream_request_aux_sender (GstRTSPStream * stream, guint sessid) { - GstRTSPStreamPrivate *priv; - gint i; - guint idx; + GstElement *bin; + GstPad *pad; + GstStructure *pt_map; gchar *name; - GstPad *pad, *sinkpad, *selpad; - GstPadLinkReturn ret; + guint pt, rtx_pt; + gchar *pt_s; - g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE); - g_return_val_if_fail (GST_IS_BIN (bin), FALSE); - g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE); + g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL); + + pt = gst_rtsp_stream_get_pt (stream); + pt_s = g_strdup_printf ("%u", pt); + rtx_pt = stream->priv->rtx_pt; + + GST_INFO ("creating rtxsend with pt %u to %u", pt, rtx_pt); + + bin = gst_bin_new (NULL); + stream->priv->rtxsend = gst_element_factory_make ("rtprtxsend", NULL); + pt_map = gst_structure_new ("application/x-rtp-pt-map", + pt_s, G_TYPE_UINT, rtx_pt, NULL); + g_object_set (stream->priv->rtxsend, "payload-type-map", pt_map, + "max-size-time", GST_TIME_AS_MSECONDS (stream->priv->rtx_time), NULL); + g_free (pt_s); + gst_structure_free (pt_map); + gst_bin_add (GST_BIN (bin), gst_object_ref (stream->priv->rtxsend)); + + pad = gst_element_get_static_pad (stream->priv->rtxsend, "src"); + name = g_strdup_printf ("src_%u", sessid); + gst_element_add_pad (bin, gst_ghost_pad_new (name, pad)); + g_free (name); + gst_object_unref (pad); + + pad = gst_element_get_static_pad (stream->priv->rtxsend, "sink"); + name = g_strdup_printf ("sink_%u", sessid); + gst_element_add_pad (bin, gst_ghost_pad_new (name, pad)); + g_free (name); + gst_object_unref (pad); + + return bin; +} + +/** + * gst_rtsp_stream_set_pt_map: + * @stream: a #GstRTSPStream + * @pt: the pt + * @caps: a #GstCaps + * + * Configure a pt map between @pt and @caps. + */ +void +gst_rtsp_stream_set_pt_map (GstRTSPStream * stream, guint pt, GstCaps * caps) +{ + GstRTSPStreamPrivate *priv = stream->priv; + + g_mutex_lock (&priv->lock); + g_hash_table_insert (priv->ptmap, GINT_TO_POINTER (pt), gst_caps_ref (caps)); + g_mutex_unlock (&priv->lock); +} + +/** + * gst_rtsp_stream_set_publish_clock_mode: + * @stream: a #GstRTSPStream + * @mode: the clock publish mode + * + * Sets if and how the stream clock should be published according to RFC7273. + * + * Since: 1.8 + */ +void +gst_rtsp_stream_set_publish_clock_mode (GstRTSPStream * stream, + GstRTSPPublishClockMode mode) +{ + GstRTSPStreamPrivate *priv; priv = stream->priv; + g_mutex_lock (&priv->lock); + priv->publish_clock_mode = mode; + g_mutex_unlock (&priv->lock); +} + +/** + * gst_rtsp_stream_get_publish_clock_mode: + * @factory: a #GstRTSPStream + * + * Gets if and how the stream clock should be published according to RFC7273. + * + * Returns: The GstRTSPPublishClockMode + * + * Since: 1.8 + */ +GstRTSPPublishClockMode +gst_rtsp_stream_get_publish_clock_mode (GstRTSPStream * stream) +{ + GstRTSPStreamPrivate *priv; + GstRTSPPublishClockMode ret; + priv = stream->priv; g_mutex_lock (&priv->lock); - if (priv->is_joined) - goto was_joined; + ret = priv->publish_clock_mode; + g_mutex_unlock (&priv->lock); - /* create a session with the same index as the stream */ - idx = priv->idx; + return ret; +} - GST_INFO ("stream %p joining bin as session %u", stream, idx); +static GstCaps * +request_pt_map (GstElement * rtpbin, guint session, guint pt, + GstRTSPStream * stream) +{ + GstRTSPStreamPrivate *priv = stream->priv; + GstCaps *caps = NULL; - if (!alloc_ports (stream)) - goto no_ports; + g_mutex_lock (&priv->lock); - /* update the dscp qos field in the sinks */ - update_dscp_qos (stream); + if (priv->idx == session) { + caps = g_hash_table_lookup (priv->ptmap, GINT_TO_POINTER (pt)); + if (caps) { + GST_DEBUG ("Stream %p, pt %u: caps %" GST_PTR_FORMAT, stream, pt, caps); + gst_caps_ref (caps); + } else { + GST_DEBUG ("Stream %p, pt %u: no caps", stream, pt); + } + } - /* get a pad for sending RTP */ - name = g_strdup_printf ("send_rtp_sink_%u", idx); - priv->send_rtp_sink = gst_element_get_request_pad (rtpbin, name); + g_mutex_unlock (&priv->lock); + + return caps; +} + +static void +pad_added (GstElement * rtpbin, GstPad * pad, GstRTSPStream * stream) +{ + GstRTSPStreamPrivate *priv = stream->priv; + gchar *name; + GstPadLinkReturn ret; + guint sessid; + + GST_DEBUG ("Stream %p added pad %s:%s for pad %s:%s", stream, + GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (priv->sinkpad)); + + name = gst_pad_get_name (pad); + if (sscanf (name, "recv_rtp_src_%u", &sessid) != 1) { + g_free (name); + return; + } g_free (name); + + if (priv->idx != sessid) + return; + + if (gst_pad_is_linked (priv->sinkpad)) { + GST_WARNING ("Stream %p: Pad %s:%s is linked already", stream, + GST_DEBUG_PAD_NAME (priv->sinkpad)); + return; + } + /* link the RTP pad to the session manager, it should not really fail unless * this is not really an RTP pad */ - ret = gst_pad_link (priv->srcpad, priv->send_rtp_sink); + ret = gst_pad_link (pad, priv->sinkpad); if (ret != GST_PAD_LINK_OK) goto link_failed; + priv->recv_rtp_src = gst_object_ref (pad); - /* get pads from the RTP session element for sending and receiving - * RTP/RTCP*/ - name = g_strdup_printf ("send_rtp_src_%u", idx); - priv->send_src[0] = gst_element_get_static_pad (rtpbin, name); - g_free (name); - name = g_strdup_printf ("send_rtcp_src_%u", idx); - priv->send_src[1] = gst_element_get_request_pad (rtpbin, name); - g_free (name); - name = g_strdup_printf ("recv_rtp_sink_%u", idx); - priv->recv_sink[0] = gst_element_get_request_pad (rtpbin, name); - g_free (name); - name = g_strdup_printf ("recv_rtcp_sink_%u", idx); - priv->recv_sink[1] = gst_element_get_request_pad (rtpbin, name); - g_free (name); + return; - /* get the session */ - g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &priv->session); +/* ERRORS */ +link_failed: + { + GST_ERROR ("Stream %p: Failed to link pads %s:%s and %s:%s", stream, + GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (priv->sinkpad)); + } +} - g_signal_connect (priv->session, "on-new-ssrc", (GCallback) on_new_ssrc, - stream); - g_signal_connect (priv->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes, - stream); - g_signal_connect (priv->session, "on-ssrc-active", - (GCallback) on_ssrc_active, stream); - g_signal_connect (priv->session, "on-bye-ssrc", (GCallback) on_bye_ssrc, - stream); - g_signal_connect (priv->session, "on-bye-timeout", - (GCallback) on_bye_timeout, stream); - g_signal_connect (priv->session, "on-timeout", (GCallback) on_timeout, - stream); +static void +on_npt_stop (GstElement * rtpbin, guint session, guint ssrc, + GstRTSPStream * stream) +{ + /* TODO: What to do here other than this? */ + GST_DEBUG ("Stream %p: Got EOS", stream); + gst_pad_send_event (stream->priv->sinkpad, gst_event_new_eos ()); +} + +static void +plug_sink (GstBin * bin, GstElement * tee, GstElement * sink, + GstElement ** queue_out) +{ + GstPad *pad; + GstPad *teepad; + GstPad *queuepad; + + gst_bin_add (bin, sink); + + *queue_out = gst_element_factory_make ("queue", NULL); + g_object_set (*queue_out, "max-size-buffers", 1, "max-size-bytes", 0, + "max-size-time", G_GINT64_CONSTANT (0), NULL); + gst_bin_add (bin, *queue_out); + + /* link tee to queue */ + teepad = gst_element_get_request_pad (tee, "src_%u"); + pad = gst_element_get_static_pad (*queue_out, "sink"); + gst_pad_link (teepad, pad); + gst_object_unref (pad); + gst_object_unref (teepad); + + /* link queue to sink */ + queuepad = gst_element_get_static_pad (*queue_out, "src"); + pad = gst_element_get_static_pad (sink, "sink"); + gst_pad_link (queuepad, pad); + gst_object_unref (queuepad); + gst_object_unref (pad); +} + +/* must be called with lock */ +static void +create_sender_part (GstRTSPStream * stream, GstBin * bin, GstState state) +{ + GstRTSPStreamPrivate *priv; + GstPad *pad; + gboolean is_tcp, is_udp; + gint i; + + priv = stream->priv; + + is_tcp = priv->protocols & GST_RTSP_LOWER_TRANS_TCP; + is_udp = ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) || + (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST)); for (i = 0; i < 2; i++) { - GstPad *teepad, *queuepad; /* For the sender we create this bit of pipeline for both * RTP and RTCP. Sync and preroll are enabled on udpsink so - * we need to add a queue before appsink to make the pipeline - * not block. For the TCP case, we want to pump data to the - * client as fast as possible anyway. + * we need to add a queue before appsink and udpsink to make + * the pipeline not block. For the TCP case, we want to pump + * client as fast as possible anyway. This pipeline is used + * when both TCP and UDP are present. * - * .--------. .-----. .---------. - * | rtpbin | | tee | | udpsink | - * | send->sink src->sink | - * '--------' | | '---------' + * .--------. .-----. .---------. .---------. + * | rtpbin | | tee | | queue | | udpsink | + * | send->sink src->sink src->sink | + * '--------' | | '---------' '---------' * | | .---------. .---------. * | | | queue | | appsink | * | src->sink src->sink | * '-----' '---------' '---------' * - * When only UDP is allowed, we skip the tee, queue and appsink and link the - * udpsink directly to the session. + * When only UDP or only TCP is allowed, we skip the tee and queue + * and link the udpsink (for UDP) or appsink (for TCP) directly to + * the session. */ - /* add udpsink */ - gst_bin_add (bin, priv->udpsink[i]); - sinkpad = gst_element_get_static_pad (priv->udpsink[i], "sink"); - if (priv->protocols & GST_RTSP_LOWER_TRANS_TCP) { + /* Only link the RTP send src if we're going to send RTP, link + * the RTCP send src always */ + if (!priv->srcpad && i == 0) + continue; + + if (is_tcp) { + /* make appsink */ + priv->appsink[i] = gst_element_factory_make ("appsink", NULL); + g_object_set (priv->appsink[i], "emit-signals", FALSE, NULL); + gst_app_sink_set_callbacks (GST_APP_SINK_CAST (priv->appsink[i]), + &sink_cb, stream, NULL); + } + + /* If we have udp always use a tee because we could have mcast clients + * requesting different ports, in which case we'll have to plug more + * udpsinks. */ + if (is_udp) { /* make tee for RTP/RTCP */ priv->tee[i] = gst_element_factory_make ("tee", NULL); gst_bin_add (bin, priv->tee[i]); @@ -1600,40 +2449,92 @@ gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin, gst_pad_link (priv->send_src[i], pad); gst_object_unref (pad); - /* link tee to udpsink */ - teepad = gst_element_get_request_pad (priv->tee[i], "src_%u"); - gst_pad_link (teepad, sinkpad); - gst_object_unref (teepad); - - /* make queue */ - priv->appqueue[i] = gst_element_factory_make ("queue", NULL); - gst_bin_add (bin, priv->appqueue[i]); - /* and link to tee */ - teepad = gst_element_get_request_pad (priv->tee[i], "src_%u"); - pad = gst_element_get_static_pad (priv->appqueue[i], "sink"); - gst_pad_link (teepad, pad); - gst_object_unref (pad); - gst_object_unref (teepad); + if (priv->udpsink[i]) + plug_sink (bin, priv->tee[i], priv->udpsink[i], &priv->udpqueue[i]); - /* make appsink */ - priv->appsink[i] = gst_element_factory_make ("appsink", NULL); - g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL); - g_object_set (priv->appsink[i], "emit-signals", FALSE, NULL); - gst_bin_add (bin, priv->appsink[i]); - gst_app_sink_set_callbacks (GST_APP_SINK_CAST (priv->appsink[i]), - &sink_cb, stream, NULL); - /* and link to queue */ - queuepad = gst_element_get_static_pad (priv->appqueue[i], "src"); + if (priv->mcast_udpsink[i]) + plug_sink (bin, priv->tee[i], priv->mcast_udpsink[i], + &priv->mcast_udpqueue[i]); + + if (is_tcp) { + g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL); + plug_sink (bin, priv->tee[i], priv->appsink[i], &priv->appqueue[i]); + } + } else if (is_tcp) { + /* only appsink needed, link it to the session */ pad = gst_element_get_static_pad (priv->appsink[i], "sink"); - gst_pad_link (queuepad, pad); + gst_pad_link (priv->send_src[i], pad); gst_object_unref (pad); - gst_object_unref (queuepad); - } else { - /* else only udpsink needed, link it to the session */ - gst_pad_link (priv->send_src[i], sinkpad); + + /* when its only TCP, we need to set sync and preroll to FALSE + * for the sink to avoid deadlock. And this is only needed for + * sink used for RTCP data, not the RTP data. */ + if (i == 1) + g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL); + } + + /* check if we need to set to a special state */ + if (state != GST_STATE_NULL) { + if (priv->udpsink[i]) + gst_element_set_state (priv->udpsink[i], state); + if (priv->mcast_udpsink[i]) + gst_element_set_state (priv->mcast_udpsink[i], state); + if (priv->appsink[i]) + gst_element_set_state (priv->appsink[i], state); + if (priv->appqueue[i]) + gst_element_set_state (priv->appqueue[i], state); + if (priv->udpqueue[i]) + gst_element_set_state (priv->udpqueue[i], state); + if (priv->mcast_udpqueue[i]) + gst_element_set_state (priv->mcast_udpqueue[i], state); + if (priv->tee[i]) + gst_element_set_state (priv->tee[i], state); } - gst_object_unref (sinkpad); + } +} + +/* must be called with lock */ +static void +plug_src (GstRTSPStream * stream, GstBin * bin, GstElement * src, + GstElement * funnel) +{ + GstRTSPStreamPrivate *priv; + GstPad *pad, *selpad; + + priv = stream->priv; + + if (priv->srcpad) { + /* we set and keep these to playing so that they don't cause NO_PREROLL return + * values. This is only relevant for PLAY pipelines */ + gst_element_set_state (src, GST_STATE_PLAYING); + gst_element_set_locked_state (src, TRUE); + } + + /* add src */ + gst_bin_add (bin, src); + + /* and link to the funnel */ + selpad = gst_element_get_request_pad (funnel, "sink_%u"); + pad = gst_element_get_static_pad (src, "src"); + gst_pad_link (pad, selpad); + gst_object_unref (pad); + gst_object_unref (selpad); +} + +/* must be called with lock */ +static void +create_receiver_part (GstRTSPStream * stream, GstBin * bin, GstState state) +{ + GstRTSPStreamPrivate *priv; + GstPad *pad; + gboolean is_tcp; + gint i; + priv = stream->priv; + + is_tcp = priv->protocols & GST_RTSP_LOWER_TRANS_TCP; + + for (i = 0; i < 2; i++) { /* For the receiver we create this bit of pipeline for both * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc * and it is all funneled into the rtpbin receive pad. @@ -1647,6 +2548,13 @@ gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin, * | src->sink | * '--------' '--------' */ + + if (!priv->sinkpad && i == 0) { + /* Only connect recv RTP sink if we expect to receive RTP. Connect recv + * RTCP sink always */ + continue; + } + /* make funnel for the RTP/RTCP receivers */ priv->funnel[i] = gst_element_factory_make ("funnel", NULL); gst_bin_add (bin, priv->funnel[i]); @@ -1655,69 +2563,208 @@ gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin, gst_pad_link (pad, priv->recv_sink[i]); gst_object_unref (pad); - if (priv->udpsrc_v4[i]) { - /* we set and keep these to playing so that they don't cause NO_PREROLL return - * values */ - gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_PLAYING); - gst_element_set_locked_state (priv->udpsrc_v4[i], TRUE); - /* add udpsrc */ - gst_bin_add (bin, priv->udpsrc_v4[i]); - - /* and link to the funnel v4 */ - selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u"); - pad = gst_element_get_static_pad (priv->udpsrc_v4[i], "src"); - gst_pad_link (pad, selpad); - gst_object_unref (pad); - gst_object_unref (selpad); - } + if (priv->udpsrc_v4[i]) + plug_src (stream, bin, priv->udpsrc_v4[i], priv->funnel[i]); + + if (priv->udpsrc_v6[i]) + plug_src (stream, bin, priv->udpsrc_v6[i], priv->funnel[i]); + + if (priv->mcast_udpsrc_v4[i]) + plug_src (stream, bin, priv->mcast_udpsrc_v4[i], priv->funnel[i]); + + if (priv->mcast_udpsrc_v6[i]) + plug_src (stream, bin, priv->mcast_udpsrc_v6[i], priv->funnel[i]); + + if (is_tcp) { + /* make and add appsrc */ + priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL); + priv->appsrc_base_time[i] = -1; + g_object_set (priv->appsrc[i], "format", GST_FORMAT_TIME, "is-live", + TRUE, NULL); + plug_src (stream, bin, priv->appsrc[i], priv->funnel[i]); + } + + /* check if we need to set to a special state */ + if (state != GST_STATE_NULL) { + gst_element_set_state (priv->funnel[i], state); + } + } +} + +static gboolean +check_mcast_part_for_transport (GstRTSPStream * stream, + const GstRTSPTransport * tr) +{ + GstRTSPStreamPrivate *priv = stream->priv; + GInetAddress *inetaddr; + GSocketFamily family; + GstRTSPAddress *mcast_addr; + + /* Check if it's a ipv4 or ipv6 transport */ + inetaddr = g_inet_address_new_from_string (tr->destination); + family = g_inet_address_get_family (inetaddr); + g_object_unref (inetaddr); + + /* Select fields corresponding to the family */ + if (family == G_SOCKET_FAMILY_IPV4) { + mcast_addr = priv->mcast_addr_v4; + } else { + mcast_addr = priv->mcast_addr_v6; + } + + /* We support only one mcast group per family, make sure this transport + * matches it. */ + if (!mcast_addr) + goto no_addr; + + if (!g_str_equal (tr->destination, mcast_addr->address) || + tr->port.min != mcast_addr->port || + tr->port.max != mcast_addr->port + mcast_addr->n_ports - 1 || + tr->ttl != mcast_addr->ttl) + goto wrong_addr; + + return TRUE; + +no_addr: + { + GST_WARNING_OBJECT (stream, "Adding mcast transport, but no mcast address " + "has been reserved"); + return FALSE; + } +wrong_addr: + { + GST_WARNING_OBJECT (stream, "Adding mcast transport, but it doesn't match " + "the reserved address"); + return FALSE; + } +} + +/** + * gst_rtsp_stream_join_bin: + * @stream: a #GstRTSPStream + * @bin: (transfer none): a #GstBin to join + * @rtpbin: (transfer none): a rtpbin element in @bin + * @state: the target state of the new elements + * + * Join the #GstBin @bin that contains the element @rtpbin. + * + * @stream will link to @rtpbin, which must be inside @bin. The elements + * added to @bin will be set to the state given in @state. + * + * Returns: %TRUE on success. + */ +gboolean +gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin, + GstElement * rtpbin, GstState state) +{ + GstRTSPStreamPrivate *priv; + guint idx; + gchar *name; + GstPadLinkReturn ret; + + g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE); + g_return_val_if_fail (GST_IS_BIN (bin), FALSE); + g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE); + + priv = stream->priv; + + g_mutex_lock (&priv->lock); + if (priv->joined_bin != NULL) + goto was_joined; + + /* create a session with the same index as the stream */ + idx = priv->idx; + + GST_INFO ("stream %p joining bin as session %u", stream, idx); - if (priv->udpsrc_v6[i]) { - gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_PLAYING); - gst_element_set_locked_state (priv->udpsrc_v6[i], TRUE); - gst_bin_add (bin, priv->udpsrc_v6[i]); + if (!alloc_ports (stream)) + goto no_ports; - /* and link to the funnel v6 */ - selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u"); - pad = gst_element_get_static_pad (priv->udpsrc_v6[i], "src"); - gst_pad_link (pad, selpad); - gst_object_unref (pad); - gst_object_unref (selpad); - } + if (priv->profiles & GST_RTSP_PROFILE_SAVP + || priv->profiles & GST_RTSP_PROFILE_SAVPF) { + /* For SRTP */ + g_signal_connect (rtpbin, "request-rtp-encoder", + (GCallback) request_rtp_encoder, stream); + g_signal_connect (rtpbin, "request-rtcp-encoder", + (GCallback) request_rtcp_encoder, stream); + g_signal_connect (rtpbin, "request-rtp-decoder", + (GCallback) request_rtp_rtcp_decoder, stream); + g_signal_connect (rtpbin, "request-rtcp-decoder", + (GCallback) request_rtp_rtcp_decoder, stream); + } - if (priv->protocols & GST_RTSP_LOWER_TRANS_TCP) { - /* make and add appsrc */ - priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL); - gst_bin_add (bin, priv->appsrc[i]); - /* and link to the funnel */ - selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u"); - pad = gst_element_get_static_pad (priv->appsrc[i], "src"); - gst_pad_link (pad, selpad); - gst_object_unref (pad); - gst_object_unref (selpad); - } + if (priv->sinkpad) { + g_signal_connect (rtpbin, "request-pt-map", + (GCallback) request_pt_map, stream); + } - /* check if we need to set to a special state */ - if (state != GST_STATE_NULL) { - if (priv->udpsink[i]) - gst_element_set_state (priv->udpsink[i], state); - if (priv->appsink[i]) - gst_element_set_state (priv->appsink[i], state); - if (priv->appqueue[i]) - gst_element_set_state (priv->appqueue[i], state); - if (priv->tee[i]) - gst_element_set_state (priv->tee[i], state); - if (priv->funnel[i]) - gst_element_set_state (priv->funnel[i], state); - if (priv->appsrc[i]) - gst_element_set_state (priv->appsrc[i], state); - } + /* get pads from the RTP session element for sending and receiving + * RTP/RTCP*/ + if (priv->srcpad) { + /* get a pad for sending RTP */ + name = g_strdup_printf ("send_rtp_sink_%u", idx); + priv->send_rtp_sink = gst_element_get_request_pad (rtpbin, name); + g_free (name); + + /* link the RTP pad to the session manager, it should not really fail unless + * this is not really an RTP pad */ + ret = gst_pad_link (priv->srcpad, priv->send_rtp_sink); + if (ret != GST_PAD_LINK_OK) + goto link_failed; + + name = g_strdup_printf ("send_rtp_src_%u", idx); + priv->send_src[0] = gst_element_get_static_pad (rtpbin, name); + g_free (name); + } else { + /* Need to connect our sinkpad from here */ + g_signal_connect (rtpbin, "pad-added", (GCallback) pad_added, stream); + /* EOS */ + g_signal_connect (rtpbin, "on-npt-stop", (GCallback) on_npt_stop, stream); + + name = g_strdup_printf ("recv_rtp_sink_%u", idx); + priv->recv_sink[0] = gst_element_get_request_pad (rtpbin, name); + g_free (name); } - /* be notified of caps changes */ - priv->caps_sig = g_signal_connect (priv->send_rtp_sink, "notify::caps", - (GCallback) caps_notify, stream); + name = g_strdup_printf ("send_rtcp_src_%u", idx); + priv->send_src[1] = gst_element_get_request_pad (rtpbin, name); + g_free (name); + name = g_strdup_printf ("recv_rtcp_sink_%u", idx); + priv->recv_sink[1] = gst_element_get_request_pad (rtpbin, name); + g_free (name); + + /* get the session */ + g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &priv->session); + + g_signal_connect (priv->session, "on-new-ssrc", (GCallback) on_new_ssrc, + stream); + g_signal_connect (priv->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes, + stream); + g_signal_connect (priv->session, "on-ssrc-active", + (GCallback) on_ssrc_active, stream); + g_signal_connect (priv->session, "on-bye-ssrc", (GCallback) on_bye_ssrc, + stream); + g_signal_connect (priv->session, "on-bye-timeout", + (GCallback) on_bye_timeout, stream); + g_signal_connect (priv->session, "on-timeout", (GCallback) on_timeout, + stream); + + /* signal for sender ssrc */ + g_signal_connect (priv->session, "on-new-sender-ssrc", + (GCallback) on_new_sender_ssrc, stream); + g_signal_connect (priv->session, "on-sender-ssrc-active", + (GCallback) on_sender_ssrc_active, stream); + + create_sender_part (stream, bin, state); + create_receiver_part (stream, bin, state); - priv->is_joined = TRUE; + if (priv->srcpad) { + /* be notified of caps changes */ + priv->caps_sig = g_signal_connect (priv->send_src[0], "notify::caps", + (GCallback) caps_notify, stream); + } + + priv->joined_bin = gst_object_ref (bin); g_mutex_unlock (&priv->lock); return TRUE; @@ -1744,11 +2791,25 @@ link_failed: } } +static void +clear_element (GstBin * bin, GstElement ** elementptr) +{ + if (*elementptr) { + gst_element_set_locked_state (*elementptr, FALSE); + gst_element_set_state (*elementptr, GST_STATE_NULL); + if (GST_ELEMENT_PARENT (*elementptr)) + gst_bin_remove (bin, *elementptr); + else + gst_object_unref (*elementptr); + *elementptr = NULL; + } +} + /** * gst_rtsp_stream_leave_bin: * @stream: a #GstRTSPStream - * @bin: a #GstBin - * @rtpbin: a rtpbin #GstElement + * @bin: (transfer none): a #GstBin + * @rtpbin: (transfer none): a rtpbin #GstElement * * Remove the elements of @stream from @bin. * @@ -1768,74 +2829,64 @@ gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin, priv = stream->priv; g_mutex_lock (&priv->lock); - if (!priv->is_joined) + if (priv->joined_bin == NULL) goto was_not_joined; + if (priv->joined_bin != bin) + goto wrong_bin; + + priv->joined_bin = NULL; /* all transports must be removed by now */ - g_return_val_if_fail (priv->transports == NULL, FALSE); + if (priv->transports != NULL) + goto transports_not_removed; + + clear_tr_cache (priv, TRUE); + clear_tr_cache (priv, FALSE); GST_INFO ("stream %p leaving bin", stream); - gst_pad_unlink (priv->srcpad, priv->send_rtp_sink); - g_signal_handler_disconnect (priv->send_rtp_sink, priv->caps_sig); - gst_element_release_request_pad (rtpbin, priv->send_rtp_sink); - gst_object_unref (priv->send_rtp_sink); - priv->send_rtp_sink = NULL; + if (priv->srcpad) { + gst_pad_unlink (priv->srcpad, priv->send_rtp_sink); + + g_signal_handler_disconnect (priv->send_src[0], priv->caps_sig); + gst_element_release_request_pad (rtpbin, priv->send_rtp_sink); + gst_object_unref (priv->send_rtp_sink); + priv->send_rtp_sink = NULL; + } else if (priv->recv_rtp_src) { + gst_pad_unlink (priv->recv_rtp_src, priv->sinkpad); + gst_object_unref (priv->recv_rtp_src); + priv->recv_rtp_src = NULL; + } for (i = 0; i < 2; i++) { - if (priv->udpsink[i]) - gst_element_set_state (priv->udpsink[i], GST_STATE_NULL); - if (priv->appsink[i]) - gst_element_set_state (priv->appsink[i], GST_STATE_NULL); - if (priv->appqueue[i]) - gst_element_set_state (priv->appqueue[i], GST_STATE_NULL); - if (priv->tee[i]) - gst_element_set_state (priv->tee[i], GST_STATE_NULL); - if (priv->funnel[i]) - gst_element_set_state (priv->funnel[i], GST_STATE_NULL); - if (priv->appsrc[i]) - gst_element_set_state (priv->appsrc[i], GST_STATE_NULL); - if (priv->udpsrc_v4[i]) { - /* and set udpsrc to NULL now before removing */ - gst_element_set_locked_state (priv->udpsrc_v4[i], FALSE); - gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_NULL); - /* removing them should also nicely release the request - * pads when they finalize */ - gst_bin_remove (bin, priv->udpsrc_v4[i]); + clear_element (bin, &priv->udpsrc_v4[i]); + clear_element (bin, &priv->udpsrc_v6[i]); + clear_element (bin, &priv->udpqueue[i]); + clear_element (bin, &priv->udpsink[i]); + + clear_element (bin, &priv->mcast_udpsrc_v4[i]); + clear_element (bin, &priv->mcast_udpsrc_v6[i]); + clear_element (bin, &priv->mcast_udpqueue[i]); + clear_element (bin, &priv->mcast_udpsink[i]); + + clear_element (bin, &priv->appsrc[i]); + clear_element (bin, &priv->appqueue[i]); + clear_element (bin, &priv->appsink[i]); + + clear_element (bin, &priv->tee[i]); + clear_element (bin, &priv->funnel[i]); + + if (priv->sinkpad || i == 1) { + gst_element_release_request_pad (rtpbin, priv->recv_sink[i]); + gst_object_unref (priv->recv_sink[i]); + priv->recv_sink[i] = NULL; } - if (priv->udpsrc_v6[i]) { - gst_element_set_locked_state (priv->udpsrc_v6[i], FALSE); - gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_NULL); - gst_bin_remove (bin, priv->udpsrc_v6[i]); - } - if (priv->udpsink[i]) - gst_bin_remove (bin, priv->udpsink[i]); - if (priv->appsrc[i]) - gst_bin_remove (bin, priv->appsrc[i]); - if (priv->appsink[i]) - gst_bin_remove (bin, priv->appsink[i]); - if (priv->appqueue[i]) - gst_bin_remove (bin, priv->appqueue[i]); - if (priv->tee[i]) - gst_bin_remove (bin, priv->tee[i]); - if (priv->funnel[i]) - gst_bin_remove (bin, priv->funnel[i]); - - gst_element_release_request_pad (rtpbin, priv->recv_sink[i]); - gst_object_unref (priv->recv_sink[i]); - priv->recv_sink[i] = NULL; - - priv->udpsrc_v4[i] = NULL; - priv->udpsrc_v6[i] = NULL; - priv->udpsink[i] = NULL; - priv->appsrc[i] = NULL; - priv->appsink[i] = NULL; - priv->appqueue[i] = NULL; - priv->tee[i] = NULL; - priv->funnel[i] = NULL; - } - gst_object_unref (priv->send_src[0]); - priv->send_src[0] = NULL; + } + + if (priv->srcpad) { + gst_object_unref (priv->send_src[0]); + priv->send_src[0] = NULL; + } gst_element_release_request_pad (rtpbin, priv->send_src[1]); gst_object_unref (priv->send_src[1]); @@ -1847,15 +2898,71 @@ gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin, gst_caps_unref (priv->caps); priv->caps = NULL; - priv->is_joined = FALSE; + if (priv->srtpenc) + gst_object_unref (priv->srtpenc); + if (priv->srtpdec) + gst_object_unref (priv->srtpdec); + + if (priv->mcast_addr_v4) + gst_rtsp_address_free (priv->mcast_addr_v4); + priv->mcast_addr_v4 = NULL; + if (priv->mcast_addr_v6) + gst_rtsp_address_free (priv->mcast_addr_v6); + priv->mcast_addr_v6 = NULL; + if (priv->server_addr_v4) + gst_rtsp_address_free (priv->server_addr_v4); + priv->server_addr_v4 = NULL; + if (priv->server_addr_v6) + gst_rtsp_address_free (priv->server_addr_v6); + priv->server_addr_v6 = NULL; + + g_clear_object (&priv->joined_bin); g_mutex_unlock (&priv->lock); return TRUE; was_not_joined: { + g_mutex_unlock (&priv->lock); return TRUE; } +transports_not_removed: + { + GST_ERROR_OBJECT (stream, "can't leave bin (transports not removed)"); + g_mutex_unlock (&priv->lock); + return FALSE; + } +wrong_bin: + { + GST_ERROR_OBJECT (stream, "leaving the wrong bin"); + g_mutex_unlock (&priv->lock); + return FALSE; + } +} + +/** + * gst_rtsp_stream_get_joined_bin: + * @stream: a #GstRTSPStream + * + * Get the previous joined bin with gst_rtsp_stream_join_bin() or NULL. + * + * Return: (transfer full): the joined bin or NULL. + */ +GstBin * +gst_rtsp_stream_get_joined_bin (GstRTSPStream * stream) +{ + GstRTSPStreamPrivate *priv; + GstBin *bin = NULL; + + g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE); + + priv = stream->priv; + + g_mutex_lock (&priv->lock); + bin = priv->joined_bin ? gst_object_ref (priv->joined_bin) : NULL; + g_mutex_unlock (&priv->lock); + + return bin; } /** @@ -1863,7 +2970,7 @@ was_not_joined: * @stream: a #GstRTSPStream * @rtptime: (allow-none): result RTP timestamp * @seq: (allow-none): result RTP seqnum - * @clock_rate: the clock rate + * @clock_rate: (allow-none): the clock rate * @running_time: (allow-none): result running-time * * Retrieve the current rtptime, seq and running-time. This is used to @@ -1888,6 +2995,62 @@ gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream, g_mutex_lock (&priv->lock); + /* First try to extract the information from the last buffer on the sinks. + * This will have a more accurate sequence number and timestamp, as between + * the payloader and the sink there can be some queues + */ + if (priv->udpsink[0] || priv->appsink[0]) { + GstSample *last_sample; + + if (priv->udpsink[0]) + g_object_get (priv->udpsink[0], "last-sample", &last_sample, NULL); + else + g_object_get (priv->appsink[0], "last-sample", &last_sample, NULL); + + if (last_sample) { + GstCaps *caps; + GstBuffer *buffer; + GstSegment *segment; + GstRTPBuffer rtp_buffer = GST_RTP_BUFFER_INIT; + + caps = gst_sample_get_caps (last_sample); + buffer = gst_sample_get_buffer (last_sample); + segment = gst_sample_get_segment (last_sample); + + if (gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp_buffer)) { + if (seq) { + *seq = gst_rtp_buffer_get_seq (&rtp_buffer); + } + + if (rtptime) { + *rtptime = gst_rtp_buffer_get_timestamp (&rtp_buffer); + } + + gst_rtp_buffer_unmap (&rtp_buffer); + + if (running_time) { + *running_time = + gst_segment_to_running_time (segment, GST_FORMAT_TIME, + GST_BUFFER_TIMESTAMP (buffer)); + } + + if (clock_rate) { + GstStructure *s = gst_caps_get_structure (caps, 0); + + gst_structure_get_int (s, "clock-rate", (gint *) clock_rate); + + if (*clock_rate == 0 && running_time) + *running_time = GST_CLOCK_TIME_NONE; + } + gst_sample_unref (last_sample); + + goto done; + } else { + gst_sample_unref (last_sample); + } + } + } + if (g_object_class_find_property (payobjclass, "stats")) { g_object_get (priv->payloader, "stats", &stats, NULL); if (stats == NULL) @@ -1922,6 +3085,8 @@ gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream, if (running_time) *running_time = GST_CLOCK_TIME_NONE; } + +done: g_mutex_unlock (&priv->lock); return TRUE; @@ -1942,7 +3107,7 @@ no_stats: * Retrieve the current caps of @stream. * * Returns: (transfer full): the #GstCaps of @stream. use gst_caps_unref() - * after usage. + * after usage. */ GstCaps * gst_rtsp_stream_get_caps (GstRTSPStream * stream) @@ -1984,7 +3149,7 @@ gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer) g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR); priv = stream->priv; g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR); - g_return_val_if_fail (priv->is_joined, FALSE); + g_return_val_if_fail (priv->joined_bin != NULL, FALSE); g_mutex_lock (&priv->lock); if (priv->appsrc[0]) @@ -1994,6 +3159,31 @@ gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer) g_mutex_unlock (&priv->lock); if (element) { + if (priv->appsrc_base_time[0] == -1) { + /* Take current running_time. This timestamp will be put on + * the first buffer of each stream because we are a live source and so we + * timestamp with the running_time. When we are dealing with TCP, we also + * only timestamp the first buffer (using the DISCONT flag) because a server + * typically bursts data, for which we don't want to compensate by speeding + * up the media. The other timestamps will be interpollated from this one + * using the RTP timestamps. */ + GST_OBJECT_LOCK (element); + if (GST_ELEMENT_CLOCK (element)) { + GstClockTime now; + GstClockTime base_time; + + now = gst_clock_get_time (GST_ELEMENT_CLOCK (element)); + base_time = GST_ELEMENT_CAST (element)->base_time; + + priv->appsrc_base_time[0] = now - base_time; + GST_BUFFER_TIMESTAMP (buffer) = priv->appsrc_base_time[0]; + GST_DEBUG ("stream %p: first buffer at time %" GST_TIME_FORMAT + ", base %" GST_TIME_FORMAT, stream, GST_TIME_ARGS (now), + GST_TIME_ARGS (base_time)); + } + GST_OBJECT_UNLOCK (element); + } + ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer); gst_object_unref (element); } else { @@ -2024,8 +3214,11 @@ gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer) g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR); priv = stream->priv; g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR); - g_return_val_if_fail (priv->is_joined, FALSE); + if (priv->joined_bin == NULL) { + gst_buffer_unref (buffer); + return GST_FLOW_NOT_LINKED; + } g_mutex_lock (&priv->lock); if (priv->appsrc[1]) element = gst_object_ref (priv->appsrc[1]); @@ -2034,10 +3227,36 @@ gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer) g_mutex_unlock (&priv->lock); if (element) { + if (priv->appsrc_base_time[1] == -1) { + /* Take current running_time. This timestamp will be put on + * the first buffer of each stream because we are a live source and so we + * timestamp with the running_time. When we are dealing with TCP, we also + * only timestamp the first buffer (using the DISCONT flag) because a server + * typically bursts data, for which we don't want to compensate by speeding + * up the media. The other timestamps will be interpollated from this one + * using the RTP timestamps. */ + GST_OBJECT_LOCK (element); + if (GST_ELEMENT_CLOCK (element)) { + GstClockTime now; + GstClockTime base_time; + + now = gst_clock_get_time (GST_ELEMENT_CLOCK (element)); + base_time = GST_ELEMENT_CAST (element)->base_time; + + priv->appsrc_base_time[1] = now - base_time; + GST_BUFFER_TIMESTAMP (buffer) = priv->appsrc_base_time[1]; + GST_DEBUG ("stream %p: first buffer at time %" GST_TIME_FORMAT + ", base %" GST_TIME_FORMAT, stream, GST_TIME_ARGS (now), + GST_TIME_ARGS (base_time)); + } + GST_OBJECT_UNLOCK (element); + } + ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer); gst_object_unref (element); } else { ret = GST_FLOW_OK; + gst_buffer_unref (buffer); } return ret; } @@ -2053,9 +3272,19 @@ update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans, tr = gst_rtsp_stream_transport_get_transport (trans); switch (tr->lower_transport) { - case GST_RTSP_LOWER_TRANS_UDP: case GST_RTSP_LOWER_TRANS_UDP_MCAST: { + if (add) { + if (!check_mcast_part_for_transport (stream, tr)) + goto mcast_error; + priv->transports = g_list_prepend (priv->transports, trans); + } else { + priv->transports = g_list_remove (priv->transports, trans); + } + break; + } + case GST_RTSP_LOWER_TRANS_UDP: + { gchar *dest; gint min, max; guint ttl = 0; @@ -2065,20 +3294,25 @@ update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans, min = tr->port.min; max = tr->port.max; ttl = tr->ttl; + } else if (priv->client_side) { + /* In client side mode the 'destination' is the RTSP server, so send + * to those ports */ + min = tr->server_port.min; + max = tr->server_port.max; } else { min = tr->client_port.min; max = tr->client_port.max; } if (add) { - GST_INFO ("adding %s:%d-%d", dest, min, max); - g_signal_emit_by_name (priv->udpsink[0], "add", dest, min, NULL); - g_signal_emit_by_name (priv->udpsink[1], "add", dest, max, NULL); if (ttl > 0) { GST_INFO ("setting ttl-mc %d", ttl); g_object_set (G_OBJECT (priv->udpsink[0]), "ttl-mc", ttl, NULL); g_object_set (G_OBJECT (priv->udpsink[1]), "ttl-mc", ttl, NULL); } + GST_INFO ("adding %s:%d-%d", dest, min, max); + g_signal_emit_by_name (priv->udpsink[0], "add", dest, min, NULL); + g_signal_emit_by_name (priv->udpsink[1], "add", dest, max, NULL); priv->transports = g_list_prepend (priv->transports, trans); } else { GST_INFO ("removing %s:%d-%d", dest, min, max); @@ -2086,6 +3320,7 @@ update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans, g_signal_emit_by_name (priv->udpsink[1], "remove", dest, max, NULL); priv->transports = g_list_remove (priv->transports, trans); } + priv->transports_cookie++; break; } case GST_RTSP_LOWER_TRANS_TCP: @@ -2096,6 +3331,7 @@ update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans, GST_INFO ("removing TCP %s", tr->destination); priv->transports = g_list_remove (priv->transports, trans); } + priv->transports_cookie++; break; default: goto unknown_transport; @@ -2108,13 +3344,17 @@ unknown_transport: GST_INFO ("Unknown transport %d", tr->lower_transport); return FALSE; } +mcast_error: + { + return FALSE; + } } /** * gst_rtsp_stream_add_transport: * @stream: a #GstRTSPStream - * @trans: a #GstRTSPStreamTransport + * @trans: (transfer none): a #GstRTSPStreamTransport * * Add the transport in @trans to @stream. The media of @stream will * then also be send to the values configured in @trans. @@ -2135,7 +3375,7 @@ gst_rtsp_stream_add_transport (GstRTSPStream * stream, g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE); priv = stream->priv; g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE); - g_return_val_if_fail (priv->is_joined, FALSE); + g_return_val_if_fail (priv->joined_bin != NULL, FALSE); g_mutex_lock (&priv->lock); res = update_transport (stream, trans, TRUE); @@ -2147,7 +3387,7 @@ gst_rtsp_stream_add_transport (GstRTSPStream * stream, /** * gst_rtsp_stream_remove_transport: * @stream: a #GstRTSPStream - * @trans: a #GstRTSPStreamTransport + * @trans: (transfer none): a #GstRTSPStreamTransport * * Remove the transport in @trans from @stream. The media of @stream will * not be sent to the values configured in @trans. @@ -2168,7 +3408,7 @@ gst_rtsp_stream_remove_transport (GstRTSPStream * stream, g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE); priv = stream->priv; g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE); - g_return_val_if_fail (priv->is_joined, FALSE); + g_return_val_if_fail (priv->joined_bin != NULL, FALSE); g_mutex_lock (&priv->lock); res = update_transport (stream, trans, FALSE); @@ -2178,6 +3418,43 @@ gst_rtsp_stream_remove_transport (GstRTSPStream * stream, } /** + * gst_rtsp_stream_update_crypto: + * @stream: a #GstRTSPStream + * @ssrc: the SSRC + * @crypto: (transfer none) (allow-none): a #GstCaps with crypto info + * + * Update the new crypto information for @ssrc in @stream. If information + * for @ssrc did not exist, it will be added. If information + * for @ssrc existed, it will be replaced. If @crypto is %NULL, it will + * be removed from @stream. + * + * Returns: %TRUE if @crypto could be updated + */ +gboolean +gst_rtsp_stream_update_crypto (GstRTSPStream * stream, + guint ssrc, GstCaps * crypto) +{ + GstRTSPStreamPrivate *priv; + + g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE); + g_return_val_if_fail (crypto == NULL || GST_IS_CAPS (crypto), FALSE); + + priv = stream->priv; + + GST_DEBUG_OBJECT (stream, "update key for %08x", ssrc); + + g_mutex_lock (&priv->lock); + if (crypto) + g_hash_table_insert (priv->keys, GINT_TO_POINTER (ssrc), + gst_caps_ref (crypto)); + else + g_hash_table_remove (priv->keys, GINT_TO_POINTER (ssrc)); + g_mutex_unlock (&priv->lock); + + return TRUE; +} + +/** * gst_rtsp_stream_get_rtp_socket: * @stream: a #GstRTSPStream * @family: the socket family @@ -2186,8 +3463,8 @@ gst_rtsp_stream_remove_transport (GstRTSPStream * stream, * * @stream must be joined to a bin. * - * Returns: the RTP socket or %NULL if no socket could be allocated for @family. - * Unref after usage + * Returns: (transfer full) (nullable): the RTP socket or %NULL if no + * socket could be allocated for @family. Unref after usage */ GSocket * gst_rtsp_stream_get_rtp_socket (GstRTSPStream * stream, GSocketFamily family) @@ -2220,8 +3497,8 @@ gst_rtsp_stream_get_rtp_socket (GstRTSPStream * stream, GSocketFamily family) * * @stream must be joined to a bin. * - * Returns: the RTCP socket or %NULL if no socket could be allocated for - * @family. Unref after usage + * Returns: (transfer full) (nullable): the RTCP socket or %NULL if no + * socket could be allocated for @family. Unref after usage */ GSocket * gst_rtsp_stream_get_rtcp_socket (GstRTSPStream * stream, GSocketFamily family) @@ -2246,10 +3523,52 @@ gst_rtsp_stream_get_rtcp_socket (GstRTSPStream * stream, GSocketFamily family) } /** + * gst_rtsp_stream_set_seqnum: + * @stream: a #GstRTSPStream + * @seqnum: a new sequence number + * + * Configure the sequence number in the payloader of @stream to @seqnum. + */ +void +gst_rtsp_stream_set_seqnum_offset (GstRTSPStream * stream, guint16 seqnum) +{ + GstRTSPStreamPrivate *priv; + + g_return_if_fail (GST_IS_RTSP_STREAM (stream)); + + priv = stream->priv; + + g_object_set (G_OBJECT (priv->payloader), "seqnum-offset", seqnum, NULL); +} + +/** + * gst_rtsp_stream_get_seqnum: + * @stream: a #GstRTSPStream + * + * Get the configured sequence number in the payloader of @stream. + * + * Returns: the sequence number of the payloader. + */ +guint16 +gst_rtsp_stream_get_current_seqnum (GstRTSPStream * stream) +{ + GstRTSPStreamPrivate *priv; + guint seqnum; + + g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0); + + priv = stream->priv; + + g_object_get (G_OBJECT (priv->payloader), "seqnum", &seqnum, NULL); + + return seqnum; +} + +/** * gst_rtsp_stream_transport_filter: * @stream: a #GstRTSPStream * @func: (scope call) (allow-none): a callback - * @user_data: user data passed to @func + * @user_data: (closure): user data passed to @func * * Call @func for each transport managed by @stream. The result value of @func * determines what happens to the transport. @func will be called with @stream @@ -2276,25 +3595,43 @@ gst_rtsp_stream_transport_filter (GstRTSPStream * stream, { GstRTSPStreamPrivate *priv; GList *result, *walk, *next; + GHashTable *visited = NULL; + guint cookie; g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL); priv = stream->priv; result = NULL; + if (func) + visited = g_hash_table_new_full (NULL, NULL, g_object_unref, NULL); g_mutex_lock (&priv->lock); +restart: + cookie = priv->transports_cookie; for (walk = priv->transports; walk; walk = next) { GstRTSPStreamTransport *trans = walk->data; GstRTSPFilterResult res; + gboolean changed; next = g_list_next (walk); - if (func) + if (func) { + /* only visit each transport once */ + if (g_hash_table_contains (visited, trans)) + continue; + + g_hash_table_add (visited, g_object_ref (trans)); + g_mutex_unlock (&priv->lock); + res = func (stream, trans, user_data); - else + + g_mutex_lock (&priv->lock); + } else res = GST_RTSP_FILTER_REF; + changed = (cookie != priv->transports_cookie); + switch (res) { case GST_RTSP_FILTER_REMOVE: update_transport (stream, trans, FALSE); @@ -2306,9 +3643,14 @@ gst_rtsp_stream_transport_filter (GstRTSPStream * stream, default: break; } + if (changed) + goto restart; } g_mutex_unlock (&priv->lock); + if (func) + g_hash_table_unref (visited); + return result; } @@ -2397,3 +3739,95 @@ gst_rtsp_stream_is_blocking (GstRTSPStream * stream) return result; } + +/** + * gst_rtsp_stream_query_position: + * @stream: a #GstRTSPStream + * + * Query the position of the stream in %GST_FORMAT_TIME. This only considers + * the RTP parts of the pipeline and not the RTCP parts. + * + * Returns: %TRUE if the position could be queried + */ +gboolean +gst_rtsp_stream_query_position (GstRTSPStream * stream, gint64 * position) +{ + GstRTSPStreamPrivate *priv; + GstElement *sink; + gboolean ret; + + g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE); + + priv = stream->priv; + + g_mutex_lock (&priv->lock); + /* depending on the transport type, it should query corresponding sink */ + if ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) || + (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST)) + sink = priv->udpsink[0]; + else + sink = priv->appsink[0]; + + if (sink) + gst_object_ref (sink); + g_mutex_unlock (&priv->lock); + + if (!sink) + return FALSE; + + ret = gst_element_query_position (sink, GST_FORMAT_TIME, position); + gst_object_unref (sink); + + return ret; +} + +/** + * gst_rtsp_stream_query_stop: + * @stream: a #GstRTSPStream + * + * Query the stop of the stream in %GST_FORMAT_TIME. This only considers + * the RTP parts of the pipeline and not the RTCP parts. + * + * Returns: %TRUE if the stop could be queried + */ +gboolean +gst_rtsp_stream_query_stop (GstRTSPStream * stream, gint64 * stop) +{ + GstRTSPStreamPrivate *priv; + GstElement *sink; + GstQuery *query; + gboolean ret; + + g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE); + + priv = stream->priv; + + g_mutex_lock (&priv->lock); + /* depending on the transport type, it should query corresponding sink */ + if ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) || + (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST)) + sink = priv->udpsink[0]; + else + sink = priv->appsink[0]; + + if (sink) + gst_object_ref (sink); + g_mutex_unlock (&priv->lock); + + if (!sink) + return FALSE; + + query = gst_query_new_segment (GST_FORMAT_TIME); + if ((ret = gst_element_query (sink, query))) { + GstFormat format; + + gst_query_parse_segment (query, NULL, &format, NULL, stop); + if (format != GST_FORMAT_TIME) + *stop = -1; + } + gst_query_unref (query); + gst_object_unref (sink); + + return ret; + +}