X-Git-Url: http://review.tizen.org/git/?a=blobdiff_plain;f=gst%2Frtsp-server%2Frtsp-stream.c;h=4417c4321afd0ae5f1f1ab505d32e6cc5c174128;hb=da8a31ac88e30b4c6126cf690d2faf943c519ce5;hp=7317553b37043396e1cb9580a071292e1cdbbaf6;hpb=17322693f66a7e2b7b37f79a12618d6eea6e12cf;p=platform%2Fupstream%2Fgstreamer.git diff --git a/gst/rtsp-server/rtsp-stream.c b/gst/rtsp-server/rtsp-stream.c index 7317553..4417c43 100644 --- a/gst/rtsp-server/rtsp-stream.c +++ b/gst/rtsp-server/rtsp-stream.c @@ -1,5 +1,7 @@ /* GStreamer * Copyright (C) 2008 Wim Taymans + * Copyright (C) 2015 Centricular Ltd + * Author: Sebastian Dröge * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Library General Public @@ -53,16 +55,28 @@ #include #include +#include + #include "rtsp-stream.h" #define GST_RTSP_STREAM_GET_PRIVATE(obj) \ (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTSP_STREAM, GstRTSPStreamPrivate)) +typedef struct +{ + GstRTSPStreamTransport *transport; + + /* RTP and RTCP source */ + GstElement *udpsrc[2]; + GstPad *selpad[2]; +} GstRTSPMulticastTransportSource; + struct _GstRTSPStreamPrivate { GMutex lock; guint idx; - GstPad *srcpad; + /* Only one pad is ever set */ + GstPad *srcpad, *sinkpad; GstElement *payloader; guint buffer_size; gboolean is_joined; @@ -73,6 +87,7 @@ struct _GstRTSPStreamPrivate /* pads on the rtpbin */ GstPad *send_rtp_sink; + GstPad *recv_rtp_src; GstPad *recv_sink[2]; GstPad *send_src[2]; @@ -92,16 +107,23 @@ struct _GstRTSPStreamPrivate * sockets */ GstElement *udpsrc_v6[2]; + GstElement *udpqueue[2]; GstElement *udpsink[2]; /* for TCP transport */ GstElement *appsrc[2]; + GstClockTime appsrc_base_time[2]; GstElement *appqueue[2]; GstElement *appsink[2]; GstElement *tee[2]; GstElement *funnel[2]; + /* retransmission */ + GstElement *rtxsend; + guint rtx_pt; + GstClockTime rtx_time; + /* server ports for sending/receiving over ipv4 */ GstRTSPRange server_port_v4; GstRTSPAddress *server_addr_v4; @@ -124,14 +146,24 @@ struct _GstRTSPStreamPrivate /* transports we stream to */ guint n_active; GList *transports; - gboolean tr_changed; - GList *tr_cache; + guint transports_cookie; + GList *tr_cache_rtp; + GList *tr_cache_rtcp; + guint tr_cache_cookie_rtp; + guint tr_cache_cookie_rtcp; + + + /* UDP sources for UDP multicast transports */ + GList *transport_sources; gint dscp_qos; /* stream blocking */ gulong blocked_id; gboolean blocking; + + /* pt->caps map for RECORD streams */ + GHashTable *ptmap; }; #define DEFAULT_CONTROL NULL @@ -232,6 +264,8 @@ gst_rtsp_stream_init (GstRTSPStream * stream) priv->keys = g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL, (GDestroyNotify) gst_caps_unref); + priv->ptmap = g_hash_table_new_full (NULL, NULL, NULL, + (GDestroyNotify) gst_caps_unref); } static void @@ -258,12 +292,19 @@ gst_rtsp_stream_finalize (GObject * obj) gst_rtsp_address_free (priv->server_addr_v6); if (priv->pool) g_object_unref (priv->pool); + if (priv->rtxsend) + g_object_unref (priv->rtxsend); + gst_object_unref (priv->payloader); - gst_object_unref (priv->srcpad); + if (priv->srcpad) + gst_object_unref (priv->srcpad); + if (priv->sinkpad) + gst_object_unref (priv->sinkpad); g_free (priv->control); g_mutex_clear (&priv->lock); g_hash_table_unref (priv->keys); + g_hash_table_destroy (priv->ptmap); G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj); } @@ -313,29 +354,32 @@ gst_rtsp_stream_set_property (GObject * object, guint propid, /** * gst_rtsp_stream_new: * @idx: an index - * @srcpad: a #GstPad + * @pad: a #GstPad * @payloader: a #GstElement * * Create a new media stream with index @idx that handles RTP data on - * @srcpad and has a payloader element @payloader. + * @pad and has a payloader element @payloader if @pad is a source pad + * or a depayloader element @payloader if @pad is a sink pad. * * Returns: (transfer full): a new #GstRTSPStream */ GstRTSPStream * -gst_rtsp_stream_new (guint idx, GstElement * payloader, GstPad * srcpad) +gst_rtsp_stream_new (guint idx, GstElement * payloader, GstPad * pad) { GstRTSPStreamPrivate *priv; GstRTSPStream *stream; g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL); - g_return_val_if_fail (GST_IS_PAD (srcpad), NULL); - g_return_val_if_fail (GST_PAD_IS_SRC (srcpad), NULL); + g_return_val_if_fail (GST_IS_PAD (pad), NULL); stream = g_object_new (GST_TYPE_RTSP_STREAM, NULL); priv = stream->priv; priv->idx = idx; priv->payloader = gst_object_ref (payloader); - priv->srcpad = gst_object_ref (srcpad); + if (GST_PAD_IS_SRC (pad)) + priv->srcpad = gst_object_ref (pad); + else + priv->sinkpad = gst_object_ref (pad); return stream; } @@ -392,10 +436,32 @@ gst_rtsp_stream_get_srcpad (GstRTSPStream * stream) { g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL); + if (!stream->priv->srcpad) + return NULL; + return gst_object_ref (stream->priv->srcpad); } /** + * gst_rtsp_stream_get_sinkpad: + * @stream: a #GstRTSPStream + * + * Get the sinkpad associated with @stream. + * + * Returns: (transfer full): the sinkpad. Unref after usage. + */ +GstPad * +gst_rtsp_stream_get_sinkpad (GstRTSPStream * stream) +{ + g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL); + + if (!stream->priv->sinkpad) + return NULL; + + return gst_object_ref (stream->priv->sinkpad); +} + +/** * gst_rtsp_stream_get_control: * @stream: a #GstRTSPStream * @@ -804,8 +870,9 @@ gst_rtsp_stream_get_address_pool (GstRTSPStream * stream) * * Get the multicast address of @stream for @family. * - * Returns: (transfer full): the #GstRTSPAddress of @stream or %NULL when no - * address could be allocated. gst_rtsp_address_free() after usage. + * Returns: (transfer full) (nullable): the #GstRTSPAddress of @stream + * or %NULL when no address could be allocated. gst_rtsp_address_free() + * after usage. */ GstRTSPAddress * gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream, @@ -869,8 +936,8 @@ no_address: * * Reserve @address and @port as the address and port of @stream. * - * Returns: the #GstRTSPAddress of @stream or %NULL when the address could be - * reserved. gst_rtsp_address_free() after usage. + * Returns: (nullable): the #GstRTSPAddress of @stream or %NULL when + * the address could be reserved. gst_rtsp_address_free() after usage. */ GstRTSPAddress * gst_rtsp_stream_reserve_address (GstRTSPStream * stream, @@ -950,11 +1017,12 @@ different_address: } static gboolean -alloc_ports_one_family (GstRTSPAddressPool * pool, gint buffer_size, - GSocketFamily family, GstElement * udpsrc_out[2], +alloc_ports_one_family (GstRTSPStream * stream, GstRTSPAddressPool * pool, + gint buffer_size, GSocketFamily family, GstElement * udpsrc_out[2], GstElement * udpsink_out[2], GstRTSPRange * server_port_out, GstRTSPAddress ** server_addr_out) { + GstRTSPStreamPrivate *priv = stream->priv; GstStateChangeReturn ret; GstElement *udpsrc0, *udpsrc1; GstElement *udpsink0, *udpsink1; @@ -1123,6 +1191,11 @@ again: g_object_set (G_OBJECT (udpsink1), "close-socket", FALSE, NULL); g_object_set (G_OBJECT (udpsink1), multisink_socket, rtcp_socket, NULL); g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL); + /* Needs to be async for RECORD streams, otherwise we will never go to + * PLAYING because the sinks will wait for data while the udpsrc can't + * provide data with timestamps in PAUSED. */ + if (priv->sinkpad) + g_object_set (G_OBJECT (udpsink0), "async", FALSE, NULL); g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL); g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL); g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL); @@ -1135,6 +1208,7 @@ again: udpsrc_out[1] = udpsrc1; udpsink_out[0] = udpsink0; udpsink_out[1] = udpsink1; + server_port_out->min = rtpport; server_port_out->max = rtcpport; @@ -1201,11 +1275,13 @@ alloc_ports (GstRTSPStream * stream) { GstRTSPStreamPrivate *priv = stream->priv; - priv->have_ipv4 = alloc_ports_one_family (priv->pool, priv->buffer_size, + priv->have_ipv4 = + alloc_ports_one_family (stream, priv->pool, priv->buffer_size, G_SOCKET_FAMILY_IPV4, priv->udpsrc_v4, priv->udpsink, &priv->server_port_v4, &priv->server_addr_v4); - priv->have_ipv6 = alloc_ports_one_family (priv->pool, priv->buffer_size, + priv->have_ipv6 = + alloc_ports_one_family (stream, priv->pool, priv->buffer_size, G_SOCKET_FAMILY_IPV6, priv->udpsrc_v6, priv->udpsink, &priv->server_port_v6, &priv->server_addr_v6); @@ -1291,6 +1367,139 @@ gst_rtsp_stream_get_ssrc (GstRTSPStream * stream, guint * ssrc) g_mutex_unlock (&priv->lock); } +/** + * gst_rtsp_stream_set_retransmission_time: + * @stream: a #GstRTSPStream + * @time: a #GstClockTime + * + * Set the amount of time to store retransmission packets. + */ +void +gst_rtsp_stream_set_retransmission_time (GstRTSPStream * stream, + GstClockTime time) +{ + GST_DEBUG_OBJECT (stream, "set retransmission time %" G_GUINT64_FORMAT, time); + + g_mutex_lock (&stream->priv->lock); + stream->priv->rtx_time = time; + if (stream->priv->rtxsend) + g_object_set (stream->priv->rtxsend, "max-size-time", + GST_TIME_AS_MSECONDS (time), NULL); + g_mutex_unlock (&stream->priv->lock); +} + +/** + * gst_rtsp_stream_get_retransmission_time: + * @stream: a #GstRTSPStream + * + * Get the amount of time to store retransmission data. + * + * Returns: the amount of time to store retransmission data. + */ +GstClockTime +gst_rtsp_stream_get_retransmission_time (GstRTSPStream * stream) +{ + GstClockTime ret; + + g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0); + + g_mutex_lock (&stream->priv->lock); + ret = stream->priv->rtx_time; + g_mutex_unlock (&stream->priv->lock); + + return ret; +} + +/** + * gst_rtsp_stream_set_retransmission_pt: + * @stream: a #GstRTSPStream + * @rtx_pt: a #guint + * + * Set the payload type (pt) for retransmission of this stream. + */ +void +gst_rtsp_stream_set_retransmission_pt (GstRTSPStream * stream, guint rtx_pt) +{ + g_return_if_fail (GST_IS_RTSP_STREAM (stream)); + + GST_DEBUG_OBJECT (stream, "set retransmission pt %u", rtx_pt); + + g_mutex_lock (&stream->priv->lock); + stream->priv->rtx_pt = rtx_pt; + if (stream->priv->rtxsend) { + guint pt = gst_rtsp_stream_get_pt (stream); + gchar *pt_s = g_strdup_printf ("%d", pt); + GstStructure *rtx_pt_map = gst_structure_new ("application/x-rtp-pt-map", + pt_s, G_TYPE_UINT, rtx_pt, NULL); + g_object_set (stream->priv->rtxsend, "payload-type-map", rtx_pt_map, NULL); + g_free (pt_s); + gst_structure_free (rtx_pt_map); + } + g_mutex_unlock (&stream->priv->lock); +} + +/** + * gst_rtsp_stream_get_retransmission_pt: + * @stream: a #GstRTSPStream + * + * Get the payload-type used for retransmission of this stream + * + * Returns: The retransmission PT. + */ +guint +gst_rtsp_stream_get_retransmission_pt (GstRTSPStream * stream) +{ + guint rtx_pt; + + g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0); + + g_mutex_lock (&stream->priv->lock); + rtx_pt = stream->priv->rtx_pt; + g_mutex_unlock (&stream->priv->lock); + + return rtx_pt; +} + +/** + * gst_rtsp_stream_set_buffer_size: + * @stream: a #GstRTSPStream + * @size: the buffer size + * + * Set the size of the UDP transmission buffer (in bytes) + * Needs to be set before the stream is joined to a bin. + * + * Since: 1.6 + */ +void +gst_rtsp_stream_set_buffer_size (GstRTSPStream * stream, guint size) +{ + g_mutex_lock (&stream->priv->lock); + stream->priv->buffer_size = size; + g_mutex_unlock (&stream->priv->lock); +} + +/** + * gst_rtsp_stream_get_buffer_size: + * @stream: a #GstRTSPStream + * + * Get the size of the UDP transmission buffer (in bytes) + * + * Returns: the size of the UDP TX buffer + * + * Since: 1.6 + */ +guint +gst_rtsp_stream_get_buffer_size (GstRTSPStream * stream) +{ + guint buffer_size; + + g_mutex_lock (&stream->priv->lock); + buffer_size = stream->priv->buffer_size; + g_mutex_unlock (&stream->priv->lock); + + return buffer_size; +} + /* executed from streaming thread */ static void caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream) @@ -1474,11 +1683,17 @@ on_timeout (GObject * session, GObject * source, GstRTSPStream * stream) } static void -clear_tr_cache (GstRTSPStreamPrivate * priv) +clear_tr_cache (GstRTSPStreamPrivate * priv, gboolean is_rtp) { - g_list_foreach (priv->tr_cache, (GFunc) g_object_unref, NULL); - g_list_free (priv->tr_cache); - priv->tr_cache = NULL; + if (is_rtp) { + g_list_foreach (priv->tr_cache_rtp, (GFunc) g_object_unref, NULL); + g_list_free (priv->tr_cache_rtp); + priv->tr_cache_rtp = NULL; + } else { + g_list_foreach (priv->tr_cache_rtcp, (GFunc) g_object_unref, NULL); + g_list_free (priv->tr_cache_rtcp); + priv->tr_cache_rtcp = NULL; + } } static GstFlowReturn @@ -1502,22 +1717,37 @@ handle_new_sample (GstAppSink * sink, gpointer user_data) is_rtp = GST_ELEMENT_CAST (sink) == priv->appsink[0]; g_mutex_lock (&priv->lock); - if (priv->tr_changed) { - clear_tr_cache (priv); - for (walk = priv->transports; walk; walk = g_list_next (walk)) { - GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data; - priv->tr_cache = g_list_prepend (priv->tr_cache, g_object_ref (tr)); + if (is_rtp) { + if (priv->tr_cache_cookie_rtp != priv->transports_cookie) { + clear_tr_cache (priv, is_rtp); + for (walk = priv->transports; walk; walk = g_list_next (walk)) { + GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data; + priv->tr_cache_rtp = + g_list_prepend (priv->tr_cache_rtp, g_object_ref (tr)); + } + priv->tr_cache_cookie_rtp = priv->transports_cookie; + } + } else { + if (priv->tr_cache_cookie_rtcp != priv->transports_cookie) { + clear_tr_cache (priv, is_rtp); + for (walk = priv->transports; walk; walk = g_list_next (walk)) { + GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data; + priv->tr_cache_rtcp = + g_list_prepend (priv->tr_cache_rtcp, g_object_ref (tr)); + } + priv->tr_cache_cookie_rtcp = priv->transports_cookie; } - priv->tr_changed = FALSE; } g_mutex_unlock (&priv->lock); - for (walk = priv->tr_cache; walk; walk = g_list_next (walk)) { - GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data; - - if (is_rtp) { + if (is_rtp) { + for (walk = priv->tr_cache_rtp; walk; walk = g_list_next (walk)) { + GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data; gst_rtsp_stream_transport_send_rtp (tr, buffer); - } else { + } + } else { + for (walk = priv->tr_cache_rtcp; walk; walk = g_list_next (walk)) { + GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data; gst_rtsp_stream_transport_send_rtcp (tr, buffer); } } @@ -1621,7 +1851,7 @@ request_key (GstElement * srtpdec, guint ssrc, GstRTSPStream * stream) } static GstElement * -request_rtcp_decoder (GstElement * rtpbin, guint session, +request_rtp_rtcp_decoder (GstElement * rtpbin, guint session, GstRTSPStream * stream) { GstRTSPStreamPrivate *priv = stream->priv; @@ -1643,6 +1873,155 @@ request_rtcp_decoder (GstElement * rtpbin, guint session, } /** + * gst_rtsp_stream_request_aux_sender: + * @stream: a #GstRTSPStream + * @sessid: the session id + * + * Creating a rtxsend bin + * + * Returns: (transfer full): a #GstElement. + * + * Since: 1.6 + */ +GstElement * +gst_rtsp_stream_request_aux_sender (GstRTSPStream * stream, guint sessid) +{ + GstElement *bin; + GstPad *pad; + GstStructure *pt_map; + gchar *name; + guint pt, rtx_pt; + gchar *pt_s; + + g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL); + + pt = gst_rtsp_stream_get_pt (stream); + pt_s = g_strdup_printf ("%u", pt); + rtx_pt = stream->priv->rtx_pt; + + GST_INFO ("creating rtxsend with pt %u to %u", pt, rtx_pt); + + bin = gst_bin_new (NULL); + stream->priv->rtxsend = gst_element_factory_make ("rtprtxsend", NULL); + pt_map = gst_structure_new ("application/x-rtp-pt-map", + pt_s, G_TYPE_UINT, rtx_pt, NULL); + g_object_set (stream->priv->rtxsend, "payload-type-map", pt_map, + "max-size-time", GST_TIME_AS_MSECONDS (stream->priv->rtx_time), NULL); + g_free (pt_s); + gst_structure_free (pt_map); + gst_bin_add (GST_BIN (bin), gst_object_ref (stream->priv->rtxsend)); + + pad = gst_element_get_static_pad (stream->priv->rtxsend, "src"); + name = g_strdup_printf ("src_%u", sessid); + gst_element_add_pad (bin, gst_ghost_pad_new (name, pad)); + g_free (name); + gst_object_unref (pad); + + pad = gst_element_get_static_pad (stream->priv->rtxsend, "sink"); + name = g_strdup_printf ("sink_%u", sessid); + gst_element_add_pad (bin, gst_ghost_pad_new (name, pad)); + g_free (name); + gst_object_unref (pad); + + return bin; +} + +/** + * gst_rtsp_stream_set_pt_map: + * @stream: a #GstRTSPStream + * @pt: the pt + * @caps: a #GstCaps + * + * Configure a pt map between @pt and @caps. + */ +void +gst_rtsp_stream_set_pt_map (GstRTSPStream * stream, guint pt, GstCaps * caps) +{ + GstRTSPStreamPrivate *priv = stream->priv; + + g_mutex_lock (&priv->lock); + g_hash_table_insert (priv->ptmap, GINT_TO_POINTER (pt), gst_caps_ref (caps)); + g_mutex_unlock (&priv->lock); +} + +static GstCaps * +request_pt_map (GstElement * rtpbin, guint session, guint pt, + GstRTSPStream * stream) +{ + GstRTSPStreamPrivate *priv = stream->priv; + GstCaps *caps = NULL; + + g_mutex_lock (&priv->lock); + + if (priv->idx == session) { + caps = g_hash_table_lookup (priv->ptmap, GINT_TO_POINTER (pt)); + if (caps) { + GST_DEBUG ("Stream %p, pt %u: caps %" GST_PTR_FORMAT, stream, pt, caps); + gst_caps_ref (caps); + } else { + GST_DEBUG ("Stream %p, pt %u: no caps", stream, pt); + } + } + + g_mutex_unlock (&priv->lock); + + return caps; +} + +static void +pad_added (GstElement * rtpbin, GstPad * pad, GstRTSPStream * stream) +{ + GstRTSPStreamPrivate *priv = stream->priv; + gchar *name; + GstPadLinkReturn ret; + guint sessid; + + GST_DEBUG ("Stream %p added pad %s:%s for pad %s:%s", stream, + GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (priv->sinkpad)); + + name = gst_pad_get_name (pad); + if (sscanf (name, "recv_rtp_src_%u", &sessid) != 1) { + g_free (name); + return; + } + g_free (name); + + if (priv->idx != sessid) + return; + + if (gst_pad_is_linked (priv->sinkpad)) { + GST_WARNING ("Stream %p: Pad %s:%s is linked already", stream, + GST_DEBUG_PAD_NAME (priv->sinkpad)); + return; + } + + /* link the RTP pad to the session manager, it should not really fail unless + * this is not really an RTP pad */ + ret = gst_pad_link (pad, priv->sinkpad); + if (ret != GST_PAD_LINK_OK) + goto link_failed; + priv->recv_rtp_src = gst_object_ref (pad); + + return; + +/* ERRORS */ +link_failed: + { + GST_ERROR ("Stream %p: Failed to link pads %s:%s and %s:%s", stream, + GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (priv->sinkpad)); + } +} + +static void +on_npt_stop (GstElement * rtpbin, guint session, guint ssrc, + GstRTSPStream * stream) +{ + /* TODO: What to do here other than this? */ + GST_DEBUG ("Stream %p: Got EOS", stream); + gst_pad_send_event (stream->priv->sinkpad, gst_event_new_eos ()); +} + +/** * gst_rtsp_stream_join_bin: * @stream: a #GstRTSPStream * @bin: (transfer none): a #GstBin to join @@ -1695,31 +2074,47 @@ gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin, (GCallback) request_rtp_encoder, stream); g_signal_connect (rtpbin, "request-rtcp-encoder", (GCallback) request_rtcp_encoder, stream); + g_signal_connect (rtpbin, "request-rtp-decoder", + (GCallback) request_rtp_rtcp_decoder, stream); g_signal_connect (rtpbin, "request-rtcp-decoder", - (GCallback) request_rtcp_decoder, stream); + (GCallback) request_rtp_rtcp_decoder, stream); + } + + if (priv->sinkpad) { + g_signal_connect (rtpbin, "request-pt-map", + (GCallback) request_pt_map, stream); } /* get a pad for sending RTP */ name = g_strdup_printf ("send_rtp_sink_%u", idx); priv->send_rtp_sink = gst_element_get_request_pad (rtpbin, name); g_free (name); - /* link the RTP pad to the session manager, it should not really fail unless - * this is not really an RTP pad */ - ret = gst_pad_link (priv->srcpad, priv->send_rtp_sink); - if (ret != GST_PAD_LINK_OK) - goto link_failed; + + if (priv->srcpad) { + /* link the RTP pad to the session manager, it should not really fail unless + * this is not really an RTP pad */ + ret = gst_pad_link (priv->srcpad, priv->send_rtp_sink); + if (ret != GST_PAD_LINK_OK) + goto link_failed; + } else { + /* Need to connect our sinkpad from here */ + g_signal_connect (rtpbin, "pad-added", (GCallback) pad_added, stream); + /* EOS */ + g_signal_connect (rtpbin, "on-npt-stop", (GCallback) on_npt_stop, stream); + } /* get pads from the RTP session element for sending and receiving * RTP/RTCP*/ name = g_strdup_printf ("send_rtp_src_%u", idx); priv->send_src[0] = gst_element_get_static_pad (rtpbin, name); g_free (name); - name = g_strdup_printf ("send_rtcp_src_%u", idx); - priv->send_src[1] = gst_element_get_request_pad (rtpbin, name); - g_free (name); name = g_strdup_printf ("recv_rtp_sink_%u", idx); priv->recv_sink[0] = gst_element_get_request_pad (rtpbin, name); g_free (name); + + name = g_strdup_printf ("send_rtcp_src_%u", idx); + priv->send_src[1] = gst_element_get_request_pad (rtpbin, name); + g_free (name); name = g_strdup_printf ("recv_rtcp_sink_%u", idx); priv->recv_sink[1] = gst_element_get_request_pad (rtpbin, name); g_free (name); @@ -1744,14 +2139,14 @@ gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin, GstPad *teepad, *queuepad; /* For the sender we create this bit of pipeline for both * RTP and RTCP. Sync and preroll are enabled on udpsink so - * we need to add a queue before appsink to make the pipeline - * not block. For the TCP case, we want to pump data to the - * client as fast as possible anyway. + * we need to add a queue before appsink and udpsink to make + * the pipeline not block. For the TCP case, we want to pump + * data to the client as fast as possible. * - * .--------. .-----. .---------. - * | rtpbin | | tee | | udpsink | - * | send->sink src->sink | - * '--------' | | '---------' + * .--------. .-----. .---------. .---------. + * | rtpbin | | tee | | queue | | udpsink | + * | send->sink src->sink src->sink | + * '--------' | | '---------' '---------' * | | .---------. .---------. * | | | queue | | appsink | * | src->sink src->sink | @@ -1774,13 +2169,26 @@ gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin, gst_pad_link (priv->send_src[i], pad); gst_object_unref (pad); - /* link tee to udpsink */ + priv->udpqueue[i] = gst_element_factory_make ("queue", NULL); + g_object_set (priv->udpqueue[i], "max-size-buffers", + 1, "max-size-bytes", 0, "max-size-time", G_GINT64_CONSTANT (0), NULL); + gst_bin_add (bin, priv->udpqueue[i]); + /* link tee to udpqueue */ teepad = gst_element_get_request_pad (priv->tee[i], "src_%u"); - gst_pad_link (teepad, sinkpad); + pad = gst_element_get_static_pad (priv->udpqueue[i], "sink"); + gst_pad_link (teepad, pad); + gst_object_unref (pad); gst_object_unref (teepad); + /* link udpqueue to udpsink */ + queuepad = gst_element_get_static_pad (priv->udpqueue[i], "src"); + gst_pad_link (queuepad, sinkpad); + gst_object_unref (queuepad); + /* make queue */ priv->appqueue[i] = gst_element_factory_make ("queue", NULL); + g_object_set (priv->appqueue[i], "max-size-buffers", + 1, "max-size-bytes", 0, "max-size-time", G_GINT64_CONSTANT (0), NULL); gst_bin_add (bin, priv->appqueue[i]); /* and link to tee */ teepad = gst_element_get_request_pad (priv->tee[i], "src_%u"); @@ -1830,10 +2238,12 @@ gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin, gst_object_unref (pad); if (priv->udpsrc_v4[i]) { - /* we set and keep these to playing so that they don't cause NO_PREROLL return - * values */ - gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_PLAYING); - gst_element_set_locked_state (priv->udpsrc_v4[i], TRUE); + if (priv->srcpad) { + /* we set and keep these to playing so that they don't cause NO_PREROLL return + * values. This is only relevant for PLAY pipelines */ + gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_PLAYING); + gst_element_set_locked_state (priv->udpsrc_v4[i], TRUE); + } /* add udpsrc */ gst_bin_add (bin, priv->udpsrc_v4[i]); @@ -1846,8 +2256,10 @@ gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin, } if (priv->udpsrc_v6[i]) { - gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_PLAYING); - gst_element_set_locked_state (priv->udpsrc_v6[i], TRUE); + if (priv->srcpad) { + gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_PLAYING); + gst_element_set_locked_state (priv->udpsrc_v6[i], TRUE); + } gst_bin_add (bin, priv->udpsrc_v6[i]); /* and link to the funnel v6 */ @@ -1861,6 +2273,8 @@ gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin, if (priv->protocols & GST_RTSP_LOWER_TRANS_TCP) { /* make and add appsrc */ priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL); + priv->appsrc_base_time[i] = -1; + g_object_set (priv->appsrc[i], "format", GST_FORMAT_TIME, NULL); gst_bin_add (bin, priv->appsrc[i]); /* and link to the funnel */ selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u"); @@ -1878,6 +2292,8 @@ gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin, gst_element_set_state (priv->appsink[i], state); if (priv->appqueue[i]) gst_element_set_state (priv->appqueue[i], state); + if (priv->udpqueue[i]) + gst_element_set_state (priv->udpqueue[i], state); if (priv->tee[i]) gst_element_set_state (priv->tee[i], state); if (priv->funnel[i]) @@ -1934,6 +2350,7 @@ gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin, { GstRTSPStreamPrivate *priv; gint i; + GList *l; g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE); g_return_val_if_fail (GST_IS_BIN (bin), FALSE); @@ -1946,13 +2363,21 @@ gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin, goto was_not_joined; /* all transports must be removed by now */ - g_return_val_if_fail (priv->transports == NULL, FALSE); + if (priv->transports != NULL) + goto transports_not_removed; - clear_tr_cache (priv); + clear_tr_cache (priv, TRUE); + clear_tr_cache (priv, FALSE); GST_INFO ("stream %p leaving bin", stream); - gst_pad_unlink (priv->srcpad, priv->send_rtp_sink); + if (priv->srcpad) { + gst_pad_unlink (priv->srcpad, priv->send_rtp_sink); + } else if (priv->recv_rtp_src) { + gst_pad_unlink (priv->recv_rtp_src, priv->sinkpad); + gst_object_unref (priv->recv_rtp_src); + priv->recv_rtp_src = NULL; + } g_signal_handler_disconnect (priv->send_src[0], priv->caps_sig); gst_element_release_request_pad (rtpbin, priv->send_rtp_sink); gst_object_unref (priv->send_rtp_sink); @@ -1965,6 +2390,8 @@ gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin, gst_element_set_state (priv->appsink[i], GST_STATE_NULL); if (priv->appqueue[i]) gst_element_set_state (priv->appqueue[i], GST_STATE_NULL); + if (priv->udpqueue[i]) + gst_element_set_state (priv->udpqueue[i], GST_STATE_NULL); if (priv->tee[i]) gst_element_set_state (priv->tee[i], GST_STATE_NULL); if (priv->funnel[i]) @@ -1984,6 +2411,18 @@ gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin, gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_NULL); gst_bin_remove (bin, priv->udpsrc_v6[i]); } + + for (l = priv->transport_sources; l; l = l->next) { + GstRTSPMulticastTransportSource *s = l->data; + + if (!s->udpsrc[i]) + continue; + + gst_element_set_locked_state (s->udpsrc[i], FALSE); + gst_element_set_state (s->udpsrc[i], GST_STATE_NULL); + gst_bin_remove (bin, s->udpsrc[i]); + } + if (priv->udpsink[i]) gst_bin_remove (bin, priv->udpsink[i]); if (priv->appsrc[i]) @@ -1992,6 +2431,8 @@ gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin, gst_bin_remove (bin, priv->appsink[i]); if (priv->appqueue[i]) gst_bin_remove (bin, priv->appqueue[i]); + if (priv->udpqueue[i]) + gst_bin_remove (bin, priv->udpqueue[i]); if (priv->tee[i]) gst_bin_remove (bin, priv->tee[i]); if (priv->funnel[i]) @@ -2007,9 +2448,18 @@ gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin, priv->appsrc[i] = NULL; priv->appsink[i] = NULL; priv->appqueue[i] = NULL; + priv->udpqueue[i] = NULL; priv->tee[i] = NULL; priv->funnel[i] = NULL; } + + for (l = priv->transport_sources; l; l = l->next) { + GstRTSPMulticastTransportSource *s = l->data; + g_slice_free (GstRTSPMulticastTransportSource, s); + } + g_list_free (priv->transport_sources); + priv->transport_sources = NULL; + gst_object_unref (priv->send_src[0]); priv->send_src[0] = NULL; @@ -2025,6 +2475,8 @@ gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin, if (priv->srtpenc) gst_object_unref (priv->srtpenc); + if (priv->srtpdec) + gst_object_unref (priv->srtpdec); priv->is_joined = FALSE; g_mutex_unlock (&priv->lock); @@ -2036,6 +2488,12 @@ was_not_joined: g_mutex_unlock (&priv->lock); return TRUE; } +transports_not_removed: + { + GST_ERROR_OBJECT (stream, "can't leave bin (transports not removed)"); + g_mutex_unlock (&priv->lock); + return FALSE; + } } /** @@ -2068,6 +2526,62 @@ gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream, g_mutex_lock (&priv->lock); + /* First try to extract the information from the last buffer on the sinks. + * This will have a more accurate sequence number and timestamp, as between + * the payloader and the sink there can be some queues + */ + if (priv->udpsink[0] || priv->appsink[0]) { + GstSample *last_sample; + + if (priv->udpsink[0]) + g_object_get (priv->udpsink[0], "last-sample", &last_sample, NULL); + else + g_object_get (priv->appsink[0], "last-sample", &last_sample, NULL); + + if (last_sample) { + GstCaps *caps; + GstBuffer *buffer; + GstSegment *segment; + GstRTPBuffer rtp_buffer = GST_RTP_BUFFER_INIT; + + caps = gst_sample_get_caps (last_sample); + buffer = gst_sample_get_buffer (last_sample); + segment = gst_sample_get_segment (last_sample); + + if (gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp_buffer)) { + if (seq) { + *seq = gst_rtp_buffer_get_seq (&rtp_buffer); + } + + if (rtptime) { + *rtptime = gst_rtp_buffer_get_timestamp (&rtp_buffer); + } + + gst_rtp_buffer_unmap (&rtp_buffer); + + if (running_time) { + *running_time = + gst_segment_to_running_time (segment, GST_FORMAT_TIME, + GST_BUFFER_TIMESTAMP (buffer)); + } + + if (clock_rate) { + GstStructure *s = gst_caps_get_structure (caps, 0); + + gst_structure_get_int (s, "clock-rate", (gint *) clock_rate); + + if (*clock_rate == 0 && running_time) + *running_time = GST_CLOCK_TIME_NONE; + } + gst_sample_unref (last_sample); + + goto done; + } else { + gst_sample_unref (last_sample); + } + } + } + if (g_object_class_find_property (payobjclass, "stats")) { g_object_get (priv->payloader, "stats", &stats, NULL); if (stats == NULL) @@ -2102,6 +2616,8 @@ gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream, if (running_time) *running_time = GST_CLOCK_TIME_NONE; } + +done: g_mutex_unlock (&priv->lock); return TRUE; @@ -2174,6 +2690,31 @@ gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer) g_mutex_unlock (&priv->lock); if (element) { + if (priv->appsrc_base_time[0] == -1) { + /* Take current running_time. This timestamp will be put on + * the first buffer of each stream because we are a live source and so we + * timestamp with the running_time. When we are dealing with TCP, we also + * only timestamp the first buffer (using the DISCONT flag) because a server + * typically bursts data, for which we don't want to compensate by speeding + * up the media. The other timestamps will be interpollated from this one + * using the RTP timestamps. */ + GST_OBJECT_LOCK (element); + if (GST_ELEMENT_CLOCK (element)) { + GstClockTime now; + GstClockTime base_time; + + now = gst_clock_get_time (GST_ELEMENT_CLOCK (element)); + base_time = GST_ELEMENT_CAST (element)->base_time; + + priv->appsrc_base_time[0] = now - base_time; + GST_BUFFER_TIMESTAMP (buffer) = priv->appsrc_base_time[0]; + GST_DEBUG ("stream %p: first buffer at time %" GST_TIME_FORMAT + ", base %" GST_TIME_FORMAT, stream, GST_TIME_ARGS (now), + GST_TIME_ARGS (base_time)); + } + GST_OBJECT_UNLOCK (element); + } + ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer); gst_object_unref (element); } else { @@ -2204,8 +2745,11 @@ gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer) g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR); priv = stream->priv; g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR); - g_return_val_if_fail (priv->is_joined, FALSE); + if (!priv->is_joined) { + gst_buffer_unref (buffer); + return GST_FLOW_NOT_LINKED; + } g_mutex_lock (&priv->lock); if (priv->appsrc[1]) element = gst_object_ref (priv->appsrc[1]); @@ -2214,6 +2758,31 @@ gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer) g_mutex_unlock (&priv->lock); if (element) { + if (priv->appsrc_base_time[1] == -1) { + /* Take current running_time. This timestamp will be put on + * the first buffer of each stream because we are a live source and so we + * timestamp with the running_time. When we are dealing with TCP, we also + * only timestamp the first buffer (using the DISCONT flag) because a server + * typically bursts data, for which we don't want to compensate by speeding + * up the media. The other timestamps will be interpollated from this one + * using the RTP timestamps. */ + GST_OBJECT_LOCK (element); + if (GST_ELEMENT_CLOCK (element)) { + GstClockTime now; + GstClockTime base_time; + + now = gst_clock_get_time (GST_ELEMENT_CLOCK (element)); + base_time = GST_ELEMENT_CAST (element)->base_time; + + priv->appsrc_base_time[1] = now - base_time; + GST_BUFFER_TIMESTAMP (buffer) = priv->appsrc_base_time[1]; + GST_DEBUG ("stream %p: first buffer at time %" GST_TIME_FORMAT + ", base %" GST_TIME_FORMAT, stream, GST_TIME_ARGS (now), + GST_TIME_ARGS (base_time)); + } + GST_OBJECT_UNLOCK (element); + } + ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer); gst_object_unref (element); } else { @@ -2234,9 +2803,86 @@ update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans, tr = gst_rtsp_stream_transport_get_transport (trans); switch (tr->lower_transport) { - case GST_RTSP_LOWER_TRANS_UDP: case GST_RTSP_LOWER_TRANS_UDP_MCAST: { + GstRTSPMulticastTransportSource *source; + GstBin *bin; + + bin = GST_BIN (gst_object_get_parent (GST_OBJECT (priv->funnel[0]))); + + if (add) { + gchar *host; + gint i; + GstPad *selpad, *pad; + + source = g_slice_new0 (GstRTSPMulticastTransportSource); + source->transport = trans; + + for (i = 0; i < 2; i++) { + host = + g_strdup_printf ("udp://%s:%d", tr->destination, + (i == 0) ? tr->port.min : tr->port.max); + source->udpsrc[i] = + gst_element_make_from_uri (GST_URI_SRC, host, NULL, NULL); + g_free (host); + + if (priv->srcpad) { + /* we set and keep these to playing so that they don't cause NO_PREROLL return + * values. This is only relevant for PLAY pipelines */ + gst_element_set_state (source->udpsrc[i], GST_STATE_PLAYING); + gst_element_set_locked_state (source->udpsrc[i], TRUE); + } + /* add udpsrc */ + gst_bin_add (bin, source->udpsrc[i]); + + /* and link to the funnel v4 */ + source->selpad[i] = selpad = + gst_element_get_request_pad (priv->funnel[i], "sink_%u"); + pad = gst_element_get_static_pad (source->udpsrc[i], "src"); + gst_pad_link (pad, selpad); + gst_object_unref (pad); + gst_object_unref (selpad); + } + gst_object_unref (bin); + + priv->transport_sources = + g_list_prepend (priv->transport_sources, source); + } else { + GList *l; + + for (l = priv->transport_sources; l; l = l->next) { + source = l->data; + + if (source->transport == trans) { + priv->transport_sources = + g_list_delete_link (priv->transport_sources, l); + break; + } + } + + if (l != NULL) { + gint i; + + for (i = 0; i < 2; i++) { + /* Will automatically unlink everything */ + gst_bin_remove (bin, + GST_ELEMENT (gst_object_ref (source->udpsrc[i]))); + + gst_element_set_state (source->udpsrc[i], GST_STATE_NULL); + gst_object_unref (source->udpsrc[i]); + + gst_element_release_request_pad (priv->funnel[i], + source->selpad[i]); + } + + g_slice_free (GstRTSPMulticastTransportSource, source); + } + } + + /* fall through for the generic case */ + } + case GST_RTSP_LOWER_TRANS_UDP: + { gchar *dest; gint min, max; guint ttl = 0; @@ -2267,7 +2913,7 @@ update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans, g_signal_emit_by_name (priv->udpsink[1], "remove", dest, max, NULL); priv->transports = g_list_remove (priv->transports, trans); } - priv->tr_changed = TRUE; + priv->transports_cookie++; break; } case GST_RTSP_LOWER_TRANS_TCP: @@ -2278,7 +2924,7 @@ update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans, GST_INFO ("removing TCP %s", tr->destination); priv->transports = g_list_remove (priv->transports, trans); } - priv->tr_changed = TRUE; + priv->transports_cookie++; break; default: goto unknown_transport; @@ -2364,7 +3010,7 @@ gst_rtsp_stream_remove_transport (GstRTSPStream * stream, * gst_rtsp_stream_update_crypto: * @stream: a #GstRTSPStream * @ssrc: the SSRC - * @crypto: (transfer none) (allow none): a #GstCaps with crypto info + * @crypto: (transfer none) (allow-none): a #GstCaps with crypto info * * Update the new crypto information for @ssrc in @stream. If information * for @ssrc did not exist, it will be added. If information @@ -2380,7 +3026,7 @@ gst_rtsp_stream_update_crypto (GstRTSPStream * stream, GstRTSPStreamPrivate *priv; g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE); - g_return_val_if_fail (GST_IS_CAPS (crypto), FALSE); + g_return_val_if_fail (crypto == NULL || GST_IS_CAPS (crypto), FALSE); priv = stream->priv; @@ -2406,8 +3052,8 @@ gst_rtsp_stream_update_crypto (GstRTSPStream * stream, * * @stream must be joined to a bin. * - * Returns: (transfer full): the RTP socket or %NULL if no socket could be - * allocated for @family. Unref after usage + * Returns: (transfer full) (nullable): the RTP socket or %NULL if no + * socket could be allocated for @family. Unref after usage */ GSocket * gst_rtsp_stream_get_rtp_socket (GstRTSPStream * stream, GSocketFamily family) @@ -2440,8 +3086,8 @@ gst_rtsp_stream_get_rtp_socket (GstRTSPStream * stream, GSocketFamily family) * * @stream must be joined to a bin. * - * Returns: (transfer full): the RTCP socket or %NULL if no socket could be - * allocated for @family. Unref after usage + * Returns: (transfer full) (nullable): the RTCP socket or %NULL if no + * socket could be allocated for @family. Unref after usage */ GSocket * gst_rtsp_stream_get_rtcp_socket (GstRTSPStream * stream, GSocketFamily family) @@ -2466,6 +3112,48 @@ gst_rtsp_stream_get_rtcp_socket (GstRTSPStream * stream, GSocketFamily family) } /** + * gst_rtsp_stream_set_seqnum: + * @stream: a #GstRTSPStream + * @seqnum: a new sequence number + * + * Configure the sequence number in the payloader of @stream to @seqnum. + */ +void +gst_rtsp_stream_set_seqnum_offset (GstRTSPStream * stream, guint16 seqnum) +{ + GstRTSPStreamPrivate *priv; + + g_return_if_fail (GST_IS_RTSP_STREAM (stream)); + + priv = stream->priv; + + g_object_set (G_OBJECT (priv->payloader), "seqnum-offset", seqnum, NULL); +} + +/** + * gst_rtsp_stream_get_seqnum: + * @stream: a #GstRTSPStream + * + * Get the configured sequence number in the payloader of @stream. + * + * Returns: the sequence number of the payloader. + */ +guint16 +gst_rtsp_stream_get_current_seqnum (GstRTSPStream * stream) +{ + GstRTSPStreamPrivate *priv; + guint seqnum; + + g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0); + + priv = stream->priv; + + g_object_get (G_OBJECT (priv->payloader), "seqnum", &seqnum, NULL); + + return seqnum; +} + +/** * gst_rtsp_stream_transport_filter: * @stream: a #GstRTSPStream * @func: (scope call) (allow-none): a callback @@ -2496,25 +3184,43 @@ gst_rtsp_stream_transport_filter (GstRTSPStream * stream, { GstRTSPStreamPrivate *priv; GList *result, *walk, *next; + GHashTable *visited = NULL; + guint cookie; g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL); priv = stream->priv; result = NULL; + if (func) + visited = g_hash_table_new_full (NULL, NULL, g_object_unref, NULL); g_mutex_lock (&priv->lock); +restart: + cookie = priv->transports_cookie; for (walk = priv->transports; walk; walk = next) { GstRTSPStreamTransport *trans = walk->data; GstRTSPFilterResult res; + gboolean changed; next = g_list_next (walk); - if (func) + if (func) { + /* only visit each transport once */ + if (g_hash_table_contains (visited, trans)) + continue; + + g_hash_table_add (visited, g_object_ref (trans)); + g_mutex_unlock (&priv->lock); + res = func (stream, trans, user_data); - else + + g_mutex_lock (&priv->lock); + } else res = GST_RTSP_FILTER_REF; + changed = (cookie != priv->transports_cookie); + switch (res) { case GST_RTSP_FILTER_REMOVE: update_transport (stream, trans, FALSE); @@ -2526,9 +3232,14 @@ gst_rtsp_stream_transport_filter (GstRTSPStream * stream, default: break; } + if (changed) + goto restart; } g_mutex_unlock (&priv->lock); + if (func) + g_hash_table_unref (visited); + return result; } @@ -2617,3 +3328,81 @@ gst_rtsp_stream_is_blocking (GstRTSPStream * stream) return result; } + +/** + * gst_rtsp_stream_query_position: + * @stream: a #GstRTSPStream + * + * Query the position of the stream in %GST_FORMAT_TIME. This only considers + * the RTP parts of the pipeline and not the RTCP parts. + * + * Returns: %TRUE if the position could be queried + */ +gboolean +gst_rtsp_stream_query_position (GstRTSPStream * stream, gint64 * position) +{ + GstRTSPStreamPrivate *priv; + GstElement *sink; + gboolean ret; + + g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE); + + priv = stream->priv; + + g_mutex_lock (&priv->lock); + if ((sink = priv->udpsink[0])) + gst_object_ref (sink); + g_mutex_unlock (&priv->lock); + + if (!sink) + return FALSE; + + ret = gst_element_query_position (sink, GST_FORMAT_TIME, position); + gst_object_unref (sink); + + return ret; +} + +/** + * gst_rtsp_stream_query_stop: + * @stream: a #GstRTSPStream + * + * Query the stop of the stream in %GST_FORMAT_TIME. This only considers + * the RTP parts of the pipeline and not the RTCP parts. + * + * Returns: %TRUE if the stop could be queried + */ +gboolean +gst_rtsp_stream_query_stop (GstRTSPStream * stream, gint64 * stop) +{ + GstRTSPStreamPrivate *priv; + GstElement *sink; + GstQuery *query; + gboolean ret; + + g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE); + + priv = stream->priv; + + g_mutex_lock (&priv->lock); + if ((sink = priv->udpsink[0])) + gst_object_ref (sink); + g_mutex_unlock (&priv->lock); + + if (!sink) + return FALSE; + + query = gst_query_new_segment (GST_FORMAT_TIME); + if ((ret = gst_element_query (sink, query))) { + GstFormat format; + + gst_query_parse_segment (query, NULL, &format, NULL, stop); + if (format != GST_FORMAT_TIME) + *stop = -1; + } + gst_query_unref (query); + gst_object_unref (sink); + + return ret; + +}