2 * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3 * Copyright (C) 2015 Centricular Ltd
4 * Author: Sebastian Dröge <sebastian@centricular.com>
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Library General Public
8 * License as published by the Free Software Foundation; either
9 * version 2 of the License, or (at your option) any later version.
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Library General Public License for more details.
16 * You should have received a copy of the GNU Library General Public
17 * License along with this library; if not, write to the
18 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19 * Boston, MA 02110-1301, USA.
23 * @short_description: A media stream
24 * @see_also: #GstRTSPMedia
26 * The #GstRTSPStream object manages the data transport for one stream. It
27 * is created from a payloader element and a source pad that produce the RTP
28 * packets for the stream.
30 * With gst_rtsp_stream_join_bin() the streaming elements are added to the bin
31 * and rtpbin. gst_rtsp_stream_leave_bin() removes the elements again.
33 * The #GstRTSPStream will use the configured addresspool, as set with
34 * gst_rtsp_stream_set_address_pool(), to allocate multicast addresses for the
35 * stream. With gst_rtsp_stream_get_multicast_address() you can get the
38 * With gst_rtsp_stream_get_server_port () you can get the port that the server
39 * will use to receive RTCP. This is the port that the clients will use to send
42 * With gst_rtsp_stream_add_transport() destinations can be added where the
43 * stream should be sent to. Use gst_rtsp_stream_remove_transport() to remove
44 * the destination again.
46 * Last reviewed on 2013-07-16 (1.0.0)
55 #include <gst/app/gstappsrc.h>
56 #include <gst/app/gstappsink.h>
58 #include <gst/rtp/gstrtpbuffer.h>
60 #include "rtsp-stream.h"
/* Instance-private state of a #GstRTSPStream: the pad/payloader pair the
 * stream was built from, the allowed profiles and lower transports, the
 * per-transport elements and sockets (UDP unicast, UDP multicast, TCP),
 * retransmission and ULPFEC elements, the address pool together with the
 * addresses taken from it, transport caches, blocking-probe ids and the
 * current stream position. */
62 struct _GstRTSPStreamPrivate
66 /* Only one pad is ever set */
67 GstPad *srcpad, *sinkpad;
68 GstElement *payloader;
72 /* TRUE if this stream is running on
73 * the client side of an RTSP link (for RECORD) */
77 /* TRUE if stream is complete. This means that the receiver and the sender
78 * parts are present in the stream. */
80 GstRTSPProfile profiles;
81 GstRTSPLowerTrans protocols;
83 /* pads on the rtpbin */
84 GstPad *send_rtp_sink;
89 /* the RTPSession object */
92 /* SRTP encoder/decoder */
/* The two-element arrays below presumably hold the RTP entry at index 0 and
 * the RTCP entry at index 1 (alloc_ports_one_family() stores its sockets in
 * that order) -- TODO confirm for the element arrays. */
98 GstElement *udpsrc_v4[2];
99 GstElement *udpsrc_v6[2];
100 GstElement *udpqueue[2];
101 GstElement *udpsink[2];
102 GSocket *socket_v4[2];
103 GSocket *socket_v6[2];
105 /* for UDP multicast */
106 GstElement *mcast_udpsrc_v4[2];
107 GstElement *mcast_udpsrc_v6[2];
108 GstElement *mcast_udpqueue[2];
109 GstElement *mcast_udpsink[2];
110 GSocket *mcast_socket_v4[2];
111 GSocket *mcast_socket_v6[2];
113 /* for TCP transport */
114 GstElement *appsrc[2];
115 GstClockTime appsrc_base_time[2];
116 GstElement *appqueue[2];
117 GstElement *appsink[2];
120 GstElement *funnel[2];
124 GstElement *rtxreceive;
126 GstClockTime rtx_time;
128 /* Forward Error Correction with RFC 5109 */
129 GstElement *ulpfec_decoder;
130 GstElement *ulpfec_encoder;
132 gboolean ulpfec_enabled;
133 guint ulpfec_percentage;
135 /* pool used to manage unicast and multicast addresses */
136 GstRTSPAddressPool *pool;
138 /* unicast server addr/port */
139 GstRTSPAddress *server_addr_v4;
140 GstRTSPAddress *server_addr_v6;
142 /* multicast addresses */
143 GstRTSPAddress *mcast_addr_v4;
144 GstRTSPAddress *mcast_addr_v6;
/* name of the interface used for multicast; owned string, g_free()d in
 * finalize */
146 gchar *multicast_iface;
148 /* the caps of the stream */
152 /* transports we stream to */
155 guint transports_cookie;
157 GList *tr_cache_rtcp;
158 guint tr_cache_cookie_rtp;
159 guint tr_cache_cookie_rtcp;
163 /* stream blocking */
164 gulong blocked_id[2];
167 /* current stream position */
168 GstClockTime position;
170 /* pt->caps map for RECORD streams */
173 GstRTSPPublishClockMode publish_clock_mode;
/* Property defaults: used as the GParamSpec defaults in class_init and as the
 * initial field values in gst_rtsp_stream_init(). */
176 #define DEFAULT_CONTROL NULL
177 #define DEFAULT_PROFILES GST_RTSP_PROFILE_AVP
/* NOTE(review): the expansion below is an unparenthesised `|` chain; that is
 * only safe while the macro is used as a complete function argument or as
 * the right-hand side of an assignment, which is how it is used in this
 * file. */
178 #define DEFAULT_PROTOCOLS GST_RTSP_LOWER_TRANS_UDP | GST_RTSP_LOWER_TRANS_UDP_MCAST | \
179 GST_RTSP_LOWER_TRANS_TCP
/* Indices into gst_rtsp_stream_signals[]. */
192 SIGNAL_NEW_RTP_ENCODER,
193 SIGNAL_NEW_RTCP_ENCODER,
/* Debug category for this file; GST_CAT_DEFAULT routes the GST_* logging
 * macros to it. */
197 GST_DEBUG_CATEGORY_STATIC (rtsp_stream_debug);
198 #define GST_CAT_DEFAULT rtsp_stream_debug
/* Quark initialised in class_init from "GstRTSPServer.stream". */
200 static GQuark ssrc_stream_map_key;
/* Forward declarations of the GObject virtual methods installed in
 * class_init. */
202 static void gst_rtsp_stream_get_property (GObject * object, guint propid,
203 GValue * value, GParamSpec * pspec);
204 static void gst_rtsp_stream_set_property (GObject * object, guint propid,
205 const GValue * value, GParamSpec * pspec);
207 static void gst_rtsp_stream_finalize (GObject * obj);
/* Signal ids, filled in by class_init. */
209 static guint gst_rtsp_stream_signals[SIGNAL_LAST] = { 0 };
211 G_DEFINE_TYPE_WITH_PRIVATE (GstRTSPStream, gst_rtsp_stream, G_TYPE_OBJECT);
/* Class initialiser: installs the property accessors and finalize, registers
 * the "control", "profiles" and "protocols" properties and the
 * "new-rtp-encoder" / "new-rtcp-encoder" signals, then sets up the debug
 * category and the ssrc_stream_map_key quark. */
214 gst_rtsp_stream_class_init (GstRTSPStreamClass * klass)
216 GObjectClass *gobject_class;
218 gobject_class = G_OBJECT_CLASS (klass);
220 gobject_class->get_property = gst_rtsp_stream_get_property;
221 gobject_class->set_property = gst_rtsp_stream_set_property;
222 gobject_class->finalize = gst_rtsp_stream_finalize;
224 g_object_class_install_property (gobject_class, PROP_CONTROL,
225 g_param_spec_string ("control", "Control",
226 "The control string for this stream", DEFAULT_CONTROL,
227 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
229 g_object_class_install_property (gobject_class, PROP_PROFILES,
230 g_param_spec_flags ("profiles", "Profiles",
231 "Allowed transfer profiles", GST_TYPE_RTSP_PROFILE,
232 DEFAULT_PROFILES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
234 g_object_class_install_property (gobject_class, PROP_PROTOCOLS,
235 g_param_spec_flags ("protocols", "Protocols",
236 "Allowed lower transport protocols", GST_TYPE_RTSP_LOWER_TRANS,
237 DEFAULT_PROTOCOLS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/* Both signals run last, return nothing and carry one GstElement argument. */
239 gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER] =
240 g_signal_new ("new-rtp-encoder", G_TYPE_FROM_CLASS (klass),
241 G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
242 G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
244 gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER] =
245 g_signal_new ("new-rtcp-encoder", G_TYPE_FROM_CLASS (klass),
246 G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
247 G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
249 GST_DEBUG_CATEGORY_INIT (rtsp_stream_debug, "rtspstream", 0, "GstRTSPStream");
251 ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
/* Instance initialiser: applies the property defaults, initialises the lock
 * and creates the two hash tables held in the private data. */
255 gst_rtsp_stream_init (GstRTSPStream * stream)
257 GstRTSPStreamPrivate *priv = gst_rtsp_stream_get_instance_private (stream);
259 GST_DEBUG ("new stream %p", stream);
264 priv->control = g_strdup (DEFAULT_CONTROL);
265 priv->profiles = DEFAULT_PROFILES;
266 priv->protocols = DEFAULT_PROTOCOLS;
267 priv->publish_clock_mode = GST_RTSP_PUBLISH_CLOCK_MODE_CLOCK;
269 g_mutex_init (&priv->lock);
/* Both tables use direct (pointer-value) keys and unref their GstCaps values
 * when an entry is removed or the table is destroyed. */
271 priv->keys = g_hash_table_new_full (g_direct_hash, g_direct_equal,
272 NULL, (GDestroyNotify) gst_caps_unref);
273 priv->ptmap = g_hash_table_new_full (NULL, NULL, NULL,
274 (GDestroyNotify) gst_caps_unref);
/* GObject finalize: releases the reserved addresses, the address pool, the
 * retransmission and ULPFEC elements, all sockets, the payloader and pads,
 * the control string, the lock and the hash tables, then chains up to the
 * parent class. */
278 gst_rtsp_stream_finalize (GObject * obj)
280 GstRTSPStream *stream;
281 GstRTSPStreamPrivate *priv;
284 stream = GST_RTSP_STREAM (obj);
287 GST_DEBUG ("finalize stream %p", stream);
289 /* we really need to be unjoined now */
/* NOTE(review): if this check fails the function returns here, so none of
 * the cleanup below runs and the parent finalize is not chained. */
290 g_return_if_fail (priv->joined_bin == NULL);
292 if (priv->mcast_addr_v4)
293 gst_rtsp_address_free (priv->mcast_addr_v4);
294 if (priv->mcast_addr_v6)
295 gst_rtsp_address_free (priv->mcast_addr_v6);
296 if (priv->server_addr_v4)
297 gst_rtsp_address_free (priv->server_addr_v4);
298 if (priv->server_addr_v6)
299 gst_rtsp_address_free (priv->server_addr_v6);
/* NOTE(review): confirm that the unrefs of pool, rtxsend, payloader, srcpad
 * and sinkpad in this function are each guarded by a NULL check; only one of
 * srcpad/sinkpad is ever set and the pool is nullable. */
301 g_object_unref (priv->pool);
303 g_object_unref (priv->rtxsend);
304 if (priv->rtxreceive)
305 g_object_unref (priv->rtxreceive);
306 if (priv->ulpfec_encoder)
307 gst_object_unref (priv->ulpfec_encoder);
308 if (priv->ulpfec_decoder)
309 gst_object_unref (priv->ulpfec_decoder);
/* drop the unicast and multicast sockets of both address families */
311 for (i = 0; i < 2; i++) {
312 if (priv->socket_v4[i])
313 g_object_unref (priv->socket_v4[i]);
314 if (priv->socket_v6[i])
315 g_object_unref (priv->socket_v6[i]);
316 if (priv->mcast_socket_v4[i])
317 g_object_unref (priv->mcast_socket_v4[i]);
318 if (priv->mcast_socket_v6[i])
319 g_object_unref (priv->mcast_socket_v6[i]);
322 g_free (priv->multicast_iface);
324 gst_object_unref (priv->payloader);
326 gst_object_unref (priv->srcpad);
328 gst_object_unref (priv->sinkpad);
329 g_free (priv->control);
330 g_mutex_clear (&priv->lock);
332 g_hash_table_unref (priv->keys);
333 g_hash_table_destroy (priv->ptmap);
335 G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
/* GObject get_property: forwards each property to its public getter and
 * warns about unknown property ids. */
339 gst_rtsp_stream_get_property (GObject * object, guint propid,
340 GValue * value, GParamSpec * pspec)
342 GstRTSPStream *stream = GST_RTSP_STREAM (object);
/* take_string: gst_rtsp_stream_get_control() returns a newly allocated
 * string, so ownership is handed to the GValue */
346 g_value_take_string (value, gst_rtsp_stream_get_control (stream));
349 g_value_set_flags (value, gst_rtsp_stream_get_profiles (stream));
352 g_value_set_flags (value, gst_rtsp_stream_get_protocols (stream));
355 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
/* GObject set_property: forwards each property to its public setter and
 * warns about unknown property ids. */
360 gst_rtsp_stream_set_property (GObject * object, guint propid,
361 const GValue * value, GParamSpec * pspec)
363 GstRTSPStream *stream = GST_RTSP_STREAM (object);
367 gst_rtsp_stream_set_control (stream, g_value_get_string (value));
370 gst_rtsp_stream_set_profiles (stream, g_value_get_flags (value));
373 gst_rtsp_stream_set_protocols (stream, g_value_get_flags (value));
376 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
381 * gst_rtsp_stream_new:
384 * @payloader: a #GstElement
386 * Create a new media stream with index @idx that handles RTP data on
387 * @pad and has a payloader element @payloader if @pad is a source pad
388 * or a depayloader element @payloader if @pad is a sink pad.
390 * Returns: (transfer full): a new #GstRTSPStream
/* Constructor: validates the arguments, creates the object and stores new
 * references to @payloader and @pad. */
393 gst_rtsp_stream_new (guint idx, GstElement * payloader, GstPad * pad)
395 GstRTSPStreamPrivate *priv;
396 GstRTSPStream *stream;
398 g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL);
399 g_return_val_if_fail (GST_IS_PAD (pad), NULL);
401 stream = g_object_new (GST_TYPE_RTSP_STREAM, NULL);
404 priv->payloader = gst_object_ref (payloader);
/* the pad direction decides which of the two pad fields is filled in; the
 * other one stays unset */
405 if (GST_PAD_IS_SRC (pad))
406 priv->srcpad = gst_object_ref (pad);
408 priv->sinkpad = gst_object_ref (pad);
414 * gst_rtsp_stream_get_index:
415 * @stream: a #GstRTSPStream
417 * Get the stream index.
419 * Returns: the stream index.
/* Returns the index stored in the private data. */
422 gst_rtsp_stream_get_index (GstRTSPStream * stream)
/* NOTE(review): -1 is the failure value; if the return type is unsigned it
 * wraps to the maximum value -- confirm callers expect that. */
424 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
426 return stream->priv->idx;
430 * gst_rtsp_stream_get_pt:
431 * @stream: a #GstRTSPStream
433 * Get the stream payload type.
435 * Returns: the stream payload type.
/* Reads the payload type from the "pt" property of the payloader. */
438 gst_rtsp_stream_get_pt (GstRTSPStream * stream)
440 GstRTSPStreamPrivate *priv;
443 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
447 g_object_get (G_OBJECT (priv->payloader), "pt", &pt, NULL);
453 * gst_rtsp_stream_get_srcpad:
454 * @stream: a #GstRTSPStream
456 * Get the srcpad associated with @stream.
458 * Returns: (transfer full) (nullable): the srcpad. Unref after usage.
/* Returns a new reference to the source pad; the caller owns it. */
461 gst_rtsp_stream_get_srcpad (GstRTSPStream * stream)
463 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
/* a stream created from a sink pad has no source pad to hand out */
465 if (!stream->priv->srcpad)
468 return gst_object_ref (stream->priv->srcpad);
472 * gst_rtsp_stream_get_sinkpad:
473 * @stream: a #GstRTSPStream
475 * Get the sinkpad associated with @stream.
477 * Returns: (transfer full) (nullable): the sinkpad. Unref after usage.
/* Returns a new reference to the sink pad; the caller owns it. */
480 gst_rtsp_stream_get_sinkpad (GstRTSPStream * stream)
482 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
/* a stream created from a source pad has no sink pad to hand out */
484 if (!stream->priv->sinkpad)
487 return gst_object_ref (stream->priv->sinkpad);
491 * gst_rtsp_stream_get_control:
492 * @stream: a #GstRTSPStream
494 * Get the control string to identify this stream.
496 * Returns: (transfer full) (nullable): the control string. g_free() after usage.
/* Produces a newly allocated copy of the control string, built under the
 * lock. */
499 gst_rtsp_stream_get_control (GstRTSPStream * stream)
501 GstRTSPStreamPrivate *priv;
504 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
508 g_mutex_lock (&priv->lock);
/* with no explicit control string configured, synthesise "stream=<idx>" */
509 if ((result = g_strdup (priv->control)) == NULL)
510 result = g_strdup_printf ("stream=%u", priv->idx);
511 g_mutex_unlock (&priv->lock);
517 * gst_rtsp_stream_set_control:
518 * @stream: a #GstRTSPStream
519 * @control: (nullable): a control string
521 * Set the control string in @stream.
/* Replaces the stored control string with a copy of @control (NULL clears
 * it), under the lock. */
524 gst_rtsp_stream_set_control (GstRTSPStream * stream, const gchar * control)
526 GstRTSPStreamPrivate *priv;
528 g_return_if_fail (GST_IS_RTSP_STREAM (stream));
532 g_mutex_lock (&priv->lock);
533 g_free (priv->control);
534 priv->control = g_strdup (control);
535 g_mutex_unlock (&priv->lock);
539 * gst_rtsp_stream_has_control:
540 * @stream: a #GstRTSPStream
541 * @control: (nullable): a control string
543 * Check if @stream has the control string @control.
545 * Returns: %TRUE if @stream has @control as the control string
/* Compares @control with the stream's control string under the lock. Two
 * forms are checked: a string comparison against priv->control, and a parse
 * of "stream=<n>" compared against the stream index. */
548 gst_rtsp_stream_has_control (GstRTSPStream * stream, const gchar * control)
550 GstRTSPStreamPrivate *priv;
553 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
557 g_mutex_lock (&priv->lock);
/* g_strcmp0() accepts NULL on either side */
559 res = (g_strcmp0 (priv->control, control) == 0);
/* NOTE(review): @control is documented as nullable, but sscanf() must not be
 * given a NULL input string -- confirm a NULL @control cannot reach this
 * call, or add a `control != NULL` check. */
563 if (sscanf (control, "stream=%u", &streamid) > 0)
564 res = (streamid == priv->idx);
568 g_mutex_unlock (&priv->lock);
574 * gst_rtsp_stream_set_mtu:
575 * @stream: a #GstRTSPStream
578 * Configure the mtu in the payloader of @stream to @mtu.
/* Writes @mtu to the "mtu" property of the payloader. */
581 gst_rtsp_stream_set_mtu (GstRTSPStream * stream, guint mtu)
583 GstRTSPStreamPrivate *priv;
585 g_return_if_fail (GST_IS_RTSP_STREAM (stream));
589 GST_LOG_OBJECT (stream, "set MTU %u", mtu);
591 g_object_set (G_OBJECT (priv->payloader), "mtu", mtu, NULL);
595 * gst_rtsp_stream_get_mtu:
596 * @stream: a #GstRTSPStream
598 * Get the configured MTU in the payloader of @stream.
600 * Returns: the MTU of the payloader.
/* Reads the "mtu" property back from the payloader; 0 is the failure value
 * when @stream is not a GstRTSPStream. */
603 gst_rtsp_stream_get_mtu (GstRTSPStream * stream)
605 GstRTSPStreamPrivate *priv;
608 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
612 g_object_get (G_OBJECT (priv->payloader), "mtu", &mtu, NULL);
617 /* Update the dscp qos property on the udp sinks */
/* Copies priv->dscp_qos into the "qos-dscp" property of the sink that
 * @udpsink points at.
 * NOTE(review): only *udpsink (the first element when an array is passed) is
 * written here -- confirm whether the second sink is meant to be updated
 * too. */
619 update_dscp_qos (GstRTSPStream * stream, GstElement ** udpsink)
621 GstRTSPStreamPrivate *priv;
626 g_object_set (G_OBJECT (*udpsink), "qos-dscp", priv->dscp_qos, NULL);
631 * gst_rtsp_stream_set_dscp_qos:
632 * @stream: a #GstRTSPStream
633 * @dscp_qos: a new dscp qos value (0-63, or -1 to disable)
635 * Configure the dscp qos of the outgoing sockets to @dscp_qos.
/* Validates and stores the DSCP QoS value, then pushes it to the unicast UDP
 * sink through update_dscp_qos(). */
638 gst_rtsp_stream_set_dscp_qos (GstRTSPStream * stream, gint dscp_qos)
640 GstRTSPStreamPrivate *priv;
642 g_return_if_fail (GST_IS_RTSP_STREAM (stream));
646 GST_LOG_OBJECT (stream, "set DSCP QoS %d", dscp_qos);
/* accepted range is -1 (disabled) through 63 */
648 if (dscp_qos < -1 || dscp_qos > 63) {
649 GST_WARNING_OBJECT (stream, "trying to set illegal dscp qos %d", dscp_qos);
653 priv->dscp_qos = dscp_qos;
655 update_dscp_qos (stream, priv->udpsink);
659 * gst_rtsp_stream_get_dscp_qos:
660 * @stream: a #GstRTSPStream
662 * Get the configured DSCP QoS of the outgoing sockets.
664 * Returns: the DSCP QoS value of the outgoing sockets, or -1 if disabled.
/* Returns the stored DSCP QoS value; the field is read without taking the
 * lock. */
667 gst_rtsp_stream_get_dscp_qos (GstRTSPStream * stream)
669 GstRTSPStreamPrivate *priv;
671 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
675 return priv->dscp_qos;
679 * gst_rtsp_stream_is_transport_supported:
680 * @stream: a #GstRTSPStream
681 * @transport: (transfer none): a #GstRTSPTransport
683 * Check if @transport can be handled by @stream
685 * Returns: %TRUE if @transport can be handled by @stream.
/* Checks @transport against the stream configuration under the lock: the
 * transport mode must be RTP, the profile must intersect priv->profiles and
 * the lower transport must intersect priv->protocols. Every failure path
 * logs at debug level and releases the lock. */
688 gst_rtsp_stream_is_transport_supported (GstRTSPStream * stream,
689 GstRTSPTransport * transport)
691 GstRTSPStreamPrivate *priv;
693 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
694 g_return_val_if_fail (transport != NULL, FALSE);
698 g_mutex_lock (&priv->lock);
699 if (transport->trans != GST_RTSP_TRANS_RTP)
700 goto unsupported_transmode;
/* profiles and protocols are flag sets, hence the bitwise tests */
702 if (!(transport->profile & priv->profiles))
703 goto unsupported_profile;
705 if (!(transport->lower_transport & priv->protocols))
706 goto unsupported_ltrans;
708 g_mutex_unlock (&priv->lock);
713 unsupported_transmode:
715 GST_DEBUG ("unsupported transport mode %d", transport->trans);
716 g_mutex_unlock (&priv->lock);
721 GST_DEBUG ("unsupported profile %d", transport->profile);
722 g_mutex_unlock (&priv->lock);
727 GST_DEBUG ("unsupported lower transport %d", transport->lower_transport);
728 g_mutex_unlock (&priv->lock);
734 * gst_rtsp_stream_set_profiles:
735 * @stream: a #GstRTSPStream
736 * @profiles: the new profiles
738 * Configure the allowed profiles for @stream.
/* Stores the allowed profile flags under the lock. */
741 gst_rtsp_stream_set_profiles (GstRTSPStream * stream, GstRTSPProfile profiles)
743 GstRTSPStreamPrivate *priv;
745 g_return_if_fail (GST_IS_RTSP_STREAM (stream));
749 g_mutex_lock (&priv->lock);
750 priv->profiles = profiles;
751 g_mutex_unlock (&priv->lock);
755 * gst_rtsp_stream_get_profiles:
756 * @stream: a #GstRTSPStream
758 * Get the allowed profiles of @stream.
760 * Returns: a #GstRTSPProfile
/* Reads the allowed profile flags under the lock;
 * GST_RTSP_PROFILE_UNKNOWN is the failure value for an invalid @stream. */
763 gst_rtsp_stream_get_profiles (GstRTSPStream * stream)
765 GstRTSPStreamPrivate *priv;
768 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_RTSP_PROFILE_UNKNOWN);
772 g_mutex_lock (&priv->lock);
773 res = priv->profiles;
774 g_mutex_unlock (&priv->lock);
780 * gst_rtsp_stream_set_protocols:
781 * @stream: a #GstRTSPStream
782 * @protocols: the new flags
784 * Configure the allowed lower transport for @stream.
/* Stores the allowed lower-transport flags under the lock. */
787 gst_rtsp_stream_set_protocols (GstRTSPStream * stream,
788 GstRTSPLowerTrans protocols)
790 GstRTSPStreamPrivate *priv;
792 g_return_if_fail (GST_IS_RTSP_STREAM (stream));
796 g_mutex_lock (&priv->lock);
797 priv->protocols = protocols;
798 g_mutex_unlock (&priv->lock);
802 * gst_rtsp_stream_get_protocols:
803 * @stream: a #GstRTSPStream
805 * Get the allowed protocols of @stream.
807 * Returns: a #GstRTSPLowerTrans
/* Reads the allowed lower-transport flags under the lock;
 * GST_RTSP_LOWER_TRANS_UNKNOWN is the failure value for an invalid
 * @stream. */
810 gst_rtsp_stream_get_protocols (GstRTSPStream * stream)
812 GstRTSPStreamPrivate *priv;
813 GstRTSPLowerTrans res;
815 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream),
816 GST_RTSP_LOWER_TRANS_UNKNOWN);
820 g_mutex_lock (&priv->lock);
821 res = priv->protocols;
822 g_mutex_unlock (&priv->lock);
828 * gst_rtsp_stream_set_address_pool:
829 * @stream: a #GstRTSPStream
830 * @pool: (transfer none) (nullable): a #GstRTSPAddressPool
832 * Configure @pool to be used as the address pool of @stream.
/* Swaps the address pool of the stream. The new pool is referenced while the
 * lock is held; the previous pool is unreffed after the lock has been
 * released. */
835 gst_rtsp_stream_set_address_pool (GstRTSPStream * stream,
836 GstRTSPAddressPool * pool)
838 GstRTSPStreamPrivate *priv;
839 GstRTSPAddressPool *old;
841 g_return_if_fail (GST_IS_RTSP_STREAM (stream));
845 GST_LOG_OBJECT (stream, "set address pool %p", pool);
847 g_mutex_lock (&priv->lock);
/* only take a new reference when the pool actually changes */
848 if ((old = priv->pool) != pool)
849 priv->pool = pool ? g_object_ref (pool) : NULL;
852 g_mutex_unlock (&priv->lock);
855 g_object_unref (old);
859 * gst_rtsp_stream_get_address_pool:
860 * @stream: a #GstRTSPStream
862 * Get the #GstRTSPAddressPool used as the address pool of @stream.
864 * Returns: (transfer full) (nullable): the #GstRTSPAddressPool of @stream.
865 * g_object_unref() after usage.
/* Fetches the current pool under the lock and takes an extra reference on it
 * when one is set. */
868 gst_rtsp_stream_get_address_pool (GstRTSPStream * stream)
870 GstRTSPStreamPrivate *priv;
871 GstRTSPAddressPool *result;
873 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
877 g_mutex_lock (&priv->lock);
878 if ((result = priv->pool))
879 g_object_ref (result);
880 g_mutex_unlock (&priv->lock);
886 * gst_rtsp_stream_set_multicast_iface:
887 * @stream: a #GstRTSPStream
888 * @multicast_iface: (transfer none) (nullable): a multicast interface name
890 * Configure @multicast_iface to be used for @stream.
/* Replaces the stored multicast interface name with a copy of
 * @multicast_iface (NULL clears it), under the lock. */
893 gst_rtsp_stream_set_multicast_iface (GstRTSPStream * stream,
894 const gchar * multicast_iface)
896 GstRTSPStreamPrivate *priv;
899 g_return_if_fail (GST_IS_RTSP_STREAM (stream));
903 GST_LOG_OBJECT (stream, "set multicast iface %s",
904 GST_STR_NULL (multicast_iface));
906 g_mutex_lock (&priv->lock);
/* this compares pointers, not string contents: an equal name held in a
 * different buffer is still copied and replaces the old string */
907 if ((old = priv->multicast_iface) != multicast_iface)
908 priv->multicast_iface = multicast_iface ? g_strdup (multicast_iface) : NULL;
911 g_mutex_unlock (&priv->lock);
918 * gst_rtsp_stream_get_multicast_iface:
919 * @stream: a #GstRTSPStream
921 * Get the multicast interface used for @stream.
923 * Returns: (transfer full) (nullable): the multicast interface for @stream.
924 * g_free() after usage.
/* Duplicates the stored multicast interface name under the lock; the result
 * stays NULL when no interface is configured. */
927 gst_rtsp_stream_get_multicast_iface (GstRTSPStream * stream)
929 GstRTSPStreamPrivate *priv;
932 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
936 g_mutex_lock (&priv->lock);
937 if ((result = priv->multicast_iface))
938 result = g_strdup (result);
939 g_mutex_unlock (&priv->lock);
945 * gst_rtsp_stream_get_multicast_address:
946 * @stream: a #GstRTSPStream
947 * @family: the #GSocketFamily
949 * Get the multicast address of @stream for @family. The original
950 * #GstRTSPAddress is cached and a copy is returned, so freeing the return value
951 * won't release the address from the pool.
953 * Returns: (transfer full) (nullable): the #GstRTSPAddress of @stream
954 * or %NULL when no address could be allocated. gst_rtsp_address_free()
/* Hands out a copy of the cached multicast address for @family. On first use
 * for a family, an address is acquired from the pool and cached in the
 * private data. */
958 gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream,
959 GSocketFamily family)
961 GstRTSPStreamPrivate *priv;
962 GstRTSPAddress *result;
963 GstRTSPAddress **addrp;
964 GstRTSPAddressFlags flags;
966 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
970 g_mutex_lock (&stream->priv->lock);
/* pick the cache slot and the address-family flag */
972 if (family == G_SOCKET_FAMILY_IPV6) {
973 flags = GST_RTSP_ADDRESS_FLAG_IPV6;
974 addrp = &priv->mcast_addr_v6;
976 flags = GST_RTSP_ADDRESS_FLAG_IPV4;
977 addrp = &priv->mcast_addr_v4;
/* nothing cached yet: acquire a multicast address with an even first port
 * and two ports from the pool */
980 if (*addrp == NULL) {
981 if (priv->pool == NULL)
984 flags |= GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_MULTICAST;
986 *addrp = gst_rtsp_address_pool_acquire_address (priv->pool, flags, 2);
990 /* FIXME: Also reserve the same port with unicast ANY address, since that's
991 * where we are going to bind our socket. Probably loop until we find a port
992 * available in both mcast and unicast pools. Maybe GstRTSPAddressPool
993 * should do it for us when both GST_RTSP_ADDRESS_FLAG_MULTICAST and
994 * GST_RTSP_ADDRESS_FLAG_UNICAST are given. */
996 result = gst_rtsp_address_copy (*addrp);
998 g_mutex_unlock (&stream->priv->lock);
/* error paths: each logs the reason and releases the lock */
1005 GST_ERROR_OBJECT (stream, "no address pool specified");
1006 g_mutex_unlock (&stream->priv->lock);
1011 GST_ERROR_OBJECT (stream, "failed to acquire address from pool");
1012 g_mutex_unlock (&stream->priv->lock);
1018 * gst_rtsp_stream_reserve_address:
1019 * @stream: a #GstRTSPStream
1020 * @address: an address
1025 * Reserve @address and @port as the address and port of @stream. The original
1026 * #GstRTSPAddress is cached and a copy is returned, so freeing the return value
1027 * won't release the address from the pool.
1029 * Returns: (nullable): the #GstRTSPAddress of @stream or %NULL when
1030 * the address could not be reserved. gst_rtsp_address_free() after usage.
/* Reserves @address/@port/@n_ports/@ttl from the pool as the multicast
 * address of the stream for the family of @address and hands out a copy of
 * the cached address. If an address is already cached for that family, the
 * request has to match it exactly. */
1033 gst_rtsp_stream_reserve_address (GstRTSPStream * stream,
1034 const gchar * address, guint port, guint n_ports, guint ttl)
1036 GstRTSPStreamPrivate *priv;
1037 GstRTSPAddress *result;
1039 GSocketFamily family;
1040 GstRTSPAddress **addrp;
1042 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1043 g_return_val_if_fail (address != NULL, NULL);
1044 g_return_val_if_fail (port > 0, NULL);
1045 g_return_val_if_fail (n_ports > 0, NULL);
1046 g_return_val_if_fail (ttl > 0, NULL);
1048 priv = stream->priv;
/* derive the address family from the textual address; an unparsable address
 * is logged and treated as IPv4 */
1050 addr = g_inet_address_new_from_string (address);
1052 GST_ERROR ("failed to get inet addr from %s", address);
1053 family = G_SOCKET_FAMILY_IPV4;
1055 family = g_inet_address_get_family (addr);
1056 g_object_unref (addr);
/* select the per-family cache slot (chosen before the lock is taken; only
 * the slot contents are accessed under the lock) */
1059 if (family == G_SOCKET_FAMILY_IPV6)
1060 addrp = &priv->mcast_addr_v6;
1062 addrp = &priv->mcast_addr_v4;
1064 g_mutex_lock (&priv->lock);
1065 if (*addrp == NULL) {
1066 GstRTSPAddressPoolResult res;
1068 if (priv->pool == NULL)
1071 res = gst_rtsp_address_pool_reserve_address (priv->pool, address,
1072 port, n_ports, ttl, addrp);
1073 if (res != GST_RTSP_ADDRESS_POOL_OK)
1076 /* FIXME: Also reserve the same port with unicast ANY address, since that's
1077 * where we are going to bind our socket. */
/* an address is already cached: any difference in address (compared
 * case-insensitively), port, port count or ttl is an error */
1079 if (g_ascii_strcasecmp ((*addrp)->address, address) ||
1080 (*addrp)->port != port || (*addrp)->n_ports != n_ports ||
1081 (*addrp)->ttl != ttl)
1082 goto different_address;
1084 result = gst_rtsp_address_copy (*addrp);
1085 g_mutex_unlock (&priv->lock);
/* error paths: each logs the reason and releases the lock */
1092 GST_ERROR_OBJECT (stream, "no address pool specified");
1093 g_mutex_unlock (&priv->lock);
1098 GST_ERROR_OBJECT (stream, "failed to acquire address %s from pool",
1100 g_mutex_unlock (&priv->lock);
1105 GST_ERROR_OBJECT (stream,
1106 "address %s is not the same as %s that was already reserved",
1107 address, (*addrp)->address);
1108 g_mutex_unlock (&priv->lock);
1113 /* must be called with lock */
/* Hands @socket to @udpsink through the property that matches @family:
 * "socket-v6" for IPv6, "socket" for everything else. */
1115 set_socket_for_udpsink (GstElement * udpsink, GSocket * socket,
1116 GSocketFamily family)
1118 const gchar *multisink_socket;
1120 if (family == G_SOCKET_FAMILY_IPV6)
1121 multisink_socket = "socket-v6";
1123 multisink_socket = "socket";
1125 g_object_set (G_OBJECT (udpsink), multisink_socket, socket, NULL);
1128 /* must be called with lock */
/* Configures @udpsink for multicast: sets the socket for @family, applies
 * the optional interface name and TTL, then adds @addr_str:@port as a
 * destination through the "add" action signal. */
1130 set_multicast_socket_for_udpsink (GstElement * udpsink, GSocket * socket,
1131 GSocketFamily family, const gchar * multicast_iface,
1132 const gchar * addr_str, gint port, gint mcast_ttl)
1134 set_socket_for_udpsink (udpsink, socket, family);
/* both settings are optional: a NULL interface or a TTL that is not
 * positive leaves the corresponding property untouched */
1136 if (multicast_iface) {
1137 GST_INFO ("setting multicast-iface %s", multicast_iface);
1138 g_object_set (G_OBJECT (udpsink), "multicast-iface", multicast_iface, NULL);
1141 if (mcast_ttl > 0) {
1142 GST_INFO ("setting ttl-mc %d", mcast_ttl);
1143 g_object_set (G_OBJECT (udpsink), "ttl-mc", mcast_ttl, NULL);
1146 g_signal_emit_by_name (udpsink, "add", addr_str, port, NULL);
1150 /* must be called with lock */
/* Unicast counterpart of set_multicast_socket_for_udpsink(): only the socket
 * is configured, no destination is added. */
1152 set_unicast_socket_for_udpsink (GstElement * udpsink, GSocket * socket,
1153 GSocketFamily family)
1155 set_socket_for_udpsink (udpsink, socket, family);
/* Looks up the local address of @socket and extracts its port number. */
1159 get_port_from_socket (GSocket * socket)
1162 GSocketAddress *sockaddr;
1165 GST_DEBUG ("socket: %p", socket);
1166 sockaddr = g_socket_get_local_address (socket, &err);
/* NOTE(review): this branch is also taken when a non-NULL address of the
 * wrong type is returned; in that case the call succeeded and did not set
 * err, so err->message below would dereference an unset/NULL GError.
 * Confirm err is initialised to NULL and guard the log accordingly. */
1167 if (sockaddr == NULL || !G_IS_INET_SOCKET_ADDRESS (sockaddr)) {
1168 g_clear_object (&sockaddr);
1169 GST_ERROR ("failed to get sockaddr: %s", err->message);
1174 port = g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (sockaddr));
1175 g_object_unref (sockaddr);
/* Creates a "multiudpsink" element in *@udpsink and configures it: socket
 * handling, buffering, sync/async behaviour, multicast options, DSCP QoS,
 * and the unicast or multicast sockets for the address families that have a
 * server or multicast address set on the stream. */
1182 create_and_configure_udpsink (GstRTSPStream * stream, GstElement ** udpsink,
1183 GSocket * socket_v4, GSocket * socket_v6, gboolean multicast,
1184 gboolean is_rtp, gint mcast_ttl)
1186 GstRTSPStreamPrivate *priv = stream->priv;
1188 *udpsink = gst_element_factory_make ("multiudpsink", NULL);
1191 goto no_udp_protocol;
1193 /* configure sinks */
/* presumably the sockets stay owned by the stream (finalize unrefs them), so
 * the sink must not close them -- TODO confirm */
1195 g_object_set (G_OBJECT (*udpsink), "close-socket", FALSE, NULL);
1197 g_object_set (G_OBJECT (*udpsink), "send-duplicates", FALSE, NULL);
1200 g_object_set (G_OBJECT (*udpsink), "buffer-size", priv->buffer_size, NULL);
1202 g_object_set (G_OBJECT (*udpsink), "sync", FALSE, NULL);
1204 /* Needs to be async for RECORD streams, otherwise we will never go to
1205 * PLAYING because the sinks will wait for data while the udpsrc can't
1206 * provide data with timestamps in PAUSED. */
1207 if (!is_rtp || priv->sinkpad)
1208 g_object_set (G_OBJECT (*udpsink), "async", FALSE, NULL);
1211 /* join multicast group when adding clients, so we'll start receiving from it.
1212 * We cannot rely on the udpsrc to join the group since its socket is always a
1213 * local unicast one. */
1214 g_object_set (G_OBJECT (*udpsink), "auto-multicast", TRUE, NULL);
1216 g_object_set (G_OBJECT (*udpsink), "loop", FALSE, NULL);
1219 /* update the dscp qos field in the sinks */
1220 update_dscp_qos (stream, udpsink);
/* unicast: hand over the socket of every family that has a server address */
1222 if (priv->server_addr_v4) {
1223 GST_DEBUG_OBJECT (stream, "udp IPv4, configure udpsinks");
1224 set_unicast_socket_for_udpsink (*udpsink, socket_v4, G_SOCKET_FAMILY_IPV4);
1227 if (priv->server_addr_v6) {
1228 GST_DEBUG_OBJECT (stream, "udp IPv6, configure udpsinks");
1229 set_unicast_socket_for_udpsink (*udpsink, socket_v6, G_SOCKET_FAMILY_IPV6);
/* multicast: the destination port is read back from the bound socket and the
 * multicast address is added as a destination */
1234 if (priv->mcast_addr_v4) {
1235 GST_DEBUG_OBJECT (stream, "mcast IPv4, configure udpsinks");
1236 port = get_port_from_socket (socket_v4);
1238 goto get_port_failed;
1239 set_multicast_socket_for_udpsink (*udpsink, socket_v4,
1240 G_SOCKET_FAMILY_IPV4, priv->multicast_iface,
1241 priv->mcast_addr_v4->address, port, mcast_ttl);
1244 if (priv->mcast_addr_v6) {
1245 GST_DEBUG_OBJECT (stream, "mcast IPv6, configure udpsinks");
1246 port = get_port_from_socket (socket_v6);
1248 goto get_port_failed;
1249 set_multicast_socket_for_udpsink (*udpsink, socket_v6,
1250 G_SOCKET_FAMILY_IPV6, priv->multicast_iface,
1251 priv->mcast_addr_v6->address, port, mcast_ttl);
/* error paths */
1261 GST_ERROR_OBJECT (stream, "failed to create udpsink element");
1266 GST_ERROR_OBJECT (stream, "failed to get udp port");
1271 /* must be called with lock */
/* Creates a "udpsrc" element in *@udpsrc that reads from the already created
 * @socket and brings it to the READY state. On failure the element is set
 * back to NULL state and *@udpsrc is cleared. */
1273 create_and_configure_udpsource (GstElement ** udpsrc, GSocket * socket)
1275 GstStateChangeReturn ret;
1277 g_assert (socket != NULL);
1279 *udpsrc = gst_element_factory_make ("udpsrc", NULL);
1280 if (*udpsrc == NULL)
1283 g_object_set (G_OBJECT (*udpsrc), "socket", socket, NULL);
1285 /* The udpsrc cannot do the join because its socket is always a local unicast
1286 * one. The udpsink sharing the same socket will do it for us. */
1287 g_object_set (G_OBJECT (*udpsrc), "auto-multicast", FALSE, NULL);
1289 g_object_set (G_OBJECT (*udpsrc), "loop", FALSE, NULL);
/* the source must not close a socket it was handed from outside */
1291 g_object_set (G_OBJECT (*udpsrc), "close-socket", FALSE, NULL);
1293 ret = gst_element_set_state (*udpsrc, GST_STATE_READY);
1294 if (ret == GST_STATE_CHANGE_FAILURE)
/* failure cleanup: shut the element down and drop the reference */
1303 gst_element_set_state (*udpsrc, GST_STATE_NULL);
1304 g_clear_object (udpsrc);
1311 alloc_ports_one_family (GstRTSPStream * stream, GSocketFamily family,
1312 GSocket * socket_out[2], GstRTSPAddress ** server_addr_out,
1313 gboolean multicast, GstRTSPTransport * ct)
1315 GstRTSPStreamPrivate *priv = stream->priv;
1316 GSocket *rtp_socket = NULL;
1317 GSocket *rtcp_socket;
1318 gint tmp_rtp, tmp_rtcp;
1320 GList *rejected_addresses = NULL;
1321 GstRTSPAddress *addr = NULL;
1322 GInetAddress *inetaddr = NULL;
1323 GSocketAddress *rtp_sockaddr = NULL;
1324 GSocketAddress *rtcp_sockaddr = NULL;
1325 GstRTSPAddressPool *pool;
1330 /* Start with random port */
1333 rtcp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1334 G_SOCKET_PROTOCOL_UDP, NULL);
1336 goto no_udp_protocol;
1337 g_socket_set_multicast_loopback (rtcp_socket, FALSE);
1339 /* try to allocate 2 UDP ports, the RTP port should be an even
1340 * number and the RTCP port should be the next (uneven) port */
1343 if (rtp_socket == NULL) {
1344 rtp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1345 G_SOCKET_PROTOCOL_UDP, NULL);
1347 goto no_udp_protocol;
1348 g_socket_set_multicast_loopback (rtp_socket, FALSE);
1351 if ((pool && gst_rtsp_address_pool_has_unicast_addresses (pool)) || multicast) {
1352 GstRTSPAddressFlags flags;
1355 rejected_addresses = g_list_prepend (rejected_addresses, addr);
1360 flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT;
1362 flags |= GST_RTSP_ADDRESS_FLAG_MULTICAST;
1364 flags |= GST_RTSP_ADDRESS_FLAG_UNICAST;
1366 if (family == G_SOCKET_FAMILY_IPV6)
1367 flags |= GST_RTSP_ADDRESS_FLAG_IPV6;
1369 flags |= GST_RTSP_ADDRESS_FLAG_IPV4;
1371 addr = gst_rtsp_address_pool_acquire_address (pool, flags, 2);
1376 tmp_rtp = addr->port;
1378 g_clear_object (&inetaddr);
1379 /* FIXME: Does it really work with the IP_MULTICAST_ALL socket option and
1380 * socket control message set in udpsrc? */
1382 inetaddr = g_inet_address_new_any (family);
1384 inetaddr = g_inet_address_new_from_string (addr->address);
1392 if (inetaddr == NULL)
1393 inetaddr = g_inet_address_new_any (family);
1396 rtp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtp);
1397 if (!g_socket_bind (rtp_socket, rtp_sockaddr, FALSE, NULL)) {
1398 GST_DEBUG_OBJECT (stream, "rtp bind() failed, will try again");
1399 g_object_unref (rtp_sockaddr);
1402 g_object_unref (rtp_sockaddr);
1404 rtp_sockaddr = g_socket_get_local_address (rtp_socket, NULL);
1405 if (rtp_sockaddr == NULL || !G_IS_INET_SOCKET_ADDRESS (rtp_sockaddr)) {
1406 g_clear_object (&rtp_sockaddr);
1411 g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (rtp_sockaddr));
1412 g_object_unref (rtp_sockaddr);
1414 /* check if port is even */
1415 if ((tmp_rtp & 1) != 0) {
1416 /* port not even, close and allocate another */
1418 g_clear_object (&rtp_socket);
1423 tmp_rtcp = tmp_rtp + 1;
1425 rtcp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtcp);
1426 if (!g_socket_bind (rtcp_socket, rtcp_sockaddr, FALSE, NULL)) {
1427 GST_DEBUG_OBJECT (stream, "rctp bind() failed, will try again");
1428 g_object_unref (rtcp_sockaddr);
1429 g_clear_object (&rtp_socket);
1432 g_object_unref (rtcp_sockaddr);
1435 addr = g_slice_new0 (GstRTSPAddress);
1436 addr->address = g_inet_address_to_string (inetaddr);
1437 addr->port = tmp_rtp;
1441 g_clear_object (&inetaddr);
1443 socket_out[0] = rtp_socket;
1444 socket_out[1] = rtcp_socket;
1445 *server_addr_out = addr;
1447 GST_DEBUG_OBJECT (stream, "allocated address: %s and ports: %d, %d",
1448 addr->address, tmp_rtp, tmp_rtcp);
1450 g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free);
1457 GST_ERROR_OBJECT (stream, "failed to allocate UDP ports: protocol error");
1462 GST_ERROR_OBJECT (stream,
1463 "failed to allocate UDP ports: no address pool specified");
1468 GST_ERROR_OBJECT (stream, "failed to acquire address from pool");
1473 GST_ERROR_OBJECT (stream, "failed to allocate UDP ports: no ports");
1478 GST_ERROR_OBJECT (stream, "failed to allocate UDP ports: socket error");
1484 g_object_unref (inetaddr);
1485 g_list_free_full (rejected_addresses,
1486 (GDestroyNotify) gst_rtsp_address_free);
1488 gst_rtsp_address_free (addr);
1490 g_object_unref (rtp_socket);
1492 g_object_unref (rtcp_socket);
1498 * gst_rtsp_stream_allocate_udp_sockets:
1499 * @stream: a #GstRTSPStream
1500 * @family: protocol family
1501 * @transport: transport method
1502 * @use_client_settings: Whether to use client settings or not
1504 * Allocates RTP and RTCP ports.
1506 * Returns: %TRUE if the RTP and RTCP sockets have been succeccully allocated.
1509 gst_rtsp_stream_allocate_udp_sockets (GstRTSPStream * stream,
1510 GSocketFamily family, GstRTSPTransport * ct,
1511 gboolean use_transport_settings)
1513 GstRTSPStreamPrivate *priv;
1514 gboolean ret = FALSE;
1515 GstRTSPLowerTrans transport;
1516 gboolean allocated = FALSE;
1518 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1519 g_return_val_if_fail (ct != NULL, FALSE);
1520 priv = stream->priv;
1522 transport = ct->lower_transport;
1524 g_mutex_lock (&priv->lock);
1526 if (transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
1527 if (family == G_SOCKET_FAMILY_IPV4 && priv->mcast_socket_v4[0])
1529 else if (family == G_SOCKET_FAMILY_IPV6 && priv->mcast_socket_v6[0])
1531 } else if (transport == GST_RTSP_LOWER_TRANS_UDP) {
1532 if (family == G_SOCKET_FAMILY_IPV4 && priv->socket_v4[0])
1534 else if (family == G_SOCKET_FAMILY_IPV6 && priv->socket_v6[0])
1539 GST_DEBUG_OBJECT (stream, "Allocated already");
1540 g_mutex_unlock (&priv->lock);
1544 if (family == G_SOCKET_FAMILY_IPV4) {
1546 if (transport == GST_RTSP_LOWER_TRANS_UDP) {
1548 GST_DEBUG_OBJECT (stream, "GST_RTSP_LOWER_TRANS_UDP, ipv4");
1549 ret = alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4,
1550 priv->socket_v4, &priv->server_addr_v4, FALSE, ct);
1553 GST_DEBUG_OBJECT (stream, "GST_RTSP_LOWER_TRANS_MCAST_UDP, ipv4");
1554 ret = alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4,
1555 priv->mcast_socket_v4, &priv->mcast_addr_v4, TRUE, ct);
1559 if (transport == GST_RTSP_LOWER_TRANS_UDP) {
1561 GST_DEBUG_OBJECT (stream, "GST_RTSP_LOWER_TRANS_UDP, ipv6");
1562 ret = alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6,
1563 priv->socket_v6, &priv->server_addr_v6, FALSE, ct);
1567 GST_DEBUG_OBJECT (stream, "GST_RTSP_LOWER_TRANS_MCAST_UDP, ipv6");
1568 ret = alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6,
1569 priv->mcast_socket_v6, &priv->mcast_addr_v6, TRUE, ct);
1572 g_mutex_unlock (&priv->lock);
1578 * gst_rtsp_stream_set_client_side:
1579 * @stream: a #GstRTSPStream
1580 * @client_side: TRUE if this #GstRTSPStream is running on the 'client' side of
1581 * an RTSP connection.
1583 * Sets the #GstRTSPStream as a 'client side' stream - used for sending
1584 * streams to an RTSP server via RECORD. This has the practical effect
1585 * of changing which UDP port numbers are used when setting up the local
1586 * side of the stream sending to be either the 'server' or 'client' pair
1587 * of a configured UDP transport.
1590 gst_rtsp_stream_set_client_side (GstRTSPStream * stream, gboolean client_side)
1592 GstRTSPStreamPrivate *priv;
1594 g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1595 priv = stream->priv;
1596 g_mutex_lock (&priv->lock);
1597 priv->client_side = client_side;
1598 g_mutex_unlock (&priv->lock);
1602 * gst_rtsp_stream_is_client_side:
1603 * @stream: a #GstRTSPStream
1605 * See gst_rtsp_stream_set_client_side()
1607 * Returns: TRUE if this #GstRTSPStream is client-side.
1610 gst_rtsp_stream_is_client_side (GstRTSPStream * stream)
1612 GstRTSPStreamPrivate *priv;
1615 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1617 priv = stream->priv;
1618 g_mutex_lock (&priv->lock);
1619 ret = priv->client_side;
1620 g_mutex_unlock (&priv->lock);
1626 * gst_rtsp_stream_get_server_port:
1627 * @stream: a #GstRTSPStream
1628 * @server_port: (out): result server port
1629 * @family: the port family to get
1631 * Fill @server_port with the port pair used by the server. This function can
1632 * only be called when @stream has been joined.
1635 gst_rtsp_stream_get_server_port (GstRTSPStream * stream,
1636 GstRTSPRange * server_port, GSocketFamily family)
1638 GstRTSPStreamPrivate *priv;
1640 g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1641 priv = stream->priv;
1642 g_return_if_fail (priv->joined_bin != NULL);
1645 server_port->min = 0;
1646 server_port->max = 0;
1649 g_mutex_lock (&priv->lock);
1650 if (family == G_SOCKET_FAMILY_IPV4) {
1651 if (server_port && priv->server_addr_v4) {
1652 server_port->min = priv->server_addr_v4->port;
1654 priv->server_addr_v4->port + priv->server_addr_v4->n_ports - 1;
1657 if (server_port && priv->server_addr_v6) {
1658 server_port->min = priv->server_addr_v6->port;
1660 priv->server_addr_v6->port + priv->server_addr_v6->n_ports - 1;
1663 g_mutex_unlock (&priv->lock);
1667 * gst_rtsp_stream_get_rtpsession:
1668 * @stream: a #GstRTSPStream
1670 * Get the RTP session of this stream.
1672 * Returns: (transfer full): The RTP session of this stream. Unref after usage.
1675 gst_rtsp_stream_get_rtpsession (GstRTSPStream * stream)
1677 GstRTSPStreamPrivate *priv;
1680 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1682 priv = stream->priv;
1684 g_mutex_lock (&priv->lock);
1685 if ((session = priv->session))
1686 g_object_ref (session);
1687 g_mutex_unlock (&priv->lock);
1693 * gst_rtsp_stream_get_srtp_encoder:
1694 * @stream: a #GstRTSPStream
1696 * Get the SRTP encoder for this stream.
1698 * Returns: (transfer full): The SRTP encoder for this stream. Unref after usage.
1701 gst_rtsp_stream_get_srtp_encoder (GstRTSPStream * stream)
1703 GstRTSPStreamPrivate *priv;
1704 GstElement *encoder;
1706 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1708 priv = stream->priv;
1710 g_mutex_lock (&priv->lock);
1711 if ((encoder = priv->srtpenc))
1712 g_object_ref (encoder);
1713 g_mutex_unlock (&priv->lock);
1719 * gst_rtsp_stream_get_ssrc:
1720 * @stream: a #GstRTSPStream
1721 * @ssrc: (out): result ssrc
1723 * Get the SSRC used by the RTP session of this stream. This function can only
1724 * be called when @stream has been joined.
1727 gst_rtsp_stream_get_ssrc (GstRTSPStream * stream, guint * ssrc)
1729 GstRTSPStreamPrivate *priv;
1731 g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1732 priv = stream->priv;
1733 g_return_if_fail (priv->joined_bin != NULL);
1735 g_mutex_lock (&priv->lock);
1736 if (ssrc && priv->session)
1737 g_object_get (priv->session, "internal-ssrc", ssrc, NULL);
1738 g_mutex_unlock (&priv->lock);
1742 * gst_rtsp_stream_set_retransmission_time:
1743 * @stream: a #GstRTSPStream
1744 * @time: a #GstClockTime
1746 * Set the amount of time to store retransmission packets.
1749 gst_rtsp_stream_set_retransmission_time (GstRTSPStream * stream,
1752 GST_DEBUG_OBJECT (stream, "set retransmission time %" G_GUINT64_FORMAT, time);
1754 g_mutex_lock (&stream->priv->lock);
1755 stream->priv->rtx_time = time;
1756 if (stream->priv->rtxsend)
1757 g_object_set (stream->priv->rtxsend, "max-size-time",
1758 GST_TIME_AS_MSECONDS (time), NULL);
1759 g_mutex_unlock (&stream->priv->lock);
1763 * gst_rtsp_stream_get_retransmission_time:
1764 * @stream: a #GstRTSPStream
1766 * Get the amount of time to store retransmission data.
1768 * Returns: the amount of time to store retransmission data.
1771 gst_rtsp_stream_get_retransmission_time (GstRTSPStream * stream)
1775 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
1777 g_mutex_lock (&stream->priv->lock);
1778 ret = stream->priv->rtx_time;
1779 g_mutex_unlock (&stream->priv->lock);
1785 * gst_rtsp_stream_set_retransmission_pt:
1786 * @stream: a #GstRTSPStream
1789 * Set the payload type (pt) for retransmission of this stream.
1792 gst_rtsp_stream_set_retransmission_pt (GstRTSPStream * stream, guint rtx_pt)
1794 g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1796 GST_DEBUG_OBJECT (stream, "set retransmission pt %u", rtx_pt);
1798 g_mutex_lock (&stream->priv->lock);
1799 stream->priv->rtx_pt = rtx_pt;
1800 if (stream->priv->rtxsend) {
1801 guint pt = gst_rtsp_stream_get_pt (stream);
1802 gchar *pt_s = g_strdup_printf ("%d", pt);
1803 GstStructure *rtx_pt_map = gst_structure_new ("application/x-rtp-pt-map",
1804 pt_s, G_TYPE_UINT, rtx_pt, NULL);
1805 g_object_set (stream->priv->rtxsend, "payload-type-map", rtx_pt_map, NULL);
1807 gst_structure_free (rtx_pt_map);
1809 g_mutex_unlock (&stream->priv->lock);
1813 * gst_rtsp_stream_get_retransmission_pt:
1814 * @stream: a #GstRTSPStream
1816 * Get the payload-type used for retransmission of this stream
1818 * Returns: The retransmission PT.
1821 gst_rtsp_stream_get_retransmission_pt (GstRTSPStream * stream)
1825 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
1827 g_mutex_lock (&stream->priv->lock);
1828 rtx_pt = stream->priv->rtx_pt;
1829 g_mutex_unlock (&stream->priv->lock);
1835 * gst_rtsp_stream_set_buffer_size:
1836 * @stream: a #GstRTSPStream
1837 * @size: the buffer size
1839 * Set the size of the UDP transmission buffer (in bytes)
1840 * Needs to be set before the stream is joined to a bin.
1845 gst_rtsp_stream_set_buffer_size (GstRTSPStream * stream, guint size)
1847 g_mutex_lock (&stream->priv->lock);
1848 stream->priv->buffer_size = size;
1849 g_mutex_unlock (&stream->priv->lock);
1853 * gst_rtsp_stream_get_buffer_size:
1854 * @stream: a #GstRTSPStream
1856 * Get the size of the UDP transmission buffer (in bytes)
1858 * Returns: the size of the UDP TX buffer
1863 gst_rtsp_stream_get_buffer_size (GstRTSPStream * stream)
1867 g_mutex_lock (&stream->priv->lock);
1868 buffer_size = stream->priv->buffer_size;
1869 g_mutex_unlock (&stream->priv->lock);
1874 /* executed from streaming thread */
1876 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream)
1878 GstRTSPStreamPrivate *priv = stream->priv;
1879 GstCaps *newcaps, *oldcaps;
1881 newcaps = gst_pad_get_current_caps (pad);
1883 GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps,
1886 g_mutex_lock (&priv->lock);
1887 oldcaps = priv->caps;
1888 priv->caps = newcaps;
1889 g_mutex_unlock (&priv->lock);
1892 gst_caps_unref (oldcaps);
1896 dump_structure (const GstStructure * s)
1900 sstr = gst_structure_to_string (s);
1901 GST_INFO ("structure: %s", sstr);
1905 static GstRTSPStreamTransport *
1906 find_transport (GstRTSPStream * stream, const gchar * rtcp_from)
1908 GstRTSPStreamPrivate *priv = stream->priv;
1910 GstRTSPStreamTransport *result = NULL;
1915 if (rtcp_from == NULL)
1918 tmp = g_strrstr (rtcp_from, ":");
1922 port = atoi (tmp + 1);
1923 dest = g_strndup (rtcp_from, tmp - rtcp_from);
1925 g_mutex_lock (&priv->lock);
1926 GST_INFO ("finding %s:%d in %d transports", dest, port,
1927 g_list_length (priv->transports));
1929 for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1930 GstRTSPStreamTransport *trans = walk->data;
1931 const GstRTSPTransport *tr;
1934 tr = gst_rtsp_stream_transport_get_transport (trans);
1936 if (priv->client_side) {
1937 /* In client side mode the 'destination' is the RTSP server, so send
1939 min = tr->server_port.min;
1940 max = tr->server_port.max;
1942 min = tr->client_port.min;
1943 max = tr->client_port.max;
1946 if ((g_ascii_strcasecmp (tr->destination, dest) == 0) &&
1947 (min == port || max == port)) {
1953 g_object_ref (result);
1954 g_mutex_unlock (&priv->lock);
1961 static GstRTSPStreamTransport *
1962 check_transport (GObject * source, GstRTSPStream * stream)
1964 GstStructure *stats;
1965 GstRTSPStreamTransport *trans;
1967 /* see if we have a stream to match with the origin of the RTCP packet */
1968 trans = g_object_get_qdata (source, ssrc_stream_map_key);
1969 if (trans == NULL) {
1970 g_object_get (source, "stats", &stats, NULL);
1972 const gchar *rtcp_from;
1974 dump_structure (stats);
1976 rtcp_from = gst_structure_get_string (stats, "rtcp-from");
1977 if ((trans = find_transport (stream, rtcp_from))) {
1978 GST_INFO ("%p: found transport %p for source %p", stream, trans,
1980 g_object_set_qdata_full (source, ssrc_stream_map_key, trans,
1983 gst_structure_free (stats);
1991 on_new_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1993 GstRTSPStreamTransport *trans;
1995 GST_INFO ("%p: new source %p", stream, source);
1997 trans = check_transport (source, stream);
2000 GST_INFO ("%p: source %p for transport %p", stream, source, trans);
2004 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPStream * stream)
2006 GST_INFO ("%p: new SDES %p", stream, source);
2010 on_ssrc_active (GObject * session, GObject * source, GstRTSPStream * stream)
2012 GstRTSPStreamTransport *trans;
2014 trans = check_transport (source, stream);
2017 GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
2018 gst_rtsp_stream_transport_keep_alive (trans);
2022 GstStructure *stats;
2023 g_object_get (source, "stats", &stats, NULL);
2025 dump_structure (stats);
2026 gst_structure_free (stats);
2033 on_bye_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
2035 GST_INFO ("%p: source %p bye", stream, source);
2039 on_bye_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
2041 GstRTSPStreamTransport *trans;
2043 GST_INFO ("%p: source %p bye timeout", stream, source);
2045 if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
2046 gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
2047 g_object_set_qdata (source, ssrc_stream_map_key, NULL);
2052 on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
2054 GstRTSPStreamTransport *trans;
2056 GST_INFO ("%p: source %p timeout", stream, source);
2058 if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
2059 gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
2060 g_object_set_qdata (source, ssrc_stream_map_key, NULL);
2065 on_new_sender_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
2067 GST_INFO ("%p: new sender source %p", stream, source);
2070 GstStructure *stats;
2071 g_object_get (source, "stats", &stats, NULL);
2073 dump_structure (stats);
2074 gst_structure_free (stats);
2081 on_sender_ssrc_active (GObject * session, GObject * source,
2082 GstRTSPStream * stream)
2086 GstStructure *stats;
2087 g_object_get (source, "stats", &stats, NULL);
2089 dump_structure (stats);
2090 gst_structure_free (stats);
2097 clear_tr_cache (GstRTSPStreamPrivate * priv, gboolean is_rtp)
2100 g_list_foreach (priv->tr_cache_rtp, (GFunc) g_object_unref, NULL);
2101 g_list_free (priv->tr_cache_rtp);
2102 priv->tr_cache_rtp = NULL;
2104 g_list_foreach (priv->tr_cache_rtcp, (GFunc) g_object_unref, NULL);
2105 g_list_free (priv->tr_cache_rtcp);
2106 priv->tr_cache_rtcp = NULL;
2110 static GstFlowReturn
2111 handle_new_sample (GstAppSink * sink, gpointer user_data)
2113 GstRTSPStreamPrivate *priv;
2117 GstRTSPStream *stream;
2120 sample = gst_app_sink_pull_sample (sink);
2124 stream = (GstRTSPStream *) user_data;
2125 priv = stream->priv;
2126 buffer = gst_sample_get_buffer (sample);
2128 is_rtp = GST_ELEMENT_CAST (sink) == priv->appsink[0];
2130 g_mutex_lock (&priv->lock);
2132 if (priv->tr_cache_cookie_rtp != priv->transports_cookie) {
2133 clear_tr_cache (priv, is_rtp);
2134 for (walk = priv->transports; walk; walk = g_list_next (walk)) {
2135 GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2136 priv->tr_cache_rtp =
2137 g_list_prepend (priv->tr_cache_rtp, g_object_ref (tr));
2139 priv->tr_cache_cookie_rtp = priv->transports_cookie;
2142 if (priv->tr_cache_cookie_rtcp != priv->transports_cookie) {
2143 clear_tr_cache (priv, is_rtp);
2144 for (walk = priv->transports; walk; walk = g_list_next (walk)) {
2145 GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2146 priv->tr_cache_rtcp =
2147 g_list_prepend (priv->tr_cache_rtcp, g_object_ref (tr));
2149 priv->tr_cache_cookie_rtcp = priv->transports_cookie;
2152 g_mutex_unlock (&priv->lock);
2155 for (walk = priv->tr_cache_rtp; walk; walk = g_list_next (walk)) {
2156 GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2157 gst_rtsp_stream_transport_send_rtp (tr, buffer);
2160 for (walk = priv->tr_cache_rtcp; walk; walk = g_list_next (walk)) {
2161 GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2162 gst_rtsp_stream_transport_send_rtcp (tr, buffer);
2165 gst_sample_unref (sample);
2170 static GstAppSinkCallbacks sink_cb = {
2171 NULL, /* not interested in EOS */
2172 NULL, /* not interested in preroll samples */
2177 get_rtp_encoder (GstRTSPStream * stream, guint session)
2179 GstRTSPStreamPrivate *priv = stream->priv;
2181 if (priv->srtpenc == NULL) {
2184 name = g_strdup_printf ("srtpenc_%u", session);
2185 priv->srtpenc = gst_element_factory_make ("srtpenc", name);
2188 g_object_set (priv->srtpenc, "random-key", TRUE, NULL);
2190 return gst_object_ref (priv->srtpenc);
2194 request_rtp_encoder (GstElement * rtpbin, guint session, GstRTSPStream * stream)
2196 GstRTSPStreamPrivate *priv = stream->priv;
2197 GstElement *oldenc, *enc;
2201 if (priv->idx != session)
2204 GST_DEBUG_OBJECT (stream, "make RTP encoder for session %u", session);
2206 oldenc = priv->srtpenc;
2207 enc = get_rtp_encoder (stream, session);
2208 name = g_strdup_printf ("rtp_sink_%d", session);
2209 pad = gst_element_get_request_pad (enc, name);
2211 gst_object_unref (pad);
2214 g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER], 0,
2221 request_rtcp_encoder (GstElement * rtpbin, guint session,
2222 GstRTSPStream * stream)
2224 GstRTSPStreamPrivate *priv = stream->priv;
2225 GstElement *oldenc, *enc;
2229 if (priv->idx != session)
2232 GST_DEBUG_OBJECT (stream, "make RTCP encoder for session %u", session);
2234 oldenc = priv->srtpenc;
2235 enc = get_rtp_encoder (stream, session);
2236 name = g_strdup_printf ("rtcp_sink_%d", session);
2237 pad = gst_element_get_request_pad (enc, name);
2239 gst_object_unref (pad);
2242 g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER], 0,
2249 request_key (GstElement * srtpdec, guint ssrc, GstRTSPStream * stream)
2251 GstRTSPStreamPrivate *priv = stream->priv;
2254 GST_DEBUG ("request key %08x", ssrc);
2256 g_mutex_lock (&priv->lock);
2257 if ((caps = g_hash_table_lookup (priv->keys, GINT_TO_POINTER (ssrc))))
2258 gst_caps_ref (caps);
2259 g_mutex_unlock (&priv->lock);
2265 request_rtp_rtcp_decoder (GstElement * rtpbin, guint session,
2266 GstRTSPStream * stream)
2268 GstRTSPStreamPrivate *priv = stream->priv;
2270 if (priv->idx != session)
2273 if (priv->srtpdec == NULL) {
2276 name = g_strdup_printf ("srtpdec_%u", session);
2277 priv->srtpdec = gst_element_factory_make ("srtpdec", name);
2280 g_signal_connect (priv->srtpdec, "request-key",
2281 (GCallback) request_key, stream);
2283 return gst_object_ref (priv->srtpdec);
2287 * gst_rtsp_stream_request_aux_sender:
2288 * @stream: a #GstRTSPStream
2289 * @sessid: the session id
2291 * Creating a rtxsend bin
2293 * Returns: (transfer full) (nullable): a #GstElement.
2298 gst_rtsp_stream_request_aux_sender (GstRTSPStream * stream, guint sessid)
2302 GstStructure *pt_map;
2307 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2309 pt = gst_rtsp_stream_get_pt (stream);
2310 pt_s = g_strdup_printf ("%u", pt);
2311 rtx_pt = stream->priv->rtx_pt;
2313 GST_INFO ("creating rtxsend with pt %u to %u", pt, rtx_pt);
2315 bin = gst_bin_new (NULL);
2316 stream->priv->rtxsend = gst_element_factory_make ("rtprtxsend", NULL);
2317 pt_map = gst_structure_new ("application/x-rtp-pt-map",
2318 pt_s, G_TYPE_UINT, rtx_pt, NULL);
2319 g_object_set (stream->priv->rtxsend, "payload-type-map", pt_map,
2320 "max-size-time", GST_TIME_AS_MSECONDS (stream->priv->rtx_time), NULL);
2322 gst_structure_free (pt_map);
2323 gst_bin_add (GST_BIN (bin), gst_object_ref (stream->priv->rtxsend));
2325 pad = gst_element_get_static_pad (stream->priv->rtxsend, "src");
2326 name = g_strdup_printf ("src_%u", sessid);
2327 gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
2329 gst_object_unref (pad);
2331 pad = gst_element_get_static_pad (stream->priv->rtxsend, "sink");
2332 name = g_strdup_printf ("sink_%u", sessid);
2333 gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
2335 gst_object_unref (pad);
2341 add_rtx_pt (gpointer key, GstCaps * caps, GstStructure * pt_map)
2343 guint pt = GPOINTER_TO_INT (key);
2344 const GstStructure *s = gst_caps_get_structure (caps, 0);
2347 if (!g_strcmp0 (gst_structure_get_string (s, "encoding-name"), "RTX") &&
2348 (apt = gst_structure_get_string (s, "apt"))) {
2349 gst_structure_set (pt_map, apt, G_TYPE_UINT, pt, NULL);
2353 /* Call with priv->lock taken */
2355 update_rtx_receive_pt_map (GstRTSPStream * stream)
2357 GstStructure *pt_map;
2359 if (!stream->priv->rtxreceive)
2362 pt_map = gst_structure_new_empty ("application/x-rtp-pt-map");
2363 g_hash_table_foreach (stream->priv->ptmap, (GHFunc) add_rtx_pt, pt_map);
2364 g_object_set (stream->priv->rtxreceive, "payload-type-map", pt_map, NULL);
2365 gst_structure_free (pt_map);
2372 retrieve_ulpfec_pt (gpointer key, GstCaps * caps, GstElement * ulpfec_decoder)
2374 guint pt = GPOINTER_TO_INT (key);
2375 const GstStructure *s = gst_caps_get_structure (caps, 0);
2377 if (!g_strcmp0 (gst_structure_get_string (s, "encoding-name"), "ULPFEC"))
2378 g_object_set (ulpfec_decoder, "pt", pt, NULL);
2382 update_ulpfec_decoder_pt (GstRTSPStream * stream)
2384 if (!stream->priv->ulpfec_decoder)
2387 g_hash_table_foreach (stream->priv->ptmap, (GHFunc) retrieve_ulpfec_pt,
2388 stream->priv->ulpfec_decoder);
2395 * gst_rtsp_stream_request_aux_receiver:
2396 * @stream: a #GstRTSPStream
2397 * @sessid: the session id
2399 * Creating a rtxreceive bin
2401 * Returns: (transfer full) (nullable): a #GstElement.
2406 gst_rtsp_stream_request_aux_receiver (GstRTSPStream * stream, guint sessid)
2412 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2414 bin = gst_bin_new (NULL);
2415 stream->priv->rtxreceive = gst_element_factory_make ("rtprtxreceive", NULL);
2416 update_rtx_receive_pt_map (stream);
2417 update_ulpfec_decoder_pt (stream);
2418 gst_bin_add (GST_BIN (bin), gst_object_ref (stream->priv->rtxreceive));
2420 pad = gst_element_get_static_pad (stream->priv->rtxreceive, "src");
2421 name = g_strdup_printf ("src_%u", sessid);
2422 gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
2424 gst_object_unref (pad);
2426 pad = gst_element_get_static_pad (stream->priv->rtxreceive, "sink");
2427 name = g_strdup_printf ("sink_%u", sessid);
2428 gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
2430 gst_object_unref (pad);
2436 * gst_rtsp_stream_set_pt_map:
2437 * @stream: a #GstRTSPStream
2441 * Configure a pt map between @pt and @caps.
2444 gst_rtsp_stream_set_pt_map (GstRTSPStream * stream, guint pt, GstCaps * caps)
2446 GstRTSPStreamPrivate *priv = stream->priv;
2448 if (!GST_IS_CAPS (caps))
2451 g_mutex_lock (&priv->lock);
2452 g_hash_table_insert (priv->ptmap, GINT_TO_POINTER (pt), gst_caps_ref (caps));
2453 update_rtx_receive_pt_map (stream);
2454 g_mutex_unlock (&priv->lock);
2458 * gst_rtsp_stream_set_publish_clock_mode:
2459 * @stream: a #GstRTSPStream
2460 * @mode: the clock publish mode
2462 * Sets if and how the stream clock should be published according to RFC7273.
2467 gst_rtsp_stream_set_publish_clock_mode (GstRTSPStream * stream,
2468 GstRTSPPublishClockMode mode)
2470 GstRTSPStreamPrivate *priv;
2472 priv = stream->priv;
2473 g_mutex_lock (&priv->lock);
2474 priv->publish_clock_mode = mode;
2475 g_mutex_unlock (&priv->lock);
2479 * gst_rtsp_stream_get_publish_clock_mode:
2480 * @stream: a #GstRTSPStream
2482 * Gets if and how the stream clock should be published according to RFC7273.
2484 * Returns: The GstRTSPPublishClockMode
2488 GstRTSPPublishClockMode
2489 gst_rtsp_stream_get_publish_clock_mode (GstRTSPStream * stream)
2491 GstRTSPStreamPrivate *priv;
2492 GstRTSPPublishClockMode ret;
2494 priv = stream->priv;
2495 g_mutex_lock (&priv->lock);
2496 ret = priv->publish_clock_mode;
2497 g_mutex_unlock (&priv->lock);
2503 request_pt_map (GstElement * rtpbin, guint session, guint pt,
2504 GstRTSPStream * stream)
2506 GstRTSPStreamPrivate *priv = stream->priv;
2507 GstCaps *caps = NULL;
2509 g_mutex_lock (&priv->lock);
2511 if (priv->idx == session) {
2512 caps = g_hash_table_lookup (priv->ptmap, GINT_TO_POINTER (pt));
2514 GST_DEBUG ("Stream %p, pt %u: caps %" GST_PTR_FORMAT, stream, pt, caps);
2515 gst_caps_ref (caps);
2517 GST_DEBUG ("Stream %p, pt %u: no caps", stream, pt);
2521 g_mutex_unlock (&priv->lock);
2527 pad_added (GstElement * rtpbin, GstPad * pad, GstRTSPStream * stream)
2529 GstRTSPStreamPrivate *priv = stream->priv;
2531 GstPadLinkReturn ret;
2534 GST_DEBUG ("Stream %p added pad %s:%s for pad %s:%s", stream,
2535 GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (priv->sinkpad));
2537 name = gst_pad_get_name (pad);
2538 if (sscanf (name, "recv_rtp_src_%u", &sessid) != 1) {
2544 if (priv->idx != sessid)
2547 if (gst_pad_is_linked (priv->sinkpad)) {
2548 GST_WARNING ("Stream %p: Pad %s:%s is linked already", stream,
2549 GST_DEBUG_PAD_NAME (priv->sinkpad));
2553 /* link the RTP pad to the session manager, it should not really fail unless
2554 * this is not really an RTP pad */
2555 ret = gst_pad_link (pad, priv->sinkpad);
2556 if (ret != GST_PAD_LINK_OK)
2558 priv->recv_rtp_src = gst_object_ref (pad);
2565 GST_ERROR ("Stream %p: Failed to link pads %s:%s and %s:%s", stream,
2566 GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (priv->sinkpad));
2571 on_npt_stop (GstElement * rtpbin, guint session, guint ssrc,
2572 GstRTSPStream * stream)
2574 /* TODO: What to do here other than this? */
2575 GST_DEBUG ("Stream %p: Got EOS", stream);
2576 gst_pad_send_event (stream->priv->sinkpad, gst_event_new_eos ());
2579 typedef struct _ProbeData ProbeData;
2583 GstRTSPStream *stream;
2584 /* existing sink, already linked to tee */
2586 /* new sink, about to be linked */
2588 /* new queue element, that will be linked to tee and sink1 */
2589 GstElement **queue1;
2590 /* new queue element, that will be linked to tee and sink2 */
2591 GstElement **queue2;
2598 free_cb_data (gpointer user_data)
2600 ProbeData *data = user_data;
2602 gst_object_unref (data->stream);
2603 gst_object_unref (data->sink1);
2604 gst_object_unref (data->sink2);
2605 gst_object_unref (data->sink_pad);
2606 gst_object_unref (data->tee_pad);
2612 create_and_plug_queue_to_unlinked_stream (GstRTSPStream * stream,
2613 GstElement * tee, GstElement * sink, GstElement ** queue)
2615 GstRTSPStreamPrivate *priv = stream->priv;
2620 /* create queue for the new stream */
2621 *queue = gst_element_factory_make ("queue", NULL);
2622 g_object_set (*queue, "max-size-buffers", 1, "max-size-bytes", 0,
2623 "max-size-time", G_GINT64_CONSTANT (0), NULL);
2624 gst_bin_add (priv->joined_bin, *queue);
2626 /* link tee to queue */
2627 tee_pad = gst_element_get_request_pad (tee, "src_%u");
2628 queue_pad = gst_element_get_static_pad (*queue, "sink");
2629 gst_pad_link (tee_pad, queue_pad);
2630 gst_object_unref (queue_pad);
2631 gst_object_unref (tee_pad);
2633 /* link queue to sink */
2634 queue_pad = gst_element_get_static_pad (*queue, "src");
2635 sink_pad = gst_element_get_static_pad (sink, "sink");
2636 gst_pad_link (queue_pad, sink_pad);
2637 gst_object_unref (queue_pad);
2638 gst_object_unref (sink_pad);
2640 gst_element_sync_state_with_parent (sink);
2641 gst_element_sync_state_with_parent (*queue);
2644 static GstPadProbeReturn
2645 create_and_plug_queue_to_linked_stream_probe_cb (GstPad * inpad,
2646 GstPadProbeInfo * info, gpointer user_data)
2648 GstRTSPStreamPrivate *priv;
2649 ProbeData *data = user_data;
2650 GstRTSPStream *stream;
2651 GstElement **queue1;
2652 GstElement **queue2;
2658 stream = data->stream;
2659 priv = stream->priv;
2660 queue1 = data->queue1;
2661 queue2 = data->queue2;
2662 sink_pad = data->sink_pad;
2663 tee_pad = data->tee_pad;
2664 index = data->index;
2666 /* unlink tee and the existing sink:
2667 * .-----. .---------.
2670 * '-----' '---------'
2672 g_assert (gst_pad_unlink (tee_pad, sink_pad));
2674 /* add queue to the already existing stream */
2675 *queue1 = gst_element_factory_make ("queue", NULL);
2676 g_object_set (*queue1, "max-size-buffers", 1, "max-size-bytes", 0,
2677 "max-size-time", G_GINT64_CONSTANT (0), NULL);
2678 gst_bin_add (priv->joined_bin, *queue1);
2680 /* link tee, queue and sink:
2681 * .-----. .---------. .---------.
2682 * | tee | | queue1 | | sink1 |
2683 * sink src->sink src->sink |
2684 * '-----' '---------' '---------'
2686 queue_pad = gst_element_get_static_pad (*queue1, "sink");
2687 gst_pad_link (tee_pad, queue_pad);
2688 gst_object_unref (queue_pad);
2689 queue_pad = gst_element_get_static_pad (*queue1, "src");
2690 gst_pad_link (queue_pad, sink_pad);
2691 gst_object_unref (queue_pad);
2693 gst_element_sync_state_with_parent (*queue1);
2695 /* create queue and link it to tee and the new sink */
2696 create_and_plug_queue_to_unlinked_stream (stream,
2697 priv->tee[index], data->sink2, queue2);
2699 /* the final stream:
2701 * .-----. .---------. .---------.
2702 * | tee | | queue1 | | sink1 |
2703 * sink src->sink src->sink |
2704 * | | '---------' '---------'
2705 * | | .---------. .---------.
2706 * | | | queue2 | | sink2 |
2707 * | src->sink src->sink |
2708 * '-----' '---------' '---------'
2711 return GST_PAD_PROBE_REMOVE;
2715 create_and_plug_queue_to_linked_stream (GstRTSPStream * stream,
2716 GstElement * sink1, GstElement * sink2, guint index, GstElement ** queue1,
2717 GstElement ** queue2)
2721 data = g_new0 (ProbeData, 1);
2722 data->stream = gst_object_ref (stream);
2723 data->sink1 = gst_object_ref (sink1);
2724 data->sink2 = gst_object_ref (sink2);
2725 data->queue1 = queue1;
2726 data->queue2 = queue2;
2727 data->index = index;
2729 data->sink_pad = gst_element_get_static_pad (sink1, "sink");
2730 g_assert (data->sink_pad);
2731 data->tee_pad = gst_pad_get_peer (data->sink_pad);
2732 g_assert (data->tee_pad);
2734 gst_pad_add_probe (data->tee_pad, GST_PAD_PROBE_TYPE_IDLE,
2735 create_and_plug_queue_to_linked_stream_probe_cb, data, free_cb_data);
/* Add @sink_to_plug (a unicast or multicast UDP sink, see @is_mcast) to
 * priv->joined_bin and connect it to priv->tee[index].
 *
 * Three situations are handled, based on priv->appsink[index] and
 * existing_sink (taken from priv->udpsink[index] or
 * priv->mcast_udpsink[index]):
 *  - both exist: only a queue for the new sink is created via
 *    create_and_plug_queue_to_unlinked_stream(), stored in @queue_to_plug;
 *  - exactly one exists: that already linked sink and the new sink are passed
 *    to create_and_plug_queue_to_linked_stream() together with the queue slot
 *    of the existing sink and @queue_to_plug;
 *  - none exists: the sink is linked directly to a newly requested "src_%u"
 *    pad of the tee, without a queue.
 * Finally the state of @sink_to_plug is synced with its parent.
 *
 * NOTE(review): the statements that pick existing_sink and the udp/mcast
 * queue slot depending on @is_mcast are not visible in this extract; confirm
 * against the full file. */
2739 plug_udp_sink (GstRTSPStream * stream, GstElement * sink_to_plug,
2740 GstElement ** queue_to_plug, guint index, gboolean is_mcast)
2742 GstRTSPStreamPrivate *priv = stream->priv;
2743 GstElement *existing_sink;
2746 existing_sink = priv->udpsink[index];
2748 existing_sink = priv->mcast_udpsink[index];
2750 GST_DEBUG_OBJECT (stream, "plug %s sink", is_mcast ? "mcast" : "udp");
2752 /* add sink to the bin */
2753 gst_bin_add (priv->joined_bin, sink_to_plug);
2755 if (priv->appsink[index] && existing_sink) {
2757 /* queues are already added for the existing stream, add one for
2758 the newly added udp stream */
2759 create_and_plug_queue_to_unlinked_stream (stream, priv->tee[index],
2760 sink_to_plug, queue_to_plug);
2762 } else if (priv->appsink[index] || existing_sink) {
2764 GstElement *element;
2766 /* add queue to the already existing stream plus the newly created udp
2768 if (priv->appsink[index]) {
2769 element = priv->appsink[index];
2770 queue = &priv->appqueue[index];
2772 element = existing_sink;
2774 queue = &priv->udpqueue[index];
2776 queue = &priv->mcast_udpqueue[index];
2779 create_and_plug_queue_to_linked_stream (stream, element, sink_to_plug,
2780 index, queue, queue_to_plug);
2786 GST_DEBUG_OBJECT (stream, "creating first stream");
2788 /* no need to add queues */
2789 tee_pad = gst_element_get_request_pad (priv->tee[index], "src_%u");
2790 sink_pad = gst_element_get_static_pad (sink_to_plug, "sink");
2791 gst_pad_link (tee_pad, sink_pad);
2792 gst_object_unref (tee_pad);
2793 gst_object_unref (sink_pad);
2796 gst_element_sync_state_with_parent (sink_to_plug);
/* Add priv->appsink[index] (the sink used for TCP) to priv->joined_bin and
 * connect it to priv->tee[index].
 *
 *  - both priv->mcast_udpsink[index] and priv->udpsink[index] exist: only a
 *    queue for the appsink is created (stored in priv->appqueue[index]);
 *  - exactly one of them exists: that sink, its queue slot, the appsink and
 *    priv->appqueue[index] are passed to
 *    create_and_plug_queue_to_linked_stream();
 *  - none exists: the appsink is linked directly to a newly requested
 *    "src_%u" pad of the tee, without a queue.
 * Finally the appsink state is synced with its parent.
 *
 * The return values of gst_bin_add() and gst_pad_link() are not checked. */
2800 plug_tcp_sink (GstRTSPStream * stream, guint index)
2802 GstRTSPStreamPrivate *priv = stream->priv;
2804 GST_DEBUG_OBJECT (stream, "plug tcp sink");
2806 /* add sink to the bin */
2807 gst_bin_add (priv->joined_bin, priv->appsink[index]);
2809 if (priv->mcast_udpsink[index] && priv->udpsink[index]) {
2811 /* queues are already added for the existing stream, add one for
2812 the newly added tcp stream */
2813 create_and_plug_queue_to_unlinked_stream (stream,
2814 priv->tee[index], priv->appsink[index], &priv->appqueue[index]);
2816 } else if (priv->mcast_udpsink[index] || priv->udpsink[index]) {
2818 GstElement *element;
2820 /* add queue to the already existing stream plus the newly created tcp
2822 if (priv->mcast_udpsink[index]) {
2823 element = priv->mcast_udpsink[index];
2824 queue = &priv->mcast_udpqueue[index];
2826 element = priv->udpsink[index];
2827 queue = &priv->udpqueue[index];
2830 create_and_plug_queue_to_linked_stream (stream, element,
2831 priv->appsink[index], index, queue, &priv->appqueue[index]);
2837 /* no need to add queues */
2838 tee_pad = gst_element_get_request_pad (priv->tee[index], "src_%u");
2839 sink_pad = gst_element_get_static_pad (priv->appsink[index], "sink");
2840 gst_pad_link (tee_pad, sink_pad);
2841 gst_object_unref (tee_pad);
2842 gst_object_unref (sink_pad);
2845 gst_element_sync_state_with_parent (priv->appsink[index]);
2849 plug_sink (GstRTSPStream * stream, const GstRTSPTransport * transport,
2852 GstRTSPStreamPrivate *priv;
2853 gboolean is_tcp, is_udp, is_mcast;
2854 priv = stream->priv;
2856 is_tcp = transport->lower_transport == GST_RTSP_LOWER_TRANS_TCP;
2857 is_udp = transport->lower_transport == GST_RTSP_LOWER_TRANS_UDP;
2858 is_mcast = transport->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST;
2861 plug_udp_sink (stream, priv->udpsink[index],
2862 &priv->udpqueue[index], index, FALSE);
2865 plug_udp_sink (stream, priv->mcast_udpsink[index],
2866 &priv->mcast_udpqueue[index], index, TRUE);
2869 plug_tcp_sink (stream, index);
/* Build the sending side of the pipeline for @transport inside
 * priv->joined_bin.
 *
 * Before the loop, a warning is logged when a UDP transport has no server
 * address (v4 or v6) and when a multicast transport has no multicast address
 * (NOTE(review): the statements following those warnings are not visible in
 * this extract).
 *
 * The loop runs for i = 0 and i = 1 (RTP and RTCP according to the comments
 * below). For each i:
 *  - a "tee" is created and added to the bin when priv->tee[i] is not set;
 *  - depending on the lower transport, a missing udpsink, mcast udpsink or
 *    appsink is created, configured and connected with plug_sink(); sinks
 *    that already exist are left alone;
 *  - the tee state is synced with the parent and priv->send_src[i] is linked
 *    to the tee "sink" pad.
 * The appsink is configured with emit-signals=FALSE, async=FALSE, sync=FALSE
 * and gets the sink_cb callbacks with @stream as user data.
 *
 * NOTE(review): results of gst_element_factory_make(), gst_bin_add() and
 * gst_pad_link() are not checked in the visible lines. */
2872 /* must be called with lock */
2874 create_sender_part (GstRTSPStream * stream, const GstRTSPTransport * transport)
2876 GstRTSPStreamPrivate *priv;
2879 gboolean is_tcp, is_udp, is_mcast;
2883 GST_DEBUG_OBJECT (stream, "create sender part");
2884 priv = stream->priv;
2885 bin = priv->joined_bin;
2887 is_tcp = transport->lower_transport == GST_RTSP_LOWER_TRANS_TCP;
2888 is_udp = transport->lower_transport == GST_RTSP_LOWER_TRANS_UDP;
2889 is_mcast = transport->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST;
2892 mcast_ttl = transport->ttl;
2894 GST_DEBUG_OBJECT (stream, "tcp: %d, udp: %d, mcast: %d (ttl: %d)", is_tcp,
2895 is_udp, is_mcast, mcast_ttl);
2897 if (is_udp && !priv->server_addr_v4 && !priv->server_addr_v6) {
2898 GST_WARNING_OBJECT (stream, "no sockets assigned for UDP");
2902 if (is_mcast && !priv->mcast_addr_v4 && !priv->mcast_addr_v6) {
2903 GST_WARNING_OBJECT (stream, "no sockets assigned for UDP multicast");
2907 for (i = 0; i < 2; i++) {
2908 gboolean link_tee = FALSE;
2909 /* For the sender we create this bit of pipeline for both
2911 * Initially there will be only one active transport for
2912 * the stream, so the pipeline will look like this:
2914 * .--------. .-----. .---------.
2915 * | rtpbin | | tee | | sink |
2916 * | send->sink src->sink |
2917 * '--------' '-----' '---------'
2919 * For each new transport, the already existing branch will
2920 * be reconfigured by adding a queue element:
2922 * .--------. .-----. .---------. .---------.
2923 * | rtpbin | | tee | | queue | | udpsink |
2924 * | send->sink src->sink src->sink |
2925 * '--------' | | '---------' '---------'
2926 * | | .---------. .---------.
2927 * | | | queue | | udpsink |
2928 * | src->sink src->sink |
2929 * | | '---------' '---------'
2930 * | | .---------. .---------.
2931 * | | | queue | | appsink |
2932 * | src->sink src->sink |
2933 * '-----' '---------' '---------'
2936 /* Only link the RTP send src if we're going to send RTP, link
2937 * the RTCP send src always */
2938 if (!priv->srcpad && i == 0)
2941 if (!priv->tee[i]) {
2942 /* make tee for RTP/RTCP */
2943 priv->tee[i] = gst_element_factory_make ("tee", NULL);
2944 gst_bin_add (bin, priv->tee[i]);
2948 if (is_udp && !priv->udpsink[i]) {
2949 /* we create only one pair of udpsinks for IPv4 and IPv6 */
2950 create_and_configure_udpsink (stream, &priv->udpsink[i],
2951 priv->socket_v4[i], priv->socket_v6[i], FALSE, (i == 0), mcast_ttl);
2952 plug_sink (stream, transport, i);
2953 } else if (is_mcast && !priv->mcast_udpsink[i]) {
2954 /* we create only one pair of mcast-udpsinks for IPv4 and IPv6 */
2955 create_and_configure_udpsink (stream, &priv->mcast_udpsink[i],
2956 priv->mcast_socket_v4[i], priv->mcast_socket_v6[i], TRUE, (i == 0),
2958 plug_sink (stream, transport, i);
2959 } else if (is_tcp && !priv->appsink[i]) {
2961 priv->appsink[i] = gst_element_factory_make ("appsink", NULL);
2962 g_object_set (priv->appsink[i], "emit-signals", FALSE, NULL);
2964 /* we need to set sync and preroll to FALSE for the sink to avoid
2965 * deadlock. This is only needed for sink sending RTCP data. */
2967 g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
2969 gst_app_sink_set_callbacks (GST_APP_SINK_CAST (priv->appsink[i]),
2970 &sink_cb, stream, NULL);
2971 plug_sink (stream, transport, i);
2975 /* and link to rtpbin send pad */
2976 gst_element_sync_state_with_parent (priv->tee[i]);
2977 pad = gst_element_get_static_pad (priv->tee[i], "sink");
2978 gst_pad_link (priv->send_src[i], pad);
2979 gst_object_unref (pad);
/* Add the source element @src to @bin and link its "src" pad to a newly
 * requested "sink_%u" pad of @funnel.
 *
 * @src can additionally be set to PLAYING with its state locked, so that it
 * does not follow the state changes of @bin.
 * NOTE(review): the condition guarding that state lock is not visible in
 * this extract; the comment below says it matters for PLAY pipelines only.
 *
 * The results of gst_bin_add() and gst_pad_link() are not checked. */
2986 /* must be called with lock */
2988 plug_src (GstRTSPStream * stream, GstBin * bin, GstElement * src,
2989 GstElement * funnel)
2991 GstRTSPStreamPrivate *priv;
2992 GstPad *pad, *selpad;
2994 priv = stream->priv;
2997 /* we set and keep these to playing so that they don't cause NO_PREROLL return
2998 * values. This is only relevant for PLAY pipelines */
2999 gst_element_set_state (src, GST_STATE_PLAYING);
3000 gst_element_set_locked_state (src, TRUE);
3004 gst_bin_add (bin, src);
3006 /* and link to the funnel */
3007 selpad = gst_element_get_request_pad (funnel, "sink_%u");
3008 pad = gst_element_get_static_pad (src, "src");
3009 gst_pad_link (pad, selpad);
3010 gst_object_unref (pad);
3011 gst_object_unref (selpad);
/* Build the receiving side of the pipeline for @transport inside
 * priv->joined_bin.
 *
 * The loop runs for i = 0 and i = 1 (RTP and RTCP according to the comments
 * below). For each i:
 *  - a "funnel" is created when priv->funnel[i] is not set, added to the bin
 *    and its "src" pad is linked to priv->recv_sink[i];
 *  - for UDP, a udpsrc is created per address family for which a server
 *    address exists (priv->server_addr_v4 / _v6) and no source exists yet;
 *  - for multicast, the same is done with the multicast sockets and
 *    addresses; a failing create_and_configure_udpsource() jumps to
 *    mcast_udpsrc_error;
 *  - for TCP, an "appsrc" is created when missing and
 *    priv->appsrc_base_time[i] is reset to -1;
 *  - every new source is connected to the funnel with plug_src();
 *  - the funnel state is synced with the parent.
 *
 * NOTE(review): the error labels and the return statements are not visible in
 * this extract. */
3014 /* must be called with lock */
3016 create_receiver_part (GstRTSPStream * stream, const GstRTSPTransport *
3019 GstRTSPStreamPrivate *priv;
3027 GST_DEBUG_OBJECT (stream, "create receiver part");
3028 priv = stream->priv;
3029 bin = priv->joined_bin;
3031 tcp = transport->lower_transport == GST_RTSP_LOWER_TRANS_TCP;
3032 udp = transport->lower_transport == GST_RTSP_LOWER_TRANS_UDP;
3033 mcast = transport->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST;
3035 for (i = 0; i < 2; i++) {
3036 /* For the receiver we create this bit of pipeline for both
3037 * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc
3038 * and it is all funneled into the rtpbin receive pad.
3041 * .--------. .--------. .--------.
3042 * | udpsrc | | funnel | | rtpbin |
3043 * | RTP src->sink src->sink |
3044 * '--------' | | | |
3045 * .--------. | | | |
3046 * | appsrc | | | | |
3047 * | RTP src->sink | | |
3048 * '--------' '--------' | |
3050 * .--------. .--------. | |
3051 * | udpsrc | | funnel | | |
3052 * | RTCP src->sink src->sink |
3053 * '--------' | | '--------'
3056 * | RTCP src->sink |
3057 * '--------' '--------'
3060 if (!priv->sinkpad && i == 0) {
3061 /* Only connect recv RTP sink if we expect to receive RTP. Connect recv
3062 * RTCP sink always */
3066 /* make funnel for the RTP/RTCP receivers */
3067 if (!priv->funnel[i]) {
3068 priv->funnel[i] = gst_element_factory_make ("funnel", NULL);
3069 gst_bin_add (bin, priv->funnel[i]);
3071 pad = gst_element_get_static_pad (priv->funnel[i], "src");
3072 gst_pad_link (pad, priv->recv_sink[i]);
3073 gst_object_unref (pad);
3076 if (udp && !priv->udpsrc_v4[i] && priv->server_addr_v4) {
3077 GST_DEBUG_OBJECT (stream, "udp IPv4, create and configure udpsources");
3078 if (!create_and_configure_udpsource (&priv->udpsrc_v4[i],
3079 priv->socket_v4[i]))
3082 plug_src (stream, bin, priv->udpsrc_v4[i], priv->funnel[i]);
3085 if (udp && !priv->udpsrc_v6[i] && priv->server_addr_v6) {
3086 GST_DEBUG_OBJECT (stream, "udp IPv6, create and configure udpsources");
3087 if (!create_and_configure_udpsource (&priv->udpsrc_v6[i],
3088 priv->socket_v6[i]))
3091 plug_src (stream, bin, priv->udpsrc_v6[i], priv->funnel[i]);
3094 if (mcast && !priv->mcast_udpsrc_v4[i] && priv->mcast_addr_v4) {
3095 GST_DEBUG_OBJECT (stream, "mcast IPv4, create and configure udpsources");
3096 if (!create_and_configure_udpsource (&priv->mcast_udpsrc_v4[i],
3097 priv->mcast_socket_v4[i]))
3098 goto mcast_udpsrc_error;
3099 plug_src (stream, bin, priv->mcast_udpsrc_v4[i], priv->funnel[i]);
3102 if (mcast && !priv->mcast_udpsrc_v6[i] && priv->mcast_addr_v6) {
3103 GST_DEBUG_OBJECT (stream, "mcast IPv6, create and configure udpsources");
3104 if (!create_and_configure_udpsource (&priv->mcast_udpsrc_v6[i],
3105 priv->mcast_socket_v6[i]))
3106 goto mcast_udpsrc_error;
3107 plug_src (stream, bin, priv->mcast_udpsrc_v6[i], priv->funnel[i]);
3110 if (tcp && !priv->appsrc[i]) {
3111 /* make and add appsrc */
3112 priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
3113 priv->appsrc_base_time[i] = -1;
3114 g_object_set (priv->appsrc[i], "format", GST_FORMAT_TIME, "is-live",
3116 plug_src (stream, bin, priv->appsrc[i], priv->funnel[i]);
3119 gst_element_sync_state_with_parent (priv->funnel[i]);
3130 check_mcast_part_for_transport (GstRTSPStream * stream,
3131 const GstRTSPTransport * tr)
3133 GstRTSPStreamPrivate *priv = stream->priv;
3134 GInetAddress *inetaddr;
3135 GSocketFamily family;
3136 GstRTSPAddress *mcast_addr;
3138 /* Check if it's a ipv4 or ipv6 transport */
3139 inetaddr = g_inet_address_new_from_string (tr->destination);
3140 family = g_inet_address_get_family (inetaddr);
3141 g_object_unref (inetaddr);
3143 /* Select fields corresponding to the family */
3144 if (family == G_SOCKET_FAMILY_IPV4) {
3145 mcast_addr = priv->mcast_addr_v4;
3147 mcast_addr = priv->mcast_addr_v6;
3150 /* We support only one mcast group per family, make sure this transport
3155 if (g_ascii_strcasecmp (tr->destination, mcast_addr->address) != 0 ||
3156 tr->port.min != mcast_addr->port ||
3157 tr->port.max != mcast_addr->port + mcast_addr->n_ports - 1 ||
3158 tr->ttl != mcast_addr->ttl)
3165 GST_WARNING_OBJECT (stream, "Adding mcast transport, but no mcast address "
3166 "has been reserved");
3171 GST_WARNING_OBJECT (stream, "Adding mcast transport, but it doesn't match "
3172 "the reserved address");
3178 * gst_rtsp_stream_join_bin:
3179 * @stream: a #GstRTSPStream
3180 * @bin: (transfer none): a #GstBin to join
3181 * @rtpbin: (transfer none): a rtpbin element in @bin
3182 * @state: the target state of the new elements
3184 * Join the #GstBin @bin that contains the element @rtpbin.
3186 * @stream will link to @rtpbin, which must be inside @bin. The elements
3187 * added to @bin will be set to the state given in @state.
3189 * Returns: %TRUE on success.
/* Implementation notes for gst_rtsp_stream_join_bin():
 *
 * After the argument checks everything runs with priv->lock held.
 *  - When priv->joined_bin is already set the function takes an early exit
 *    (the target of that jump is not visible in this extract).
 *  - For the SAVP / SAVPF profiles the SRTP encoder/decoder request signals
 *    of @rtpbin are connected; "request-pt-map" is connected when the stream
 *    has a sinkpad.
 *  - The RTP send sink pad is requested from @rtpbin and priv->srcpad is
 *    linked to it; when that link fails a warning is logged, the requested
 *    pad is unreffed and reset to NULL and the lock is released.
 *  - The send/recv RTP and RTCP pads are obtained from @rtpbin, using pad
 *    names built from idx.
 *  - The internal session is fetched with "get-internal-session" and its
 *    ssrc / bye / timeout signals are connected.
 *  - "notify::caps" on priv->send_src[0] is connected (handler id kept in
 *    priv->caps_sig) and the current caps are cached in priv->caps.
 *  - Finally @bin is stored in priv->joined_bin (no reference is taken in
 *    the visible lines).
 *
 * NOTE(review): how idx is computed, where the pad name strings are freed,
 * which branches depend on srcpad/sinkpad, and whether @state is used are
 * not visible in this extract. */
3192 gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
3193 GstElement * rtpbin, GstState state)
3195 GstRTSPStreamPrivate *priv;
3198 GstPadLinkReturn ret;
3200 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3201 g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
3202 g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
3204 priv = stream->priv;
3206 g_mutex_lock (&priv->lock);
3207 if (priv->joined_bin != NULL)
3210 /* create a session with the same index as the stream */
3213 GST_INFO ("stream %p joining bin as session %u", stream, idx);
3215 if (priv->profiles & GST_RTSP_PROFILE_SAVP
3216 || priv->profiles & GST_RTSP_PROFILE_SAVPF) {
3218 g_signal_connect (rtpbin, "request-rtp-encoder",
3219 (GCallback) request_rtp_encoder, stream);
3220 g_signal_connect (rtpbin, "request-rtcp-encoder",
3221 (GCallback) request_rtcp_encoder, stream);
3222 g_signal_connect (rtpbin, "request-rtp-decoder",
3223 (GCallback) request_rtp_rtcp_decoder, stream);
3224 g_signal_connect (rtpbin, "request-rtcp-decoder",
3225 (GCallback) request_rtp_rtcp_decoder, stream);
3228 if (priv->sinkpad) {
3229 g_signal_connect (rtpbin, "request-pt-map",
3230 (GCallback) request_pt_map, stream);
3233 /* get pads from the RTP session element for sending and receiving
3236 /* get a pad for sending RTP */
3237 name = g_strdup_printf ("send_rtp_sink_%u", idx);
3238 priv->send_rtp_sink = gst_element_get_request_pad (rtpbin, name);
3241 /* link the RTP pad to the session manager, it should not really fail unless
3242 * this is not really an RTP pad */
3243 ret = gst_pad_link (priv->srcpad, priv->send_rtp_sink);
3244 if (ret != GST_PAD_LINK_OK)
3247 name = g_strdup_printf ("send_rtp_src_%u", idx);
3248 priv->send_src[0] = gst_element_get_static_pad (rtpbin, name);
3251 /* RECORD case: need to connect our sinkpad from here */
3252 g_signal_connect (rtpbin, "pad-added", (GCallback) pad_added, stream);
3254 g_signal_connect (rtpbin, "on-npt-stop", (GCallback) on_npt_stop, stream);
3256 name = g_strdup_printf ("recv_rtp_sink_%u", idx);
3257 priv->recv_sink[0] = gst_element_get_request_pad (rtpbin, name);
3261 name = g_strdup_printf ("send_rtcp_src_%u", idx);
3262 priv->send_src[1] = gst_element_get_request_pad (rtpbin, name);
3264 name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
3265 priv->recv_sink[1] = gst_element_get_request_pad (rtpbin, name);
3268 /* get the session */
3269 g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &priv->session);
3271 g_signal_connect (priv->session, "on-new-ssrc", (GCallback) on_new_ssrc,
3273 g_signal_connect (priv->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
3275 g_signal_connect (priv->session, "on-ssrc-active",
3276 (GCallback) on_ssrc_active, stream);
3277 g_signal_connect (priv->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
3279 g_signal_connect (priv->session, "on-bye-timeout",
3280 (GCallback) on_bye_timeout, stream);
3281 g_signal_connect (priv->session, "on-timeout", (GCallback) on_timeout,
3284 /* signal for sender ssrc */
3285 g_signal_connect (priv->session, "on-new-sender-ssrc",
3286 (GCallback) on_new_sender_ssrc, stream);
3287 g_signal_connect (priv->session, "on-sender-ssrc-active",
3288 (GCallback) on_sender_ssrc_active, stream);
3291 /* be notified of caps changes */
3292 priv->caps_sig = g_signal_connect (priv->send_src[0], "notify::caps",
3293 (GCallback) caps_notify, stream);
3294 priv->caps = gst_pad_get_current_caps (priv->send_src[0]);
3297 priv->joined_bin = bin;
3298 GST_DEBUG_OBJECT (stream, "successfully joined bin");
3299 g_mutex_unlock (&priv->lock);
3306 g_mutex_unlock (&priv->lock);
3311 GST_WARNING ("failed to link stream %u", idx);
3312 gst_object_unref (priv->send_rtp_sink);
3313 priv->send_rtp_sink = NULL;
3314 g_mutex_unlock (&priv->lock);
3320 clear_element (GstBin * bin, GstElement ** elementptr)
3323 gst_element_set_locked_state (*elementptr, FALSE);
3324 gst_element_set_state (*elementptr, GST_STATE_NULL);
3325 if (GST_ELEMENT_PARENT (*elementptr))
3326 gst_bin_remove (bin, *elementptr);
3328 gst_object_unref (*elementptr);
3334 * gst_rtsp_stream_leave_bin:
3335 * @stream: a #GstRTSPStream
3336 * @bin: (transfer none): a #GstBin
3337 * @rtpbin: (transfer none): a rtpbin #GstElement
3339 * Remove the elements of @stream from @bin.
3341 * Returns: %TRUE on success.
/* Implementation notes for gst_rtsp_stream_leave_bin():
 *
 * After the argument checks everything runs with priv->lock held. The stream
 * must be joined, and joined to @bin ("leaving the wrong bin" is logged
 * otherwise). All transports must have been removed already, otherwise
 * "can't leave bin (transports not removed)" is logged. Every error path
 * releases the lock.
 *
 * On the regular path the transport caches are cleared, the RTP pads are
 * unlinked and released, all per-stream elements (sources, queues, sinks,
 * tee, funnel) for RTP and RTCP are shut down through clear_element(), the
 * request pads are given back to @rtpbin, and the session, caps, SRTP
 * elements and the reserved multicast/server addresses are released. */
3344 gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
3345 GstElement * rtpbin)
3347 GstRTSPStreamPrivate *priv;
3350 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3351 g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
3352 g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
3354 priv = stream->priv;
3356 g_mutex_lock (&priv->lock);
3357 if (priv->joined_bin == NULL)
3358 goto was_not_joined;
3359 if (priv->joined_bin != bin)
/* NOTE(review): joined_bin is reset before the transports check below, so
 * when transports are still present the function bails out with joined_bin
 * already cleared while none of the elements have been removed from @bin.
 * Confirm whether the reset should happen after the check. */
3362 priv->joined_bin = NULL;
3364 /* all transports must be removed by now */
3365 if (priv->transports != NULL)
3366 goto transports_not_removed;
3368 clear_tr_cache (priv, TRUE);
3369 clear_tr_cache (priv, FALSE);
3371 GST_INFO ("stream %p leaving bin", stream);
3374 gst_pad_unlink (priv->srcpad, priv->send_rtp_sink);
3376 g_signal_handler_disconnect (priv->send_src[0], priv->caps_sig);
3377 gst_element_release_request_pad (rtpbin, priv->send_rtp_sink);
3378 gst_object_unref (priv->send_rtp_sink);
3379 priv->send_rtp_sink = NULL;
3380 } else if (priv->recv_rtp_src) {
3381 gst_pad_unlink (priv->recv_rtp_src, priv->sinkpad);
3382 gst_object_unref (priv->recv_rtp_src);
3383 priv->recv_rtp_src = NULL;
/* index 0 and 1 of the element arrays are handled in turn */
3386 for (i = 0; i < 2; i++) {
3387 clear_element (bin, &priv->udpsrc_v4[i]);
3388 clear_element (bin, &priv->udpsrc_v6[i]);
3389 clear_element (bin, &priv->udpqueue[i]);
3390 clear_element (bin, &priv->udpsink[i]);
3392 clear_element (bin, &priv->mcast_udpsrc_v4[i]);
3393 clear_element (bin, &priv->mcast_udpsrc_v6[i]);
3394 clear_element (bin, &priv->mcast_udpqueue[i]);
3395 clear_element (bin, &priv->mcast_udpsink[i]);
3397 clear_element (bin, &priv->appsrc[i]);
3398 clear_element (bin, &priv->appqueue[i]);
3399 clear_element (bin, &priv->appsink[i]);
3401 clear_element (bin, &priv->tee[i]);
3402 clear_element (bin, &priv->funnel[i]);
3404 if (priv->sinkpad || i == 1) {
3405 gst_element_release_request_pad (rtpbin, priv->recv_sink[i]);
3406 gst_object_unref (priv->recv_sink[i]);
3407 priv->recv_sink[i] = NULL;
3412 gst_object_unref (priv->send_src[0]);
3413 priv->send_src[0] = NULL;
3416 gst_element_release_request_pad (rtpbin, priv->send_src[1]);
3417 gst_object_unref (priv->send_src[1]);
3418 priv->send_src[1] = NULL;
3420 g_object_unref (priv->session);
3421 priv->session = NULL;
3423 gst_caps_unref (priv->caps);
3427 gst_object_unref (priv->srtpenc);
3429 gst_object_unref (priv->srtpdec);
3431 if (priv->mcast_addr_v4)
3432 gst_rtsp_address_free (priv->mcast_addr_v4);
3433 priv->mcast_addr_v4 = NULL;
3434 if (priv->mcast_addr_v6)
3435 gst_rtsp_address_free (priv->mcast_addr_v6);
3436 priv->mcast_addr_v6 = NULL;
3437 if (priv->server_addr_v4)
3438 gst_rtsp_address_free (priv->server_addr_v4);
3439 priv->server_addr_v4 = NULL;
3440 if (priv->server_addr_v6)
3441 gst_rtsp_address_free (priv->server_addr_v6);
3442 priv->server_addr_v6 = NULL;
3444 g_mutex_unlock (&priv->lock);
3450 g_mutex_unlock (&priv->lock);
3453 transports_not_removed:
3455 GST_ERROR_OBJECT (stream, "can't leave bin (transports not removed)");
3456 g_mutex_unlock (&priv->lock);
3461 GST_ERROR_OBJECT (stream, "leaving the wrong bin");
3462 g_mutex_unlock (&priv->lock);
3468 * gst_rtsp_stream_get_joined_bin:
3469 * @stream: a #GstRTSPStream
3471 * Get the previous joined bin with gst_rtsp_stream_join_bin() or NULL.
3473 * Return: (transfer full) (nullable): the joined bin or NULL.
3476 gst_rtsp_stream_get_joined_bin (GstRTSPStream * stream)
3478 GstRTSPStreamPrivate *priv;
3481 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3483 priv = stream->priv;
3485 g_mutex_lock (&priv->lock);
3486 bin = priv->joined_bin ? gst_object_ref (priv->joined_bin) : NULL;
3487 g_mutex_unlock (&priv->lock);
3493 * gst_rtsp_stream_get_rtpinfo:
3494 * @stream: a #GstRTSPStream
3495 * @rtptime: (allow-none) (out caller-allocates): result RTP timestamp
3496 * @seq: (allow-none) (out caller-allocates): result RTP seqnum
3497 * @clock_rate: (allow-none) (out caller-allocates): the clock rate
3498 * @running_time: (out caller-allocates): result running-time
3500 * Retrieve the current rtptime, seq and running-time. This is used to
3501 * construct a RTPInfo reply header.
3503 * Returns: %TRUE when rtptime, seq and running-time could be determined.
/* Implementation notes for gst_rtsp_stream_get_rtpinfo():
 *
 * The values are looked up in up to three places, with priv->lock held:
 *  1. The "last-sample" of priv->udpsink[0] (preferred) or priv->appsink[0].
 *     The RTP buffer of the sample is mapped; when the caps carry an "ssrc"
 *     field that differs from the SSRC of the buffer the sample is skipped.
 *     Otherwise seqnum and RTP timestamp come from the buffer, the
 *     running-time from the sample segment and the buffer timestamp, and the
 *     clock-rate from the "clock-rate" field of the caps.
 *  2. The "stats" structure of the payloader, when it has such a property
 *     (fields "seqnum", "timestamp", "running-time", "clock-rate").
 *  3. The "seqnum" and "timestamp" properties of the payloader; in this case
 *     the running-time is set to GST_CLOCK_TIME_NONE.
 * A clock-rate of 0 also resets the running-time to GST_CLOCK_TIME_NONE.
 * The failure path logs "Could not get payloader stats" and releases the
 * lock.
 *
 * NOTE(review): the class of priv->payloader is read before the lock is
 * taken and without a NULL check; the guint clock_rate pointer is cast to
 * (gint *) for gst_structure_get_int(). The NULL checks around the out
 * parameters are not visible in this extract. */
3506 gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream,
3507 guint * rtptime, guint * seq, guint * clock_rate,
3508 GstClockTime * running_time)
3510 GstRTSPStreamPrivate *priv;
3511 GstStructure *stats;
3512 GObjectClass *payobjclass;
3514 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3516 priv = stream->priv;
3518 payobjclass = G_OBJECT_GET_CLASS (priv->payloader);
3520 g_mutex_lock (&priv->lock);
3522 /* First try to extract the information from the last buffer on the sinks.
3523 * This will have a more accurate sequence number and timestamp, as between
3524 * the payloader and the sink there can be some queues
3526 if (priv->udpsink[0] || priv->appsink[0]) {
3527 GstSample *last_sample;
3529 if (priv->udpsink[0])
3530 g_object_get (priv->udpsink[0], "last-sample", &last_sample, NULL);
3532 g_object_get (priv->appsink[0], "last-sample", &last_sample, NULL);
3537 GstSegment *segment;
3539 GstRTPBuffer rtp_buffer = GST_RTP_BUFFER_INIT;
3541 caps = gst_sample_get_caps (last_sample);
3542 buffer = gst_sample_get_buffer (last_sample);
3543 segment = gst_sample_get_segment (last_sample);
3544 s = gst_caps_get_structure (caps, 0);
3546 if (gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp_buffer)) {
3547 guint ssrc_buf = gst_rtp_buffer_get_ssrc (&rtp_buffer);
3548 guint ssrc_stream = 0;
3549 if (gst_structure_has_field_typed (s, "ssrc", G_TYPE_UINT) &&
3550 gst_structure_get_uint (s, "ssrc", &ssrc_stream) &&
3551 ssrc_buf != ssrc_stream) {
3552 /* Skip buffers from auxiliary streams. */
3553 GST_DEBUG_OBJECT (stream,
3554 "not a buffer from the payloader, SSRC: %08x", ssrc_buf);
3556 gst_rtp_buffer_unmap (&rtp_buffer);
3557 gst_sample_unref (last_sample);
3562 *seq = gst_rtp_buffer_get_seq (&rtp_buffer);
3566 *rtptime = gst_rtp_buffer_get_timestamp (&rtp_buffer);
3569 gst_rtp_buffer_unmap (&rtp_buffer);
3573 gst_segment_to_running_time (segment, GST_FORMAT_TIME,
3574 GST_BUFFER_TIMESTAMP (buffer));
3578 gst_structure_get_int (s, "clock-rate", (gint *) clock_rate);
3580 if (*clock_rate == 0 && running_time)
3581 *running_time = GST_CLOCK_TIME_NONE;
3583 gst_sample_unref (last_sample);
3587 gst_sample_unref (last_sample);
3593 if (g_object_class_find_property (payobjclass, "stats")) {
3594 g_object_get (priv->payloader, "stats", &stats, NULL);
3599 gst_structure_get_uint (stats, "seqnum", seq);
3602 gst_structure_get_uint (stats, "timestamp", rtptime);
3605 gst_structure_get_clock_time (stats, "running-time", running_time);
3608 gst_structure_get_uint (stats, "clock-rate", clock_rate);
3609 if (*clock_rate == 0 && running_time)
3610 *running_time = GST_CLOCK_TIME_NONE;
3612 gst_structure_free (stats);
3614 if (!g_object_class_find_property (payobjclass, "seqnum") ||
3615 !g_object_class_find_property (payobjclass, "timestamp"))
3619 g_object_get (priv->payloader, "seqnum", seq, NULL);
3622 g_object_get (priv->payloader, "timestamp", rtptime, NULL);
3625 *running_time = GST_CLOCK_TIME_NONE;
3629 g_mutex_unlock (&priv->lock);
3636 GST_WARNING ("Could not get payloader stats");
3637 g_mutex_unlock (&priv->lock);
3643 * gst_rtsp_stream_get_caps:
3644 * @stream: a #GstRTSPStream
3646 * Retrieve the current caps of @stream.
3648 * Returns: (transfer full) (nullable): the #GstCaps of @stream.
3649 * use gst_caps_unref() after usage.
3652 gst_rtsp_stream_get_caps (GstRTSPStream * stream)
3654 GstRTSPStreamPrivate *priv;
3657 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3659 priv = stream->priv;
3661 g_mutex_lock (&priv->lock);
3662 if ((result = priv->caps))
3663 gst_caps_ref (result);
3664 g_mutex_unlock (&priv->lock);
3670 * gst_rtsp_stream_recv_rtp:
3671 * @stream: a #GstRTSPStream
3672 * @buffer: (transfer full): a #GstBuffer
3674 * Handle an RTP buffer for the stream. This method is usually called when a
3675 * message has been received from a client using the TCP transport.
3677 * This function takes ownership of @buffer.
3679 * Returns: a GstFlowReturn.
3682 gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer)
3684 GstRTSPStreamPrivate *priv;
3686 GstElement *element;
3688 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
3689 priv = stream->priv;
3690 g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
3691 g_return_val_if_fail (priv->joined_bin != NULL, FALSE);
3693 g_mutex_lock (&priv->lock);
3694 if (priv->appsrc[0])
3695 element = gst_object_ref (priv->appsrc[0]);
3698 g_mutex_unlock (&priv->lock);
3701 if (priv->appsrc_base_time[0] == -1) {
3702 /* Take current running_time. This timestamp will be put on
3703 * the first buffer of each stream because we are a live source and so we
3704 * timestamp with the running_time. When we are dealing with TCP, we also
3705 * only timestamp the first buffer (using the DISCONT flag) because a server
3706 * typically bursts data, for which we don't want to compensate by speeding
3707 * up the media. The other timestamps will be interpollated from this one
3708 * using the RTP timestamps. */
3709 GST_OBJECT_LOCK (element);
3710 if (GST_ELEMENT_CLOCK (element)) {
3712 GstClockTime base_time;
3714 now = gst_clock_get_time (GST_ELEMENT_CLOCK (element));
3715 base_time = GST_ELEMENT_CAST (element)->base_time;
3717 priv->appsrc_base_time[0] = now - base_time;
3718 GST_BUFFER_TIMESTAMP (buffer) = priv->appsrc_base_time[0];
3719 GST_DEBUG ("stream %p: first buffer at time %" GST_TIME_FORMAT
3720 ", base %" GST_TIME_FORMAT, stream, GST_TIME_ARGS (now),
3721 GST_TIME_ARGS (base_time));
3723 GST_OBJECT_UNLOCK (element);
3726 ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
3727 gst_object_unref (element);
3735 * gst_rtsp_stream_recv_rtcp:
3736 * @stream: a #GstRTSPStream
3737 * @buffer: (transfer full): a #GstBuffer
3739 * Handle an RTCP buffer for the stream. This method is usually called when a
3740 * message has been received from a client using the TCP transport.
3742 * This function takes ownership of @buffer.
3744 * Returns: a GstFlowReturn.
/* Implementation notes for gst_rtsp_stream_recv_rtcp():
 *
 * @buffer is owned by this function. It is unreffed and GST_FLOW_NOT_LINKED
 * is returned when the stream is not joined to a bin. Otherwise a reference
 * to priv->appsrc[1] is taken under priv->lock and the buffer is pushed into
 * that appsrc, which takes over the buffer; the buffer is unreffed when it
 * is not pushed.
 *
 * While priv->appsrc_base_time[1] is still -1 and the appsrc has a clock,
 * the base time is set to the current running-time (clock time minus element
 * base time) and stamped on @buffer.
 *
 * NOTE(review): priv->joined_bin and priv->appsrc_base_time[1] are accessed
 * without holding priv->lock. */
3747 gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
3749 GstRTSPStreamPrivate *priv;
3751 GstElement *element;
3753 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
3754 priv = stream->priv;
3755 g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
3757 if (priv->joined_bin == NULL) {
3758 gst_buffer_unref (buffer);
3759 return GST_FLOW_NOT_LINKED;
3761 g_mutex_lock (&priv->lock);
3762 if (priv->appsrc[1])
3763 element = gst_object_ref (priv->appsrc[1]);
3766 g_mutex_unlock (&priv->lock);
3769 if (priv->appsrc_base_time[1] == -1) {
3770 /* Take current running_time. This timestamp will be put on
3771 * the first buffer of each stream because we are a live source and so we
3772 * timestamp with the running_time. When we are dealing with TCP, we also
3773 * only timestamp the first buffer (using the DISCONT flag) because a server
3774 * typically bursts data, for which we don't want to compensate by speeding
3775 * up the media. The other timestamps will be interpolated from this one
3776 * using the RTP timestamps. */
3777 GST_OBJECT_LOCK (element);
3778 if (GST_ELEMENT_CLOCK (element)) {
3780 GstClockTime base_time;
3782 now = gst_clock_get_time (GST_ELEMENT_CLOCK (element));
3783 base_time = GST_ELEMENT_CAST (element)->base_time;
3785 priv->appsrc_base_time[1] = now - base_time;
3786 GST_BUFFER_TIMESTAMP (buffer) = priv->appsrc_base_time[1];
3787 GST_DEBUG ("stream %p: first buffer at time %" GST_TIME_FORMAT
3788 ", base %" GST_TIME_FORMAT, stream, GST_TIME_ARGS (now),
3789 GST_TIME_ARGS (base_time));
3791 GST_OBJECT_UNLOCK (element);
3794 ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
3795 gst_object_unref (element);
3798 gst_buffer_unref (buffer);
/* Add or remove the stream transport @trans to/from priv->transports and
 * update the sinks accordingly. Callers in this file pass TRUE as the third
 * argument to add and FALSE to remove.
 *
 * The handling depends on the lower transport of @trans:
 *  - UDP multicast: on add, the transport has to pass
 *    check_mcast_part_for_transport() before it is prepended to the list;
 *    "ttl-mc" is then set on the existing multicast sinks. On remove it is
 *    taken out of the list.
 *  - UDP: the destination and a port pair are added to / removed from
 *    priv->udpsink[0] (min port) and priv->udpsink[1] (max port) with the
 *    "add" / "remove" action signals; the port pair is the server ports in
 *    client-side mode and the client ports otherwise.
 *  - TCP: only the list is updated.
 *  - anything else: "Unknown transport" is logged.
 * priv->transports_cookie is incremented in the UDP and TCP cases.
 *
 * NOTE(review): priv->udpsink[1] is used without a NULL check while
 * priv->udpsink[0] is checked. The UDP case also tests for
 * GST_RTSP_LOWER_TRANS_UDP_MCAST, which looks unreachable unless the
 * multicast case falls through. No cookie increment is visible for the
 * multicast case. Control flow between the cases and the return values are
 * not visible in this extract; confirm against the full file. */
3803 /* must be called with lock */
3805 update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
3808 GstRTSPStreamPrivate *priv = stream->priv;
3809 const GstRTSPTransport *tr;
3811 tr = gst_rtsp_stream_transport_get_transport (trans);
3813 switch (tr->lower_transport) {
3814 case GST_RTSP_LOWER_TRANS_UDP_MCAST:
3817 if (!check_mcast_part_for_transport (stream, tr))
3819 priv->transports = g_list_prepend (priv->transports, trans);
3822 GST_INFO ("setting ttl-mc %d", tr->ttl);
3823 if (priv->mcast_udpsink[0])
3824 g_object_set (G_OBJECT (priv->mcast_udpsink[0]), "ttl-mc", tr->ttl,
3826 if (priv->mcast_udpsink[1])
3827 g_object_set (G_OBJECT (priv->mcast_udpsink[1]), "ttl-mc", tr->ttl,
3831 priv->transports = g_list_remove (priv->transports, trans);
3835 case GST_RTSP_LOWER_TRANS_UDP:
3840 dest = tr->destination;
3841 if (tr->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
3844 } else if (priv->client_side) {
3845 /* In client side mode the 'destination' is the RTSP server, so send
3847 min = tr->server_port.min;
3848 max = tr->server_port.max;
3850 min = tr->client_port.min;
3851 max = tr->client_port.max;
3855 GST_INFO ("adding %s:%d-%d", dest, min, max);
3856 if (priv->udpsink[0])
3857 g_signal_emit_by_name (priv->udpsink[0], "add", dest, min, NULL);
3858 g_signal_emit_by_name (priv->udpsink[1], "add", dest, max, NULL);
3859 priv->transports = g_list_prepend (priv->transports, trans);
3861 GST_INFO ("removing %s:%d-%d", dest, min, max);
3862 if (priv->udpsink[0])
3863 g_signal_emit_by_name (priv->udpsink[0], "remove", dest, min, NULL);
3864 g_signal_emit_by_name (priv->udpsink[1], "remove", dest, max, NULL);
3865 priv->transports = g_list_remove (priv->transports, trans);
3867 priv->transports_cookie++;
3870 case GST_RTSP_LOWER_TRANS_TCP:
3872 GST_INFO ("adding TCP %s", tr->destination);
3873 priv->transports = g_list_prepend (priv->transports, trans);
3875 GST_INFO ("removing TCP %s", tr->destination);
3876 priv->transports = g_list_remove (priv->transports, trans);
3878 priv->transports_cookie++;
3881 goto unknown_transport;
3888 GST_INFO ("Unknown transport %d", tr->lower_transport);
3899 * gst_rtsp_stream_add_transport:
3900 * @stream: a #GstRTSPStream
3901 * @trans: (transfer none): a #GstRTSPStreamTransport
3903 * Add the transport in @trans to @stream. The media of @stream will
3904 * then also be send to the values configured in @trans.
3906 * @stream must be joined to a bin.
3908 * @trans must contain a valid #GstRTSPTransport.
3910 * Returns: %TRUE if @trans was added
3913 gst_rtsp_stream_add_transport (GstRTSPStream * stream,
3914 GstRTSPStreamTransport * trans)
3916 GstRTSPStreamPrivate *priv;
3919 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3920 priv = stream->priv;
3921 g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
3922 g_return_val_if_fail (priv->joined_bin != NULL, FALSE);
3924 g_mutex_lock (&priv->lock);
3925 res = update_transport (stream, trans, TRUE);
3926 g_mutex_unlock (&priv->lock);
3932 * gst_rtsp_stream_remove_transport:
3933 * @stream: a #GstRTSPStream
3934 * @trans: (transfer none): a #GstRTSPStreamTransport
3936 * Remove the transport in @trans from @stream. The media of @stream will
3937 * not be sent to the values configured in @trans.
3939 * @stream must be joined to a bin.
3941 * @trans must contain a valid #GstRTSPTransport.
3943 * Returns: %TRUE if @trans was removed
3946 gst_rtsp_stream_remove_transport (GstRTSPStream * stream,
3947 GstRTSPStreamTransport * trans)
3949 GstRTSPStreamPrivate *priv;
3952 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3953 priv = stream->priv;
3954 g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
3955 g_return_val_if_fail (priv->joined_bin != NULL, FALSE);
3957 g_mutex_lock (&priv->lock);
3958 res = update_transport (stream, trans, FALSE);
3959 g_mutex_unlock (&priv->lock);
3965 * gst_rtsp_stream_update_crypto:
3966 * @stream: a #GstRTSPStream
3968 * @crypto: (transfer none) (allow-none): a #GstCaps with crypto info
3970 * Update the new crypto information for @ssrc in @stream. If information
3971 * for @ssrc did not exist, it will be added. If information
3972 * for @ssrc existed, it will be replaced. If @crypto is %NULL, it will
3973 * be removed from @stream.
3975 * Returns: %TRUE if @crypto could be updated
3978 gst_rtsp_stream_update_crypto (GstRTSPStream * stream,
3979 guint ssrc, GstCaps * crypto)
3981 GstRTSPStreamPrivate *priv;
3983 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3984 g_return_val_if_fail (crypto == NULL || GST_IS_CAPS (crypto), FALSE);
3986 priv = stream->priv;
3988 GST_DEBUG_OBJECT (stream, "update key for %08x", ssrc);
3990 g_mutex_lock (&priv->lock);
3992 g_hash_table_insert (priv->keys, GINT_TO_POINTER (ssrc),
3993 gst_caps_ref (crypto));
3995 g_hash_table_remove (priv->keys, GINT_TO_POINTER (ssrc));
3996 g_mutex_unlock (&priv->lock);
4002 * gst_rtsp_stream_get_rtp_socket:
4003 * @stream: a #GstRTSPStream
4004 * @family: the socket family
4006 * Get the RTP socket from @stream for a @family.
4008 * @stream must be joined to a bin.
4010 * Returns: (transfer full) (nullable): the RTP socket or %NULL if no
4011 * socket could be allocated for @family. Unref after usage
4014 gst_rtsp_stream_get_rtp_socket (GstRTSPStream * stream, GSocketFamily family)
4016 GstRTSPStreamPrivate *priv = stream->priv;
4020 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
4021 g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
4022 family == G_SOCKET_FAMILY_IPV6, NULL);
4023 g_return_val_if_fail (priv->udpsink[0], NULL);
4025 if (family == G_SOCKET_FAMILY_IPV6)
4030 g_object_get (priv->udpsink[0], name, &socket, NULL);
4036 * gst_rtsp_stream_get_rtcp_socket:
4037 * @stream: a #GstRTSPStream
4038 * @family: the socket family
4040 * Get the RTCP socket from @stream for a @family.
4042 * @stream must be joined to a bin.
4044 * Returns: (transfer full) (nullable): the RTCP socket or %NULL if no
4045 * socket could be allocated for @family. Unref after usage
4048 gst_rtsp_stream_get_rtcp_socket (GstRTSPStream * stream, GSocketFamily family)
4050 GstRTSPStreamPrivate *priv = gst_rtsp_stream_get_instance_private (stream);
4054 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
4055 g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
4056 family == G_SOCKET_FAMILY_IPV6, NULL);
4057 g_return_val_if_fail (priv->udpsink[1], NULL);
4059 if (family == G_SOCKET_FAMILY_IPV6)
4064 g_object_get (priv->udpsink[1], name, &socket, NULL);
4070 * gst_rtsp_stream_get_rtp_multicast_socket:
4071 * @stream: a #GstRTSPStream
4072 * @family: the socket family
4074 * Get the multicast RTP socket from @stream for a @family.
4076 * Returns: (transfer full) (nullable): the multicast RTP socket or %NULL if no
4077 * socket could be allocated for @family. Unref after usage
4080 gst_rtsp_stream_get_rtp_multicast_socket (GstRTSPStream * stream,
4081 GSocketFamily family)
4083 GstRTSPStreamPrivate *priv = gst_rtsp_stream_get_instance_private (stream);
4087 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
4088 g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
4089 family == G_SOCKET_FAMILY_IPV6, NULL);
4090 g_return_val_if_fail (priv->mcast_udpsink[0], NULL);
4092 if (family == G_SOCKET_FAMILY_IPV6)
4097 g_object_get (priv->mcast_udpsink[0], name, &socket, NULL);
4103 * gst_rtsp_stream_get_rtcp_multicast_socket:
4104 * @stream: a #GstRTSPStream
4105 * @family: the socket family
4107 * Get the multicast RTCP socket from @stream for a @family.
4109 * Returns: (transfer full) (nullable): the multicast RTCP socket or %NULL if no
4110 * socket could be allocated for @family. Unref after usage
4113 gst_rtsp_stream_get_rtcp_multicast_socket (GstRTSPStream * stream,
4114 GSocketFamily family)
4116 GstRTSPStreamPrivate *priv = gst_rtsp_stream_get_instance_private (stream);
4120 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
4121 g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
4122 family == G_SOCKET_FAMILY_IPV6, NULL);
4123 g_return_val_if_fail (priv->mcast_udpsink[1], NULL);
4125 if (family == G_SOCKET_FAMILY_IPV6)
4130 g_object_get (priv->mcast_udpsink[1], name, &socket, NULL);
4136 * gst_rtsp_stream_set_seqnum:
4137 * @stream: a #GstRTSPStream
4138 * @seqnum: a new sequence number
4140 * Configure the sequence number in the payloader of @stream to @seqnum.
4143 gst_rtsp_stream_set_seqnum_offset (GstRTSPStream * stream, guint16 seqnum)
4145 GstRTSPStreamPrivate *priv;
4147 g_return_if_fail (GST_IS_RTSP_STREAM (stream));
4149 priv = stream->priv;
4151 g_object_set (G_OBJECT (priv->payloader), "seqnum-offset", seqnum, NULL);
4155 * gst_rtsp_stream_get_seqnum:
4156 * @stream: a #GstRTSPStream
4158 * Get the configured sequence number in the payloader of @stream.
4160 * Returns: the sequence number of the payloader.
4163 gst_rtsp_stream_get_current_seqnum (GstRTSPStream * stream)
4165 GstRTSPStreamPrivate *priv;
4168 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
4170 priv = stream->priv;
4172 g_object_get (G_OBJECT (priv->payloader), "seqnum", &seqnum, NULL);
4178 * gst_rtsp_stream_transport_filter:
4179 * @stream: a #GstRTSPStream
4180 * @func: (scope call) (allow-none): a callback
4181 * @user_data: (closure): user data passed to @func
4183 * Call @func for each transport managed by @stream. The result value of @func
4184 * determines what happens to the transport. @func will be called with @stream
4185 * locked so no further actions on @stream can be performed from @func.
4187 * If @func returns #GST_RTSP_FILTER_REMOVE, the transport will be removed from
4190 * If @func returns #GST_RTSP_FILTER_KEEP, the transport will remain in @stream.
4192 * If @func returns #GST_RTSP_FILTER_REF, the transport will remain in @stream but
4193 * will also be added with an additional ref to the result #GList of this
4196 * When @func is %NULL, #GST_RTSP_FILTER_REF will be assumed for each transport.
4198 * Returns: (element-type GstRTSPStreamTransport) (transfer full): a #GList with all
4199 * transports for which @func returned #GST_RTSP_FILTER_REF. After usage, each
4200 * element in the #GList should be unreffed before the list is freed.
4203 gst_rtsp_stream_transport_filter (GstRTSPStream * stream,
4204 GstRTSPStreamTransportFilterFunc func, gpointer user_data)
4206 GstRTSPStreamPrivate *priv;
4207 GList *result, *walk, *next;
4208 GHashTable *visited = NULL;
4211 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
4213 priv = stream->priv;
4217 visited = g_hash_table_new_full (NULL, NULL, g_object_unref, NULL);
4219 g_mutex_lock (&priv->lock);
4221 cookie = priv->transports_cookie;
4222 for (walk = priv->transports; walk; walk = next) {
4223 GstRTSPStreamTransport *trans = walk->data;
4224 GstRTSPFilterResult res;
4227 next = g_list_next (walk);
4230 /* only visit each transport once */
4231 if (g_hash_table_contains (visited, trans))
4234 g_hash_table_add (visited, g_object_ref (trans));
4235 g_mutex_unlock (&priv->lock);
4237 res = func (stream, trans, user_data);
4239 g_mutex_lock (&priv->lock);
4241 res = GST_RTSP_FILTER_REF;
4243 changed = (cookie != priv->transports_cookie);
4246 case GST_RTSP_FILTER_REMOVE:
4247 update_transport (stream, trans, FALSE);
4249 case GST_RTSP_FILTER_REF:
4250 result = g_list_prepend (result, g_object_ref (trans));
4252 case GST_RTSP_FILTER_KEEP:
4259 g_mutex_unlock (&priv->lock);
4262 g_hash_table_unref (visited);
4267 static GstPadProbeReturn
4268 pad_blocking (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
4270 GstRTSPStreamPrivate *priv;
4271 GstRTSPStream *stream;
4272 GstBuffer *buffer = NULL;
4275 priv = stream->priv;
4277 GST_DEBUG_OBJECT (pad, "now blocking");
4279 g_mutex_lock (&priv->lock);
4280 priv->blocking = TRUE;
4282 if ((info->type & GST_PAD_PROBE_TYPE_BUFFER)) {
4283 buffer = gst_pad_probe_info_get_buffer (info);
4284 } else if ((info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST)) {
4285 GstBufferList *list = gst_pad_probe_info_get_buffer_list (info);
4286 buffer = gst_buffer_list_get (list, 0);
4288 g_assert_not_reached ();
4292 priv->position = GST_BUFFER_TIMESTAMP (buffer);
4293 GST_DEBUG_OBJECT (stream, "buffer position: %" GST_TIME_FORMAT,
4294 GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)));
4295 g_mutex_unlock (&priv->lock);
4297 gst_element_post_message (priv->payloader,
4298 gst_message_new_element (GST_OBJECT_CAST (priv->payloader),
4299 gst_structure_new_empty ("GstRTSPStreamBlocking")));
4301 return GST_PAD_PROBE_OK;
4305 set_blocked (GstRTSPStream * stream, gboolean blocked)
4307 GstRTSPStreamPrivate *priv;
4310 GST_DEBUG_OBJECT (stream, "blocked: %d", blocked);
4312 priv = stream->priv;
4315 for (i = 0; i < 2; i++) {
4316 if (priv->blocked_id[i] != 0)
4318 if (priv->send_src[i]) {
4319 priv->blocking = FALSE;
4320 priv->blocked_id[i] = gst_pad_add_probe (priv->send_src[i],
4321 GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER |
4322 GST_PAD_PROBE_TYPE_BUFFER_LIST, pad_blocking,
4323 g_object_ref (stream), g_object_unref);
4327 for (i = 0; i < 2; i++) {
4328 if (priv->blocked_id[i] != 0) {
4329 gst_pad_remove_probe (priv->send_src[i], priv->blocked_id[i]);
4330 priv->blocked_id[i] = 0;
4333 priv->blocking = FALSE;
4338 * gst_rtsp_stream_set_blocked:
4339 * @stream: a #GstRTSPStream
4340 * @blocked: boolean indicating we should block or unblock
4342 * Blocks or unblocks the dataflow on @stream.
4344 * Returns: %TRUE on success
4347 gst_rtsp_stream_set_blocked (GstRTSPStream * stream, gboolean blocked)
4349 GstRTSPStreamPrivate *priv;
4351 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
4353 priv = stream->priv;
4354 g_mutex_lock (&priv->lock);
4355 set_blocked (stream, blocked);
4356 g_mutex_unlock (&priv->lock);
4362 * gst_rtsp_stream_ublock_linked:
4363 * @stream: a #GstRTSPStream
4365 * Unblocks the dataflow on @stream if it is linked.
4367 * Returns: %TRUE on success
4370 gst_rtsp_stream_unblock_linked (GstRTSPStream * stream)
4372 GstRTSPStreamPrivate *priv;
4374 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
4376 priv = stream->priv;
4377 g_mutex_lock (&priv->lock);
4378 if (priv->send_src[0] && gst_pad_is_linked (priv->send_src[0]))
4379 set_blocked (stream, FALSE);
4380 g_mutex_unlock (&priv->lock);
4386 * gst_rtsp_stream_is_blocking:
4387 * @stream: a #GstRTSPStream
4389 * Check if @stream is blocking on a #GstBuffer.
4391 * Returns: %TRUE if @stream is blocking
4394 gst_rtsp_stream_is_blocking (GstRTSPStream * stream)
4396 GstRTSPStreamPrivate *priv;
4399 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
4401 priv = stream->priv;
4403 g_mutex_lock (&priv->lock);
4404 result = priv->blocking;
4405 g_mutex_unlock (&priv->lock);
4411 * gst_rtsp_stream_query_position:
4412 * @stream: a #GstRTSPStream
4413 * @position: (out): current position of a #GstRTSPStream
4415 * Query the position of the stream in %GST_FORMAT_TIME. This only considers
4416 * the RTP parts of the pipeline and not the RTCP parts.
4418 * Returns: %TRUE if the position could be queried
4421 gst_rtsp_stream_query_position (GstRTSPStream * stream, gint64 * position)
4423 GstRTSPStreamPrivate *priv;
4427 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
4429 /* query position: if no sinks have been added yet,
4430 * we obtain the position from the pad otherwise we query the sinks */
4432 priv = stream->priv;
4434 g_mutex_lock (&priv->lock);
4435 /* depending on the transport type, it should query corresponding sink */
4436 if (priv->protocols & GST_RTSP_LOWER_TRANS_UDP)
4437 sink = priv->udpsink[0];
4438 else if (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST)
4439 sink = priv->mcast_udpsink[0];
4441 sink = priv->appsink[0];
4444 gst_object_ref (sink);
4445 } else if (priv->send_src[0]) {
4446 pad = gst_object_ref (priv->send_src[0]);
4448 g_mutex_unlock (&priv->lock);
4449 GST_WARNING_OBJECT (stream, "Couldn't obtain postion: erroneous pipeline");
4452 g_mutex_unlock (&priv->lock);
4455 if (!gst_element_query_position (sink, GST_FORMAT_TIME, position)) {
4456 GST_WARNING_OBJECT (stream,
4457 "Couldn't obtain postion: position query failed");
4458 gst_object_unref (sink);
4461 gst_object_unref (sink);
4464 const GstSegment *segment;
4466 event = gst_pad_get_sticky_event (pad, GST_EVENT_SEGMENT, 0);
4468 GST_WARNING_OBJECT (stream, "Couldn't obtain postion: no segment event");
4469 gst_object_unref (pad);
4473 gst_event_parse_segment (event, &segment);
4474 if (segment->format != GST_FORMAT_TIME) {
4477 g_mutex_lock (&priv->lock);
4478 *position = priv->position;
4479 g_mutex_unlock (&priv->lock);
4481 gst_segment_to_stream_time (segment, GST_FORMAT_TIME, *position);
4483 gst_event_unref (event);
4484 gst_object_unref (pad);
4491 * gst_rtsp_stream_query_stop:
4492 * @stream: a #GstRTSPStream
4493 * @stop: (out): current stop of a #GstRTSPStream
4495 * Query the stop of the stream in %GST_FORMAT_TIME. This only considers
4496 * the RTP parts of the pipeline and not the RTCP parts.
4498 * Returns: %TRUE if the stop could be queried
4501 gst_rtsp_stream_query_stop (GstRTSPStream * stream, gint64 * stop)
4503 GstRTSPStreamPrivate *priv;
4507 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
4509 /* query stop position: if no sinks have been added yet,
4510 * we obtain the stop position from the pad otherwise we query the sinks */
4512 priv = stream->priv;
4514 g_mutex_lock (&priv->lock);
4515 /* depending on the transport type, it should query corresponding sink */
4516 if (priv->protocols & GST_RTSP_LOWER_TRANS_UDP)
4517 sink = priv->udpsink[0];
4518 else if (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST)
4519 sink = priv->mcast_udpsink[0];
4521 sink = priv->appsink[0];
4524 gst_object_ref (sink);
4525 } else if (priv->send_src[0]) {
4526 pad = gst_object_ref (priv->send_src[0]);
4528 g_mutex_unlock (&priv->lock);
4529 GST_WARNING_OBJECT (stream, "Couldn't obtain stop: erroneous pipeline");
4532 g_mutex_unlock (&priv->lock);
4538 query = gst_query_new_segment (GST_FORMAT_TIME);
4539 if (!gst_element_query (sink, query)) {
4540 GST_WARNING_OBJECT (stream, "Couldn't obtain stop: element query failed");
4541 gst_query_unref (query);
4542 gst_object_unref (sink);
4545 gst_query_parse_segment (query, NULL, &format, NULL, stop);
4546 if (format != GST_FORMAT_TIME)
4548 gst_query_unref (query);
4549 gst_object_unref (sink);
4552 const GstSegment *segment;
4554 event = gst_pad_get_sticky_event (pad, GST_EVENT_SEGMENT, 0);
4556 GST_WARNING_OBJECT (stream, "Couldn't obtain stop: no segment event");
4557 gst_object_unref (pad);
4560 gst_event_parse_segment (event, &segment);
4561 if (segment->format != GST_FORMAT_TIME) {
4564 *stop = segment->stop;
4566 *stop = segment->duration;
4568 *stop = gst_segment_to_stream_time (segment, GST_FORMAT_TIME, *stop);
4570 gst_event_unref (event);
4571 gst_object_unref (pad);
4578 * gst_rtsp_stream_seekable:
4579 * @stream: a #GstRTSPStream
4581 * Checks whether the individual @stream is seekable.
4583 * Returns: %TRUE if @stream is seekable, else %FALSE.
4586 gst_rtsp_stream_seekable (GstRTSPStream * stream)
4588 GstRTSPStreamPrivate *priv;
4590 GstQuery *query = NULL;
4591 gboolean seekable = FALSE;
4593 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
4595 /* query stop position: if no sinks have been added yet,
4596 * we obtain the stop position from the pad otherwise we query the sinks */
4598 priv = stream->priv;
4600 g_mutex_lock (&priv->lock);
4601 /* depending on the transport type, it should query corresponding sink */
4603 pad = gst_object_ref (priv->srcpad);
4605 g_mutex_unlock (&priv->lock);
4606 GST_WARNING_OBJECT (stream, "Pad not available, can't query seekability");
4609 g_mutex_unlock (&priv->lock);
4611 query = gst_query_new_seeking (GST_FORMAT_TIME);
4612 if (!gst_pad_query (pad, query)) {
4613 GST_WARNING_OBJECT (stream, "seeking query failed");
4616 gst_query_parse_seeking (query, NULL, &seekable, NULL, NULL);
4620 gst_object_unref (pad);
4622 gst_query_unref (query);
4624 GST_DEBUG_OBJECT (stream, "Returning %d", seekable);
4630 * gst_rtsp_stream_complete_stream:
4631 * @stream: a #GstRTSPStream
4632 * @transport: a #GstRTSPTransport
4634 * Add a receiver and sender part to the pipeline based on the transport from
4637 * Returns: %TRUE if the stream has been sucessfully updated.
4640 gst_rtsp_stream_complete_stream (GstRTSPStream * stream,
4641 const GstRTSPTransport * transport)
4643 GstRTSPStreamPrivate *priv;
4645 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
4647 priv = stream->priv;
4648 GST_DEBUG_OBJECT (stream, "complete stream");
4650 g_mutex_lock (&priv->lock);
4652 if (!(priv->protocols & transport->lower_transport))
4653 goto unallowed_transport;
4655 if (!create_receiver_part (stream, transport))
4656 goto create_receiver_error;
4658 /* in the RECORD case, we only add RTCP sender part */
4659 if (!create_sender_part (stream, transport))
4660 goto create_sender_error;
4662 priv->is_complete = TRUE;
4663 g_mutex_unlock (&priv->lock);
4665 GST_DEBUG_OBJECT (stream, "pipeline sucsessfully updated");
4668 create_receiver_error:
4669 create_sender_error:
4670 unallowed_transport:
4672 g_mutex_unlock (&priv->lock);
4678 * gst_rtsp_stream_is_complete:
4679 * @stream: a #GstRTSPStream
4681 * Checks whether the stream is complete, contains the receiver and the sender
4682 * parts. As the stream contains sink(s) element(s), it's possible to perform
4683 * seek operations on it.
4685 * Returns: %TRUE if the stream contains at least one sink element.
4688 gst_rtsp_stream_is_complete (GstRTSPStream * stream)
4690 GstRTSPStreamPrivate *priv;
4691 gboolean ret = FALSE;
4693 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
4695 priv = stream->priv;
4696 g_mutex_lock (&priv->lock);
4697 ret = priv->is_complete;
4698 g_mutex_unlock (&priv->lock);
4704 * gst_rtsp_stream_is_sender:
4705 * @stream: a #GstRTSPStream
4707 * Checks whether the stream is a sender.
4709 * Returns: %TRUE if the stream is a sender and %FALSE otherwise.
4712 gst_rtsp_stream_is_sender (GstRTSPStream * stream)
4714 GstRTSPStreamPrivate *priv;
4715 gboolean ret = FALSE;
4717 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
4719 priv = stream->priv;
4720 g_mutex_lock (&priv->lock);
4721 ret = (priv->srcpad != NULL);
4722 g_mutex_unlock (&priv->lock);
4728 * gst_rtsp_stream_is_receiver:
4729 * @stream: a #GstRTSPStream
4731 * Checks whether the stream is a receiver.
4733 * Returns: %TRUE if the stream is a receiver and %FALSE otherwise.
4736 gst_rtsp_stream_is_receiver (GstRTSPStream * stream)
4738 GstRTSPStreamPrivate *priv;
4739 gboolean ret = FALSE;
4741 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
4743 priv = stream->priv;
4744 g_mutex_lock (&priv->lock);
4745 ret = (priv->sinkpad != NULL);
4746 g_mutex_unlock (&priv->lock);
4751 #define AES_128_KEY_LEN 16
4752 #define AES_256_KEY_LEN 32
4754 #define HMAC_32_KEY_LEN 4
4755 #define HMAC_80_KEY_LEN 10
4758 mikey_apply_policy (GstCaps * caps, GstMIKEYMessage * msg, guint8 policy)
4760 const gchar *srtp_cipher;
4761 const gchar *srtp_auth;
4762 const GstMIKEYPayload *sp;
4765 /* loop over Security policy until we find one containing policy */
4767 if ((sp = gst_mikey_message_find_payload (msg, GST_MIKEY_PT_SP, i)) == NULL)
4770 if (((GstMIKEYPayloadSP *) sp)->policy == policy)
4774 /* the default ciphers */
4775 srtp_cipher = "aes-128-icm";
4776 srtp_auth = "hmac-sha1-80";
4778 /* now override the defaults with what is in the Security Policy */
4782 /* collect all the params and go over them */
4783 len = gst_mikey_payload_sp_get_n_params (sp);
4784 for (i = 0; i < len; i++) {
4785 const GstMIKEYPayloadSPParam *param =
4786 gst_mikey_payload_sp_get_param (sp, i);
4788 switch (param->type) {
4789 case GST_MIKEY_SP_SRTP_ENC_ALG:
4790 switch (param->val[0]) {
4792 srtp_cipher = "null";
4796 srtp_cipher = "aes-128-icm";
4802 case GST_MIKEY_SP_SRTP_ENC_KEY_LEN:
4803 switch (param->val[0]) {
4804 case AES_128_KEY_LEN:
4805 srtp_cipher = "aes-128-icm";
4807 case AES_256_KEY_LEN:
4808 srtp_cipher = "aes-256-icm";
4814 case GST_MIKEY_SP_SRTP_AUTH_ALG:
4815 switch (param->val[0]) {
4821 srtp_auth = "hmac-sha1-80";
4827 case GST_MIKEY_SP_SRTP_AUTH_KEY_LEN:
4828 switch (param->val[0]) {
4829 case HMAC_32_KEY_LEN:
4830 srtp_auth = "hmac-sha1-32";
4832 case HMAC_80_KEY_LEN:
4833 srtp_auth = "hmac-sha1-80";
4839 case GST_MIKEY_SP_SRTP_SRTP_ENC:
4841 case GST_MIKEY_SP_SRTP_SRTCP_ENC:
4848 /* now configure the SRTP parameters */
4849 gst_caps_set_simple (caps,
4850 "srtp-cipher", G_TYPE_STRING, srtp_cipher,
4851 "srtp-auth", G_TYPE_STRING, srtp_auth,
4852 "srtcp-cipher", G_TYPE_STRING, srtp_cipher,
4853 "srtcp-auth", G_TYPE_STRING, srtp_auth, NULL);
4859 handle_mikey_data (GstRTSPStream * stream, guint8 * data, gsize size)
4861 GstMIKEYMessage *msg;
4863 GstCaps *caps = NULL;
4864 GstMIKEYPayloadKEMAC *kemac;
4865 const GstMIKEYPayloadKeyData *pkd;
4868 /* the MIKEY message contains a CSB or crypto session bundle. It is a
4869 * set of Crypto Sessions protected with the same master key.
4870 * In the context of SRTP, an RTP and its RTCP stream is part of a
4872 if ((msg = gst_mikey_message_new_from_data (data, size, NULL, NULL)) == NULL)
4875 /* we can only handle SRTP crypto sessions for now */
4876 if (msg->map_type != GST_MIKEY_MAP_TYPE_SRTP)
4877 goto invalid_map_type;
4879 /* get the number of crypto sessions. This maps SSRC to its
4880 * security parameters */
4881 n_cs = gst_mikey_message_get_n_cs (msg);
4883 goto no_crypto_sessions;
4885 /* we also need keys */
4886 if (!(kemac = (GstMIKEYPayloadKEMAC *) gst_mikey_message_find_payload
4887 (msg, GST_MIKEY_PT_KEMAC, 0)))
4890 /* we don't support encrypted keys */
4891 if (kemac->enc_alg != GST_MIKEY_ENC_NULL
4892 || kemac->mac_alg != GST_MIKEY_MAC_NULL)
4893 goto unsupported_encryption;
4895 /* get Key data sub-payload */
4896 pkd = (const GstMIKEYPayloadKeyData *)
4897 gst_mikey_payload_kemac_get_sub (&kemac->pt, 0);
4900 gst_buffer_new_wrapped (g_memdup (pkd->key_data, pkd->key_len),
4903 /* go over all crypto sessions and create the security policy for each
4905 for (i = 0; i < n_cs; i++) {
4906 const GstMIKEYMapSRTP *map = gst_mikey_message_get_cs_srtp (msg, i);
4908 caps = gst_caps_new_simple ("application/x-srtp",
4909 "ssrc", G_TYPE_UINT, map->ssrc,
4910 "roc", G_TYPE_UINT, map->roc, "srtp-key", GST_TYPE_BUFFER, key, NULL);
4911 mikey_apply_policy (caps, msg, map->policy);
4913 gst_rtsp_stream_update_crypto (stream, map->ssrc, caps);
4914 gst_caps_unref (caps);
4916 gst_mikey_message_unref (msg);
4917 gst_buffer_unref (key);
4924 GST_DEBUG_OBJECT (stream, "failed to parse MIKEY message");
4929 GST_DEBUG_OBJECT (stream, "invalid map type %d", msg->map_type);
4930 goto cleanup_message;
4934 GST_DEBUG_OBJECT (stream, "no crypto sessions");
4935 goto cleanup_message;
4939 GST_DEBUG_OBJECT (stream, "no keys found");
4940 goto cleanup_message;
4942 unsupported_encryption:
4944 GST_DEBUG_OBJECT (stream, "unsupported key encryption");
4945 goto cleanup_message;
4949 gst_mikey_message_unref (msg);
4954 #define IS_STRIP_CHAR(c) (g_ascii_isspace ((guchar)(c)) || ((c) == '\"'))
4957 strip_chars (gchar * str)
4964 if (!IS_STRIP_CHAR (str[len]))
4968 for (s = str; *s && IS_STRIP_CHAR (*s); s++);
4969 memmove (str, s, len + 1);
4973 * gst_rtsp_stream_handle_keymgmt:
4974 * @stream: a #GstRTSPStream
4975 * @keymgmt: a keymgmt header
4977 * Parse and handle a KeyMgmt header.
4981 /* KeyMgmt = "KeyMgmt" ":" key-mgmt-spec 0*("," key-mgmt-spec)
4982 * key-mgmt-spec = "prot" "=" KMPID ";" ["uri" "=" %x22 URI %x22 ";"]
4985 gst_rtsp_stream_handle_keymgmt (GstRTSPStream * stream, const gchar * keymgmt)
4990 specs = g_strsplit (keymgmt, ",", 0);
4991 for (i = 0; specs[i]; i++) {
4994 split = g_strsplit (specs[i], ";", 0);
4995 for (j = 0; split[j]; j++) {
4996 g_strstrip (split[j]);
4997 if (g_str_has_prefix (split[j], "prot=")) {
4998 g_strstrip (split[j] + 5);
4999 if (!g_str_equal (split[j] + 5, "mikey"))
5001 GST_DEBUG ("found mikey");
5002 } else if (g_str_has_prefix (split[j], "uri=")) {
5003 strip_chars (split[j] + 4);
5004 GST_DEBUG ("found uri '%s'", split[j] + 4);
5005 } else if (g_str_has_prefix (split[j], "data=")) {
5008 strip_chars (split[j] + 5);
5009 GST_DEBUG ("found data '%s'", split[j] + 5);
5010 data = g_base64_decode_inplace (split[j] + 5, &size);
5011 handle_mikey_data (stream, data, size);
5022 * gst_rtsp_stream_get_ulpfec_pt:
5024 * Returns: the payload type used for ULPFEC protection packets
5029 gst_rtsp_stream_get_ulpfec_pt (GstRTSPStream * stream)
5033 g_mutex_lock (&stream->priv->lock);
5034 res = stream->priv->ulpfec_pt;
5035 g_mutex_unlock (&stream->priv->lock);
5041 * gst_rtsp_stream_set_ulpfec_pt:
5043 * Set the payload type to be used for ULPFEC protection packets
5048 gst_rtsp_stream_set_ulpfec_pt (GstRTSPStream * stream, guint pt)
5050 g_return_if_fail (GST_IS_RTSP_STREAM (stream));
5052 g_mutex_lock (&stream->priv->lock);
5053 stream->priv->ulpfec_pt = pt;
5054 if (stream->priv->ulpfec_encoder) {
5055 g_object_set (stream->priv->ulpfec_encoder, "pt", pt, NULL);
5057 g_mutex_unlock (&stream->priv->lock);
5061 * gst_rtsp_stream_request_ulpfec_decoder:
5063 * Creating a rtpulpfecdec element
5065 * Returns: (transfer full) (nullable): a #GstElement.
5070 gst_rtsp_stream_request_ulpfec_decoder (GstRTSPStream * stream,
5071 GstElement * rtpbin, guint sessid)
5073 GObject *internal_storage = NULL;
5075 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
5076 stream->priv->ulpfec_decoder =
5077 gst_object_ref (gst_element_factory_make ("rtpulpfecdec", NULL));
5079 g_signal_emit_by_name (G_OBJECT (rtpbin), "get-internal-storage", sessid,
5081 g_object_set (stream->priv->ulpfec_decoder, "storage", internal_storage,
5083 g_object_unref (internal_storage);
5084 update_ulpfec_decoder_pt (stream);
5086 return stream->priv->ulpfec_decoder;
5090 * gst_rtsp_stream_request_ulpfec_encoder:
5092 * Creating a rtpulpfecenc element
5094 * Returns: (transfer full) (nullable): a #GstElement.
5099 gst_rtsp_stream_request_ulpfec_encoder (GstRTSPStream * stream, guint sessid)
5101 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
5103 if (!stream->priv->ulpfec_percentage)
5106 stream->priv->ulpfec_encoder =
5107 gst_object_ref (gst_element_factory_make ("rtpulpfecenc", NULL));
5109 g_object_set (stream->priv->ulpfec_encoder, "pt", stream->priv->ulpfec_pt,
5110 "percentage", stream->priv->ulpfec_percentage, NULL);
5112 return stream->priv->ulpfec_encoder;
5116 * gst_rtsp_stream_set_ulpfec_percentage:
5118 * Sets the amount of redundancy to apply when creating ULPFEC
5119 * protection packets.
5124 gst_rtsp_stream_set_ulpfec_percentage (GstRTSPStream * stream, guint percentage)
5126 g_return_if_fail (GST_IS_RTSP_STREAM (stream));
5128 g_mutex_lock (&stream->priv->lock);
5129 stream->priv->ulpfec_percentage = percentage;
5130 if (stream->priv->ulpfec_encoder) {
5131 g_object_set (stream->priv->ulpfec_encoder, "percentage", percentage, NULL);
5133 g_mutex_unlock (&stream->priv->lock);
5137 * gst_rtsp_stream_get_ulpfec_percentage:
5139 * Returns: the amount of redundancy applied when creating ULPFEC
5140 * protection packets.
5145 gst_rtsp_stream_get_ulpfec_percentage (GstRTSPStream * stream)
5149 g_mutex_lock (&stream->priv->lock);
5150 res = stream->priv->ulpfec_percentage;
5151 g_mutex_unlock (&stream->priv->lock);