2 * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3 * Copyright (C) 2015 Centricular Ltd
4 * Author: Sebastian Dröge <sebastian@centricular.com>
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Library General Public
8 * License as published by the Free Software Foundation; either
9 * version 2 of the License, or (at your option) any later version.
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Library General Public License for more details.
16 * You should have received a copy of the GNU Library General Public
17 * License along with this library; if not, write to the
18 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19 * Boston, MA 02110-1301, USA.
23 * @short_description: A media stream
24 * @see_also: #GstRTSPMedia
26 * The #GstRTSPStream object manages the data transport for one stream. It
27 * is created from a payloader element and a source pad that produce the RTP
28 * packets for the stream.
30 * With gst_rtsp_stream_join_bin() the streaming elements are added to the bin
31 * and rtpbin. gst_rtsp_stream_leave_bin() removes the elements again.
33 * The #GstRTSPStream will use the configured addresspool, as set with
34 * gst_rtsp_stream_set_address_pool(), to allocate multicast addresses for the
35 * stream. With gst_rtsp_stream_get_multicast_address() you can get the
38 * With gst_rtsp_stream_get_server_port () you can get the port that the server
39 * will use to receive RTCP. This is the port that the clients will use to send
42 * With gst_rtsp_stream_add_transport() destinations can be added where the
43 * stream should be sent to. Use gst_rtsp_stream_remove_transport() to remove
44 * the destination again.
46 * Last reviewed on 2013-07-16 (1.0.0)
55 #include <gst/app/gstappsrc.h>
56 #include <gst/app/gstappsink.h>
58 #include <gst/rtp/gstrtpbuffer.h>
60 #include "rtsp-stream.h"
62 struct _GstRTSPStreamPrivate
66 /* Only one pad is ever set */
67 GstPad *srcpad, *sinkpad;
68 GstElement *payloader;
72 /* TRUE if this stream is running on
73 * the client side of an RTSP link (for RECORD) */
77 /* TRUE if stream is complete. This means that the receiver and the sender
78 * parts are present in the stream. */
80 GstRTSPProfile profiles;
81 GstRTSPLowerTrans allowed_protocols;
82 GstRTSPLowerTrans configured_protocols;
84 /* pads on the rtpbin */
85 GstPad *send_rtp_sink;
90 /* the RTPSession object */
93 /* SRTP encoder/decoder */
99 GstElement *udpsrc_v4[2];
100 GstElement *udpsrc_v6[2];
101 GstElement *udpqueue[2];
102 GstElement *udpsink[2];
103 GSocket *socket_v4[2];
104 GSocket *socket_v6[2];
106 /* for UDP multicast */
107 GstElement *mcast_udpsrc_v4[2];
108 GstElement *mcast_udpsrc_v6[2];
109 GstElement *mcast_udpqueue[2];
110 GstElement *mcast_udpsink[2];
111 GSocket *mcast_socket_v4[2];
112 GSocket *mcast_socket_v6[2];
113 GList *mcast_clients;
115 /* for TCP transport */
116 GstElement *appsrc[2];
117 GstClockTime appsrc_base_time[2];
118 GstElement *appqueue[2];
119 GstElement *appsink[2];
122 GstElement *funnel[2];
126 GstElement *rtxreceive;
128 GstClockTime rtx_time;
130 /* Forward Error Correction with RFC 5109 */
131 GstElement *ulpfec_decoder;
132 GstElement *ulpfec_encoder;
134 gboolean ulpfec_enabled;
135 guint ulpfec_percentage;
137 /* pool used to manage unicast and multicast addresses */
138 GstRTSPAddressPool *pool;
140 /* unicast server addr/port */
141 GstRTSPAddress *server_addr_v4;
142 GstRTSPAddress *server_addr_v6;
144 /* multicast addresses */
145 GstRTSPAddress *mcast_addr_v4;
146 GstRTSPAddress *mcast_addr_v6;
148 gchar *multicast_iface;
151 /* the caps of the stream */
155 /* transports we stream to */
158 guint transports_cookie;
160 GList *tr_cache_rtcp;
161 guint tr_cache_cookie_rtp;
162 guint tr_cache_cookie_rtcp;
163 guint n_tcp_transports;
164 gboolean have_buffer[2];
169 /* stream blocking */
170 gulong blocked_id[2];
173 /* current stream position */
174 GstClockTime position;
176 /* pt->caps map for RECORD streams */
179 GstRTSPPublishClockMode publish_clock_mode;
182 #define DEFAULT_CONTROL NULL
183 #define DEFAULT_PROFILES GST_RTSP_PROFILE_AVP
/* Default set of allowed lower transports: UDP, UDP multicast and TCP.
 * NOTE(review): the expansion is not wrapped in parentheses. The uses visible
 * here (a plain assignment and a function-call argument) are unaffected, but
 * using the macro inside a larger expression (e.g. with '&' or '==') would
 * bind incorrectly -- consider parenthesizing the whole expansion. */
184 #define DEFAULT_PROTOCOLS GST_RTSP_LOWER_TRANS_UDP | GST_RTSP_LOWER_TRANS_UDP_MCAST | \
185 GST_RTSP_LOWER_TRANS_TCP
186 #define DEFAULT_MAX_MCAST_TTL 255
199 SIGNAL_NEW_RTP_ENCODER,
200 SIGNAL_NEW_RTCP_ENCODER,
201 SIGNAL_NEW_RTP_RTCP_DECODER,
205 GST_DEBUG_CATEGORY_STATIC (rtsp_stream_debug);
206 #define GST_CAT_DEFAULT rtsp_stream_debug
208 static GQuark ssrc_stream_map_key;
210 static void gst_rtsp_stream_get_property (GObject * object, guint propid,
211 GValue * value, GParamSpec * pspec);
212 static void gst_rtsp_stream_set_property (GObject * object, guint propid,
213 const GValue * value, GParamSpec * pspec);
215 static void gst_rtsp_stream_finalize (GObject * obj);
218 update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
221 static guint gst_rtsp_stream_signals[SIGNAL_LAST] = { 0 };
223 G_DEFINE_TYPE_WITH_PRIVATE (GstRTSPStream, gst_rtsp_stream, G_TYPE_OBJECT);
226 gst_rtsp_stream_class_init (GstRTSPStreamClass * klass)
228 GObjectClass *gobject_class;
230 gobject_class = G_OBJECT_CLASS (klass);
232 gobject_class->get_property = gst_rtsp_stream_get_property;
233 gobject_class->set_property = gst_rtsp_stream_set_property;
234 gobject_class->finalize = gst_rtsp_stream_finalize;
236 g_object_class_install_property (gobject_class, PROP_CONTROL,
237 g_param_spec_string ("control", "Control",
238 "The control string for this stream", DEFAULT_CONTROL,
239 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
241 g_object_class_install_property (gobject_class, PROP_PROFILES,
242 g_param_spec_flags ("profiles", "Profiles",
243 "Allowed transfer profiles", GST_TYPE_RTSP_PROFILE,
244 DEFAULT_PROFILES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
246 g_object_class_install_property (gobject_class, PROP_PROTOCOLS,
247 g_param_spec_flags ("protocols", "Protocols",
248 "Allowed lower transport protocols", GST_TYPE_RTSP_LOWER_TRANS,
249 DEFAULT_PROTOCOLS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
251 gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER] =
252 g_signal_new ("new-rtp-encoder", G_TYPE_FROM_CLASS (klass),
253 G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
254 G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
256 gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER] =
257 g_signal_new ("new-rtcp-encoder", G_TYPE_FROM_CLASS (klass),
258 G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
259 G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
261 gst_rtsp_stream_signals[SIGNAL_NEW_RTP_RTCP_DECODER] =
262 g_signal_new ("new-rtp-rtcp-decoder", G_TYPE_FROM_CLASS (klass),
263 G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
264 G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
266 GST_DEBUG_CATEGORY_INIT (rtsp_stream_debug, "rtspstream", 0, "GstRTSPStream");
268 ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
272 gst_rtsp_stream_init (GstRTSPStream * stream)
274 GstRTSPStreamPrivate *priv = gst_rtsp_stream_get_instance_private (stream);
276 GST_DEBUG ("new stream %p", stream);
281 priv->control = g_strdup (DEFAULT_CONTROL);
282 priv->profiles = DEFAULT_PROFILES;
283 priv->allowed_protocols = DEFAULT_PROTOCOLS;
284 priv->configured_protocols = 0;
285 priv->publish_clock_mode = GST_RTSP_PUBLISH_CLOCK_MODE_CLOCK;
286 priv->max_mcast_ttl = DEFAULT_MAX_MCAST_TTL;
288 g_mutex_init (&priv->lock);
290 priv->keys = g_hash_table_new_full (g_direct_hash, g_direct_equal,
291 NULL, (GDestroyNotify) gst_caps_unref);
292 priv->ptmap = g_hash_table_new_full (NULL, NULL, NULL,
293 (GDestroyNotify) gst_caps_unref);
296 typedef struct _UdpClientAddrInfo UdpClientAddrInfo;
298 struct _UdpClientAddrInfo
302 guint add_count; /* how often this address has been added */
306 free_mcast_client (gpointer data)
308 UdpClientAddrInfo *client = data;
310 g_free (client->address);
315 gst_rtsp_stream_finalize (GObject * obj)
317 GstRTSPStream *stream;
318 GstRTSPStreamPrivate *priv;
321 stream = GST_RTSP_STREAM (obj);
324 GST_DEBUG ("finalize stream %p", stream);
326 /* we really need to be unjoined now */
327 g_return_if_fail (priv->joined_bin == NULL);
329 if (priv->mcast_addr_v4)
330 gst_rtsp_address_free (priv->mcast_addr_v4);
331 if (priv->mcast_addr_v6)
332 gst_rtsp_address_free (priv->mcast_addr_v6);
333 if (priv->server_addr_v4)
334 gst_rtsp_address_free (priv->server_addr_v4);
335 if (priv->server_addr_v6)
336 gst_rtsp_address_free (priv->server_addr_v6);
338 g_object_unref (priv->pool);
340 g_object_unref (priv->rtxsend);
341 if (priv->rtxreceive)
342 g_object_unref (priv->rtxreceive);
343 if (priv->ulpfec_encoder)
344 gst_object_unref (priv->ulpfec_encoder);
345 if (priv->ulpfec_decoder)
346 gst_object_unref (priv->ulpfec_decoder);
348 for (i = 0; i < 2; i++) {
349 if (priv->socket_v4[i])
350 g_object_unref (priv->socket_v4[i]);
351 if (priv->socket_v6[i])
352 g_object_unref (priv->socket_v6[i]);
353 if (priv->mcast_socket_v4[i])
354 g_object_unref (priv->mcast_socket_v4[i]);
355 if (priv->mcast_socket_v6[i])
356 g_object_unref (priv->mcast_socket_v6[i]);
359 g_free (priv->multicast_iface);
360 g_list_free_full (priv->mcast_clients, (GDestroyNotify) free_mcast_client);
362 gst_object_unref (priv->payloader);
364 gst_object_unref (priv->srcpad);
366 gst_object_unref (priv->sinkpad);
367 g_free (priv->control);
368 g_mutex_clear (&priv->lock);
370 g_hash_table_unref (priv->keys);
371 g_hash_table_destroy (priv->ptmap);
373 G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
377 gst_rtsp_stream_get_property (GObject * object, guint propid,
378 GValue * value, GParamSpec * pspec)
380 GstRTSPStream *stream = GST_RTSP_STREAM (object);
384 g_value_take_string (value, gst_rtsp_stream_get_control (stream));
387 g_value_set_flags (value, gst_rtsp_stream_get_profiles (stream));
390 g_value_set_flags (value, gst_rtsp_stream_get_protocols (stream));
393 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
398 gst_rtsp_stream_set_property (GObject * object, guint propid,
399 const GValue * value, GParamSpec * pspec)
401 GstRTSPStream *stream = GST_RTSP_STREAM (object);
405 gst_rtsp_stream_set_control (stream, g_value_get_string (value));
408 gst_rtsp_stream_set_profiles (stream, g_value_get_flags (value));
411 gst_rtsp_stream_set_protocols (stream, g_value_get_flags (value));
414 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
419 * gst_rtsp_stream_new:
422 * @payloader: a #GstElement
424 * Create a new media stream with index @idx that handles RTP data on
425 * @pad and has a payloader element @payloader if @pad is a source pad
426 * or a depayloader element @payloader if @pad is a sink pad.
428 * Returns: (transfer full): a new #GstRTSPStream
431 gst_rtsp_stream_new (guint idx, GstElement * payloader, GstPad * pad)
433 GstRTSPStreamPrivate *priv;
434 GstRTSPStream *stream;
436 g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL);
437 g_return_val_if_fail (GST_IS_PAD (pad), NULL);
439 stream = g_object_new (GST_TYPE_RTSP_STREAM, NULL);
442 priv->payloader = gst_object_ref (payloader);
443 if (GST_PAD_IS_SRC (pad))
444 priv->srcpad = gst_object_ref (pad);
446 priv->sinkpad = gst_object_ref (pad);
452 * gst_rtsp_stream_get_index:
453 * @stream: a #GstRTSPStream
455 * Get the stream index.
457 * Returns: the stream index.
460 gst_rtsp_stream_get_index (GstRTSPStream * stream)
462 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
464 return stream->priv->idx;
468 * gst_rtsp_stream_get_pt:
469 * @stream: a #GstRTSPStream
471 * Get the stream payload type.
473 * Returns: the stream payload type.
476 gst_rtsp_stream_get_pt (GstRTSPStream * stream)
478 GstRTSPStreamPrivate *priv;
481 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
485 g_object_get (G_OBJECT (priv->payloader), "pt", &pt, NULL);
491 * gst_rtsp_stream_get_srcpad:
492 * @stream: a #GstRTSPStream
494 * Get the srcpad associated with @stream.
496 * Returns: (transfer full) (nullable): the srcpad. Unref after usage.
499 gst_rtsp_stream_get_srcpad (GstRTSPStream * stream)
501 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
503 if (!stream->priv->srcpad)
506 return gst_object_ref (stream->priv->srcpad);
510 * gst_rtsp_stream_get_sinkpad:
511 * @stream: a #GstRTSPStream
513 * Get the sinkpad associated with @stream.
515 * Returns: (transfer full) (nullable): the sinkpad. Unref after usage.
518 gst_rtsp_stream_get_sinkpad (GstRTSPStream * stream)
520 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
522 if (!stream->priv->sinkpad)
525 return gst_object_ref (stream->priv->sinkpad);
529 * gst_rtsp_stream_get_control:
530 * @stream: a #GstRTSPStream
532 * Get the control string to identify this stream.
534 * Returns: (transfer full) (nullable): the control string. g_free() after usage.
537 gst_rtsp_stream_get_control (GstRTSPStream * stream)
539 GstRTSPStreamPrivate *priv;
542 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
546 g_mutex_lock (&priv->lock);
547 if ((result = g_strdup (priv->control)) == NULL)
548 result = g_strdup_printf ("stream=%u", priv->idx);
549 g_mutex_unlock (&priv->lock);
555 * gst_rtsp_stream_set_control:
556 * @stream: a #GstRTSPStream
557 * @control: (nullable): a control string
559 * Set the control string in @stream.
562 gst_rtsp_stream_set_control (GstRTSPStream * stream, const gchar * control)
564 GstRTSPStreamPrivate *priv;
566 g_return_if_fail (GST_IS_RTSP_STREAM (stream));
570 g_mutex_lock (&priv->lock);
571 g_free (priv->control);
572 priv->control = g_strdup (control);
573 g_mutex_unlock (&priv->lock);
577 * gst_rtsp_stream_has_control:
578 * @stream: a #GstRTSPStream
579 * @control: (nullable): a control string
581 * Check if @stream has the control string @control.
583 * Returns: %TRUE if @stream has @control as the control string
586 gst_rtsp_stream_has_control (GstRTSPStream * stream, const gchar * control)
588 GstRTSPStreamPrivate *priv;
591 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
/* priv->control and priv->idx are only read while holding the stream lock */
595 g_mutex_lock (&priv->lock);
/* exact comparison against the configured control string */
597 res = (g_strcmp0 (priv->control, control) == 0);
/* match the "stream=<index>" form against the stream index; this is the
 * same form gst_rtsp_stream_get_control() generates when no control string
 * is configured.
 * NOTE(review): @control is documented as nullable, but sscanf() requires a
 * non-NULL input string -- confirm that a NULL guard exists in the lines not
 * visible in this listing. */
601 if (sscanf (control, "stream=%u", &streamid) > 0)
602 res = (streamid == priv->idx);
606 g_mutex_unlock (&priv->lock);
612 * gst_rtsp_stream_set_mtu:
613 * @stream: a #GstRTSPStream
616 * Configure the mtu in the payloader of @stream to @mtu.
619 gst_rtsp_stream_set_mtu (GstRTSPStream * stream, guint mtu)
621 GstRTSPStreamPrivate *priv;
623 g_return_if_fail (GST_IS_RTSP_STREAM (stream));
627 GST_LOG_OBJECT (stream, "set MTU %u", mtu);
629 g_object_set (G_OBJECT (priv->payloader), "mtu", mtu, NULL);
633 * gst_rtsp_stream_get_mtu:
634 * @stream: a #GstRTSPStream
636 * Get the configured MTU in the payloader of @stream.
638 * Returns: the MTU of the payloader.
641 gst_rtsp_stream_get_mtu (GstRTSPStream * stream)
643 GstRTSPStreamPrivate *priv;
646 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
650 g_object_get (G_OBJECT (priv->payloader), "mtu", &mtu, NULL);
655 /* Update the dscp qos property on the udp sinks */
657 update_dscp_qos (GstRTSPStream * stream, GstElement ** udpsink)
659 GstRTSPStreamPrivate *priv;
664 g_object_set (G_OBJECT (*udpsink), "qos-dscp", priv->dscp_qos, NULL);
669 * gst_rtsp_stream_set_dscp_qos:
670 * @stream: a #GstRTSPStream
671 * @dscp_qos: a new dscp qos value (0-63, or -1 to disable)
673 * Configure the dscp qos of the outgoing sockets to @dscp_qos.
676 gst_rtsp_stream_set_dscp_qos (GstRTSPStream * stream, gint dscp_qos)
678 GstRTSPStreamPrivate *priv;
680 g_return_if_fail (GST_IS_RTSP_STREAM (stream));
684 GST_LOG_OBJECT (stream, "set DSCP QoS %d", dscp_qos);
686 if (dscp_qos < -1 || dscp_qos > 63) {
687 GST_WARNING_OBJECT (stream, "trying to set illegal dscp qos %d", dscp_qos);
691 priv->dscp_qos = dscp_qos;
693 update_dscp_qos (stream, priv->udpsink);
697 * gst_rtsp_stream_get_dscp_qos:
698 * @stream: a #GstRTSPStream
700 * Get the configured DSCP QoS of the outgoing sockets.
702 * Returns: the DSCP QoS value of the outgoing sockets, or -1 if disabled.
705 gst_rtsp_stream_get_dscp_qos (GstRTSPStream * stream)
707 GstRTSPStreamPrivate *priv;
709 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
713 return priv->dscp_qos;
717 * gst_rtsp_stream_is_transport_supported:
718 * @stream: a #GstRTSPStream
719 * @transport: (transfer none): a #GstRTSPTransport
721 * Check if @transport can be handled by @stream.
723 * Returns: %TRUE if @transport can be handled by @stream.
726 gst_rtsp_stream_is_transport_supported (GstRTSPStream * stream,
727 GstRTSPTransport * transport)
729 GstRTSPStreamPrivate *priv;
731 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
732 g_return_val_if_fail (transport != NULL, FALSE);
736 g_mutex_lock (&priv->lock);
737 if (transport->trans != GST_RTSP_TRANS_RTP)
738 goto unsupported_transmode;
740 if (!(transport->profile & priv->profiles))
741 goto unsupported_profile;
743 if (!(transport->lower_transport & priv->allowed_protocols))
744 goto unsupported_ltrans;
746 g_mutex_unlock (&priv->lock);
751 unsupported_transmode:
753 GST_DEBUG ("unsupported transport mode %d", transport->trans);
754 g_mutex_unlock (&priv->lock);
759 GST_DEBUG ("unsupported profile %d", transport->profile);
760 g_mutex_unlock (&priv->lock);
765 GST_DEBUG ("unsupported lower transport %d", transport->lower_transport);
766 g_mutex_unlock (&priv->lock);
772 * gst_rtsp_stream_set_profiles:
773 * @stream: a #GstRTSPStream
774 * @profiles: the new profiles
776 * Configure the allowed profiles for @stream.
779 gst_rtsp_stream_set_profiles (GstRTSPStream * stream, GstRTSPProfile profiles)
781 GstRTSPStreamPrivate *priv;
783 g_return_if_fail (GST_IS_RTSP_STREAM (stream));
787 g_mutex_lock (&priv->lock);
788 priv->profiles = profiles;
789 g_mutex_unlock (&priv->lock);
793 * gst_rtsp_stream_get_profiles:
794 * @stream: a #GstRTSPStream
796 * Get the allowed profiles of @stream.
798 * Returns: a #GstRTSPProfile
801 gst_rtsp_stream_get_profiles (GstRTSPStream * stream)
803 GstRTSPStreamPrivate *priv;
806 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_RTSP_PROFILE_UNKNOWN);
810 g_mutex_lock (&priv->lock);
811 res = priv->profiles;
812 g_mutex_unlock (&priv->lock);
818 * gst_rtsp_stream_set_protocols:
819 * @stream: a #GstRTSPStream
820 * @protocols: the new flags
822 * Configure the allowed lower transport for @stream.
825 gst_rtsp_stream_set_protocols (GstRTSPStream * stream,
826 GstRTSPLowerTrans protocols)
828 GstRTSPStreamPrivate *priv;
830 g_return_if_fail (GST_IS_RTSP_STREAM (stream));
834 g_mutex_lock (&priv->lock);
835 priv->allowed_protocols = protocols;
836 g_mutex_unlock (&priv->lock);
840 * gst_rtsp_stream_get_protocols:
841 * @stream: a #GstRTSPStream
843 * Get the allowed protocols of @stream.
845 * Returns: a #GstRTSPLowerTrans
848 gst_rtsp_stream_get_protocols (GstRTSPStream * stream)
850 GstRTSPStreamPrivate *priv;
851 GstRTSPLowerTrans res;
853 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream),
854 GST_RTSP_LOWER_TRANS_UNKNOWN);
858 g_mutex_lock (&priv->lock);
859 res = priv->allowed_protocols;
860 g_mutex_unlock (&priv->lock);
866 * gst_rtsp_stream_set_address_pool:
867 * @stream: a #GstRTSPStream
868 * @pool: (transfer none) (nullable): a #GstRTSPAddressPool
870 * Configure @pool to be used as the address pool of @stream.
873 gst_rtsp_stream_set_address_pool (GstRTSPStream * stream,
874 GstRTSPAddressPool * pool)
876 GstRTSPStreamPrivate *priv;
877 GstRTSPAddressPool *old;
879 g_return_if_fail (GST_IS_RTSP_STREAM (stream));
883 GST_LOG_OBJECT (stream, "set address pool %p", pool);
885 g_mutex_lock (&priv->lock);
886 if ((old = priv->pool) != pool)
887 priv->pool = pool ? g_object_ref (pool) : NULL;
890 g_mutex_unlock (&priv->lock);
893 g_object_unref (old);
897 * gst_rtsp_stream_get_address_pool:
898 * @stream: a #GstRTSPStream
900 * Get the #GstRTSPAddressPool used as the address pool of @stream.
902 * Returns: (transfer full) (nullable): the #GstRTSPAddressPool of @stream.
903 * g_object_unref() after usage.
906 gst_rtsp_stream_get_address_pool (GstRTSPStream * stream)
908 GstRTSPStreamPrivate *priv;
909 GstRTSPAddressPool *result;
911 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
915 g_mutex_lock (&priv->lock);
916 if ((result = priv->pool))
917 g_object_ref (result);
918 g_mutex_unlock (&priv->lock);
924 * gst_rtsp_stream_set_multicast_iface:
925 * @stream: a #GstRTSPStream
926 * @multicast_iface: (transfer none) (nullable): a multicast interface name
928 * Configure @multicast_iface to be used for @stream.
931 gst_rtsp_stream_set_multicast_iface (GstRTSPStream * stream,
932 const gchar * multicast_iface)
934 GstRTSPStreamPrivate *priv;
937 g_return_if_fail (GST_IS_RTSP_STREAM (stream));
941 GST_LOG_OBJECT (stream, "set multicast iface %s",
942 GST_STR_NULL (multicast_iface));
944 g_mutex_lock (&priv->lock);
945 if ((old = priv->multicast_iface) != multicast_iface)
946 priv->multicast_iface = multicast_iface ? g_strdup (multicast_iface) : NULL;
949 g_mutex_unlock (&priv->lock);
956 * gst_rtsp_stream_get_multicast_iface:
957 * @stream: a #GstRTSPStream
959 * Get the multicast interface used for @stream.
961 * Returns: (transfer full) (nullable): the multicast interface for @stream.
962 * g_free() after usage.
965 gst_rtsp_stream_get_multicast_iface (GstRTSPStream * stream)
967 GstRTSPStreamPrivate *priv;
970 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
974 g_mutex_lock (&priv->lock);
975 if ((result = priv->multicast_iface))
976 result = g_strdup (result);
977 g_mutex_unlock (&priv->lock);
983 * gst_rtsp_stream_get_multicast_address:
984 * @stream: a #GstRTSPStream
985 * @family: the #GSocketFamily
987 * Get the multicast address of @stream for @family. The original
988 * #GstRTSPAddress is cached and a copy is returned, so freeing the return value
989 * won't release the address from the pool.
991 * Returns: (transfer full) (nullable): the #GstRTSPAddress of @stream
992 * or %NULL when no address could be allocated. gst_rtsp_address_free()
996 gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream,
997 GSocketFamily family)
999 GstRTSPStreamPrivate *priv;
1000 GstRTSPAddress *result;
1001 GstRTSPAddress **addrp;
1002 GstRTSPAddressFlags flags;
1004 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1006 priv = stream->priv;
1008 g_mutex_lock (&stream->priv->lock);
1010 if (family == G_SOCKET_FAMILY_IPV6) {
1011 flags = GST_RTSP_ADDRESS_FLAG_IPV6;
1012 addrp = &priv->mcast_addr_v6;
1014 flags = GST_RTSP_ADDRESS_FLAG_IPV4;
1015 addrp = &priv->mcast_addr_v4;
1018 if (*addrp == NULL) {
1019 if (priv->pool == NULL)
1022 flags |= GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_MULTICAST;
1024 *addrp = gst_rtsp_address_pool_acquire_address (priv->pool, flags, 2);
1028 /* FIXME: Also reserve the same port with unicast ANY address, since that's
1029 * where we are going to bind our socket. Probably loop until we find a port
1030 * available in both mcast and unicast pools. Maybe GstRTSPAddressPool
1031 * should do it for us when both GST_RTSP_ADDRESS_FLAG_MULTICAST and
1032 * GST_RTSP_ADDRESS_FLAG_UNICAST are given. */
1034 result = gst_rtsp_address_copy (*addrp);
1036 g_mutex_unlock (&stream->priv->lock);
1043 GST_ERROR_OBJECT (stream, "no address pool specified");
1044 g_mutex_unlock (&stream->priv->lock);
1049 GST_ERROR_OBJECT (stream, "failed to acquire address from pool");
1050 g_mutex_unlock (&stream->priv->lock);
1056 * gst_rtsp_stream_reserve_address:
1057 * @stream: a #GstRTSPStream
1058 * @address: an address
1063 * Reserve @address and @port as the address and port of @stream. The original
1064 * #GstRTSPAddress is cached and a copy is returned, so freeing the return value
1065 * won't release the address from the pool.
1067 * Returns: (nullable): the #GstRTSPAddress of @stream or %NULL when
1068 * the address could not be reserved. gst_rtsp_address_free() after usage.
1071 gst_rtsp_stream_reserve_address (GstRTSPStream * stream,
1072 const gchar * address, guint port, guint n_ports, guint ttl)
1074 GstRTSPStreamPrivate *priv;
1075 GstRTSPAddress *result;
1077 GSocketFamily family;
1078 GstRTSPAddress **addrp;
1080 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1081 g_return_val_if_fail (address != NULL, NULL);
1082 g_return_val_if_fail (port > 0, NULL);
1083 g_return_val_if_fail (n_ports > 0, NULL);
1084 g_return_val_if_fail (ttl > 0, NULL);
1086 priv = stream->priv;
1088 addr = g_inet_address_new_from_string (address);
1090 GST_ERROR ("failed to get inet addr from %s", address);
1091 family = G_SOCKET_FAMILY_IPV4;
1093 family = g_inet_address_get_family (addr);
1094 g_object_unref (addr);
1097 if (family == G_SOCKET_FAMILY_IPV6)
1098 addrp = &priv->mcast_addr_v6;
1100 addrp = &priv->mcast_addr_v4;
1102 g_mutex_lock (&priv->lock);
1103 if (*addrp == NULL) {
1104 GstRTSPAddressPoolResult res;
1106 if (priv->pool == NULL)
1109 res = gst_rtsp_address_pool_reserve_address (priv->pool, address,
1110 port, n_ports, ttl, addrp);
1111 if (res != GST_RTSP_ADDRESS_POOL_OK)
1114 /* FIXME: Also reserve the same port with unicast ANY address, since that's
1115 * where we are going to bind our socket. */
1117 if (g_ascii_strcasecmp ((*addrp)->address, address) ||
1118 (*addrp)->port != port || (*addrp)->n_ports != n_ports ||
1119 (*addrp)->ttl != ttl)
1120 goto different_address;
1122 result = gst_rtsp_address_copy (*addrp);
1123 g_mutex_unlock (&priv->lock);
1130 GST_ERROR_OBJECT (stream, "no address pool specified");
1131 g_mutex_unlock (&priv->lock);
1136 GST_ERROR_OBJECT (stream, "failed to acquire address %s from pool",
1138 g_mutex_unlock (&priv->lock);
1143 GST_ERROR_OBJECT (stream,
1144 "address %s is not the same as %s that was already reserved",
1145 address, (*addrp)->address);
1146 g_mutex_unlock (&priv->lock);
1151 /* must be called with lock */
1153 set_socket_for_udpsink (GstElement * udpsink, GSocket * socket,
1154 GSocketFamily family)
1156 const gchar *multisink_socket;
1158 if (family == G_SOCKET_FAMILY_IPV6)
1159 multisink_socket = "socket-v6";
1161 multisink_socket = "socket";
1163 g_object_set (G_OBJECT (udpsink), multisink_socket, socket, NULL);
1166 /* must be called with lock */
1168 set_multicast_socket_for_udpsink (GstElement * udpsink, GSocket * socket,
1169 GSocketFamily family, const gchar * multicast_iface,
1170 const gchar * addr_str, gint port, gint mcast_ttl)
1172 set_socket_for_udpsink (udpsink, socket, family);
1174 if (multicast_iface) {
1175 GST_INFO ("setting multicast-iface %s", multicast_iface);
1176 g_object_set (G_OBJECT (udpsink), "multicast-iface", multicast_iface, NULL);
1179 if (mcast_ttl > 0) {
1180 GST_INFO ("setting ttl-mc %d", mcast_ttl);
1181 g_object_set (G_OBJECT (udpsink), "ttl-mc", mcast_ttl, NULL);
1186 /* must be called with lock */
1188 set_unicast_socket_for_udpsink (GstElement * udpsink, GSocket * socket,
1189 GSocketFamily family)
1191 set_socket_for_udpsink (udpsink, socket, family);
/* Look up the local address @socket is bound to and extract its port. */
1195 get_port_from_socket (GSocket * socket)
1198 GSocketAddress *sockaddr;
1201 GST_DEBUG ("socket: %p", socket);
1202 sockaddr = g_socket_get_local_address (socket, &err);
/* failure: either no local address could be obtained, or it is not an
 * inet socket address */
1203 if (sockaddr == NULL || !G_IS_INET_SOCKET_ADDRESS (sockaddr)) {
1204 g_clear_object (&sockaddr);
/* NOTE(review): err is only filled in when g_socket_get_local_address()
 * itself fails. If the call succeeds but returns a non-inet address, err was
 * never set by it, so err->message may dereference NULL -- confirm and guard
 * (the error's release is also not visible in this listing). */
1205 GST_ERROR ("failed to get sockaddr: %s", err->message);
1210 port = g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (sockaddr));
/* the local address reference is no longer needed once the port is read */
1211 g_object_unref (sockaddr);
1218 create_and_configure_udpsink (GstRTSPStream * stream, GstElement ** udpsink,
1219 GSocket * socket_v4, GSocket * socket_v6, gboolean multicast,
1220 gboolean is_rtp, gint mcast_ttl)
1222 GstRTSPStreamPrivate *priv = stream->priv;
1224 *udpsink = gst_element_factory_make ("multiudpsink", NULL);
1227 goto no_udp_protocol;
1229 /* configure sinks */
1231 g_object_set (G_OBJECT (*udpsink), "close-socket", FALSE, NULL);
1233 g_object_set (G_OBJECT (*udpsink), "send-duplicates", FALSE, NULL);
1236 g_object_set (G_OBJECT (*udpsink), "buffer-size", priv->buffer_size, NULL);
1238 g_object_set (G_OBJECT (*udpsink), "sync", FALSE, NULL);
1240 /* Needs to be async for RECORD streams, otherwise we will never go to
1241 * PLAYING because the sinks will wait for data while the udpsrc can't
1242 * provide data with timestamps in PAUSED. */
1243 if (!is_rtp || priv->sinkpad)
1244 g_object_set (G_OBJECT (*udpsink), "async", FALSE, NULL);
1247 /* join multicast group when adding clients, so we'll start receiving from it.
1248 * We cannot rely on the udpsrc to join the group since its socket is always a
1249 * local unicast one. */
1250 g_object_set (G_OBJECT (*udpsink), "auto-multicast", TRUE, NULL);
1252 g_object_set (G_OBJECT (*udpsink), "loop", FALSE, NULL);
1255 /* update the dscp qos field in the sinks */
1256 update_dscp_qos (stream, udpsink);
1258 if (priv->server_addr_v4) {
1259 GST_DEBUG_OBJECT (stream, "udp IPv4, configure udpsinks");
1260 set_unicast_socket_for_udpsink (*udpsink, socket_v4, G_SOCKET_FAMILY_IPV4);
1263 if (priv->server_addr_v6) {
1264 GST_DEBUG_OBJECT (stream, "udp IPv6, configure udpsinks");
1265 set_unicast_socket_for_udpsink (*udpsink, socket_v6, G_SOCKET_FAMILY_IPV6);
1270 if (priv->mcast_addr_v4) {
1271 GST_DEBUG_OBJECT (stream, "mcast IPv4, configure udpsinks");
1272 port = get_port_from_socket (socket_v4);
1274 goto get_port_failed;
1275 set_multicast_socket_for_udpsink (*udpsink, socket_v4,
1276 G_SOCKET_FAMILY_IPV4, priv->multicast_iface,
1277 priv->mcast_addr_v4->address, port, mcast_ttl);
1280 if (priv->mcast_addr_v6) {
1281 GST_DEBUG_OBJECT (stream, "mcast IPv6, configure udpsinks");
1282 port = get_port_from_socket (socket_v6);
1284 goto get_port_failed;
1285 set_multicast_socket_for_udpsink (*udpsink, socket_v6,
1286 G_SOCKET_FAMILY_IPV6, priv->multicast_iface,
1287 priv->mcast_addr_v6->address, port, mcast_ttl);
1297 GST_ERROR_OBJECT (stream, "failed to create udpsink element");
1302 GST_ERROR_OBJECT (stream, "failed to get udp port");
1307 /* must be called with lock */
1309 create_and_configure_udpsource (GstElement ** udpsrc, GSocket * socket)
1311 GstStateChangeReturn ret;
1313 g_assert (socket != NULL);
1315 *udpsrc = gst_element_factory_make ("udpsrc", NULL);
1316 if (*udpsrc == NULL)
1319 g_object_set (G_OBJECT (*udpsrc), "socket", socket, NULL);
1321 /* The udpsrc cannot do the join because its socket is always a local unicast
1322 * one. The udpsink sharing the same socket will do it for us. */
1323 g_object_set (G_OBJECT (*udpsrc), "auto-multicast", FALSE, NULL);
1325 g_object_set (G_OBJECT (*udpsrc), "loop", FALSE, NULL);
1327 g_object_set (G_OBJECT (*udpsrc), "close-socket", FALSE, NULL);
1329 ret = gst_element_set_state (*udpsrc, GST_STATE_READY);
1330 if (ret == GST_STATE_CHANGE_FAILURE)
1339 gst_element_set_state (*udpsrc, GST_STATE_NULL);
1340 g_clear_object (udpsrc);
1347 alloc_ports_one_family (GstRTSPStream * stream, GSocketFamily family,
1348 GSocket * socket_out[2], GstRTSPAddress ** server_addr_out,
1349 gboolean multicast, GstRTSPTransport * ct, gboolean use_transport_settings)
1351 GstRTSPStreamPrivate *priv = stream->priv;
1352 GSocket *rtp_socket = NULL;
1353 GSocket *rtcp_socket = NULL;
1354 gint tmp_rtp, tmp_rtcp;
1356 GList *rejected_addresses = NULL;
1357 GstRTSPAddress *addr = NULL;
1358 GInetAddress *inetaddr = NULL;
1359 GSocketAddress *rtp_sockaddr = NULL;
1360 GSocketAddress *rtcp_sockaddr = NULL;
1361 GstRTSPAddressPool *pool;
1362 gboolean transport_settings_defined = FALSE;
1367 /* Start with random port */
1370 if (use_transport_settings) {
1377 /* multicast and transport specific case */
1378 if (ct->destination != NULL) {
1379 tmp_rtp = ct->port.min;
1380 tmp_rtcp = ct->port.max;
1381 inetaddr = g_inet_address_new_from_string (ct->destination);
1382 if (inetaddr == NULL)
1383 goto destination_error;
1384 if (!g_inet_address_get_is_multicast (inetaddr))
1385 goto destination_no_mcast;
1386 g_object_unref (inetaddr);
1387 inetaddr = g_inet_address_new_any (family);
1389 GST_DEBUG_OBJECT (stream, "use transport settings");
1390 transport_settings_defined = TRUE;
1394 rtcp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1395 G_SOCKET_PROTOCOL_UDP, NULL);
1397 goto no_udp_protocol;
1398 g_socket_set_multicast_loopback (rtcp_socket, FALSE);
1400 /* try to allocate 2 UDP ports, the RTP port should be an even
1401 * number and the RTCP port should be the next (uneven) port */
1404 if (rtp_socket == NULL) {
1405 rtp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1406 G_SOCKET_PROTOCOL_UDP, NULL);
1408 goto no_udp_protocol;
1409 g_socket_set_multicast_loopback (rtp_socket, FALSE);
1412 if (!transport_settings_defined) {
1413 if ((pool && gst_rtsp_address_pool_has_unicast_addresses (pool))
1415 GstRTSPAddressFlags flags;
1418 rejected_addresses = g_list_prepend (rejected_addresses, addr);
1423 flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT;
1425 flags |= GST_RTSP_ADDRESS_FLAG_MULTICAST;
1427 flags |= GST_RTSP_ADDRESS_FLAG_UNICAST;
1429 if (family == G_SOCKET_FAMILY_IPV6)
1430 flags |= GST_RTSP_ADDRESS_FLAG_IPV6;
1432 flags |= GST_RTSP_ADDRESS_FLAG_IPV4;
1434 addr = gst_rtsp_address_pool_acquire_address (pool, flags, 2);
1439 tmp_rtp = addr->port;
1441 g_clear_object (&inetaddr);
1442 /* FIXME: Does it really work with the IP_MULTICAST_ALL socket option and
1443 * socket control message set in udpsrc? */
1445 inetaddr = g_inet_address_new_any (family);
1447 inetaddr = g_inet_address_new_from_string (addr->address);
1455 if (inetaddr == NULL)
1456 inetaddr = g_inet_address_new_any (family);
1460 rtp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtp);
1461 if (!g_socket_bind (rtp_socket, rtp_sockaddr, FALSE, NULL)) {
1462 GST_DEBUG_OBJECT (stream, "rtp bind() failed, will try again");
1463 g_object_unref (rtp_sockaddr);
1464 if (transport_settings_defined)
1465 goto transport_settings_error;
1468 g_object_unref (rtp_sockaddr);
1470 rtp_sockaddr = g_socket_get_local_address (rtp_socket, NULL);
1471 if (rtp_sockaddr == NULL || !G_IS_INET_SOCKET_ADDRESS (rtp_sockaddr)) {
1472 g_clear_object (&rtp_sockaddr);
1476 if (!transport_settings_defined) {
1478 g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (rtp_sockaddr));
1480 /* check if port is even. RFC 3550 encourages the use of an even/odd port
1481 * pair, however it's not a strict requirement so this check is not done
1482 * for the client selected ports. */
1483 if ((tmp_rtp & 1) != 0) {
1484 /* port not even, close and allocate another */
1486 g_object_unref (rtp_sockaddr);
1487 g_clear_object (&rtp_socket);
1491 g_object_unref (rtp_sockaddr);
1494 tmp_rtcp = tmp_rtp + 1;
1496 rtcp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtcp);
1497 if (!g_socket_bind (rtcp_socket, rtcp_sockaddr, FALSE, NULL)) {
1498 GST_DEBUG_OBJECT (stream, "rctp bind() failed, will try again");
1499 g_object_unref (rtcp_sockaddr);
1500 g_clear_object (&rtp_socket);
1501 if (transport_settings_defined)
1502 goto transport_settings_error;
1505 g_object_unref (rtcp_sockaddr);
1508 addr = g_slice_new0 (GstRTSPAddress);
1509 addr->port = tmp_rtp;
1511 if (transport_settings_defined)
1512 addr->address = g_strdup (ct->destination);
1514 addr->address = g_inet_address_to_string (inetaddr);
1515 addr->ttl = ct->ttl;
1518 g_clear_object (&inetaddr);
1520 if (multicast && (ct->ttl > 0) && (ct->ttl <= priv->max_mcast_ttl)) {
1521 GST_DEBUG ("setting mcast ttl to %d", ct->ttl);
1522 g_socket_set_multicast_ttl (rtp_socket, ct->ttl);
1523 g_socket_set_multicast_ttl (rtcp_socket, ct->ttl);
1526 socket_out[0] = rtp_socket;
1527 socket_out[1] = rtcp_socket;
1528 *server_addr_out = addr;
1530 GST_DEBUG_OBJECT (stream, "allocated address: %s and ports: %d, %d",
1531 addr->address, tmp_rtp, tmp_rtcp);
1533 g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free);
1540 GST_ERROR_OBJECT (stream, "failed to allocate UDP ports: wrong transport");
1545 GST_ERROR_OBJECT (stream, "failed to allocate UDP ports: no transport");
1550 GST_ERROR_OBJECT (stream,
1551 "failed to allocate UDP ports: destination error");
1554 destination_no_mcast:
1556 GST_ERROR_OBJECT (stream,
1557 "failed to allocate UDP ports: destination not multicast address");
1562 GST_WARNING_OBJECT (stream, "failed to allocate UDP ports: protocol error");
1567 GST_WARNING_OBJECT (stream,
1568 "failed to allocate UDP ports: no address pool specified");
1573 GST_WARNING_OBJECT (stream, "failed to acquire address from pool");
1578 GST_WARNING_OBJECT (stream, "failed to allocate UDP ports: no ports");
1581 transport_settings_error:
1583 GST_ERROR_OBJECT (stream,
1584 "failed to allocate UDP ports with requested transport settings");
1589 GST_WARNING_OBJECT (stream, "failed to allocate UDP ports: socket error");
1595 g_object_unref (inetaddr);
1596 g_list_free_full (rejected_addresses,
1597 (GDestroyNotify) gst_rtsp_address_free);
1599 gst_rtsp_address_free (addr);
1601 g_object_unref (rtp_socket);
1603 g_object_unref (rtcp_socket);
1608 /* must be called with lock
 *
 * Records @destination / @rtp_port in priv->mcast_clients. @destination has
 * to parse as a multicast address, otherwise the invalid_address path is
 * taken. */
1610 add_mcast_client_addr (GstRTSPStream * stream, const gchar * destination,
1611 guint rtp_port, guint rtcp_port)
1613 GstRTSPStreamPrivate *priv;
1615 UdpClientAddrInfo *client;
1618 priv = stream->priv;
1620 if (destination == NULL)
1623 inet = g_inet_address_new_from_string (destination);
1625 goto invalid_address;
1627 if (!g_inet_address_get_is_multicast (inet)) {
1628 g_object_unref (inet);
1629 goto invalid_address;
/* The GInetAddress was only needed for validation. */
1631 g_object_unref (inet);
/* Existing entries are matched on address and RTP port only; in the lines
 * visible here @rtcp_port is used for log output only. */
1633 for (walk = priv->mcast_clients; walk; walk = g_list_next (walk)) {
1634 UdpClientAddrInfo *cli = walk->data;
1636 if ((g_strcmp0 (cli->address, destination) == 0) &&
1637 (cli->rtp_port == rtp_port)) {
1638 GST_DEBUG ("requested destination already exists: %s:%u-%u",
1639 destination, rtp_port, rtcp_port);
/* New entry: it owns a copy of the address string and starts with an
 * add_count of 1. */
1645 client = g_new0 (UdpClientAddrInfo, 1);
1646 client->address = g_strdup (destination);
1647 client->rtp_port = rtp_port;
1648 client->add_count = 1;
1649 priv->mcast_clients = g_list_prepend (priv->mcast_clients, client);
1651 GST_DEBUG ("added mcast client %s:%u-%u", destination, rtp_port, rtcp_port);
1657 GST_WARNING_OBJECT (stream, "Multicast address is invalid: %s",
1663 /* must be called with lock
 *
 * Looks up the priv->mcast_clients entry for @destination / @rtp_port and
 * drops it from the list once its add_count is zero. */
1665 remove_mcast_client_addr (GstRTSPStream * stream, const gchar * destination,
1666 guint rtp_port, guint rtcp_port)
1668 GstRTSPStreamPrivate *priv;
1671 priv = stream->priv;
1673 if (destination == NULL)
1674 goto no_destination;
/* Entries are matched on address and RTP port; @rtcp_port is not part of
 * the comparison. */
1676 for (walk = priv->mcast_clients; walk; walk = g_list_next (walk)) {
1677 UdpClientAddrInfo *cli = walk->data;
1679 if ((g_strcmp0 (cli->address, destination) == 0) &&
1680 (cli->rtp_port == rtp_port)) {
/* No users left: unlink the entry and free it. */
1683 if (!cli->add_count) {
1684 priv->mcast_clients = g_list_remove (priv->mcast_clients, cli);
1685 free_mcast_client (cli);
1691 GST_WARNING_OBJECT (stream, "Address not found");
1696 GST_WARNING_OBJECT (stream, "No destination has been provided");
1703 * gst_rtsp_stream_allocate_udp_sockets:
1704 * @stream: a #GstRTSPStream
1705 * @family: protocol family
1706 * @ct: a #GstRTSPTransport describing the requested transport
1707 * @use_transport_settings: Whether to use the transport settings in @ct or not
1709 * Allocates RTP and RTCP ports.
1711 * Returns: %TRUE if the RTP and RTCP sockets have been successfully allocated.
1714 gst_rtsp_stream_allocate_udp_sockets (GstRTSPStream * stream,
1715 GSocketFamily family, GstRTSPTransport * ct,
1716 gboolean use_transport_settings)
1718 GstRTSPStreamPrivate *priv;
1719 gboolean ret = FALSE;
1720 GstRTSPLowerTrans transport;
1721 gboolean allocated = FALSE;
1723 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1724 g_return_val_if_fail (ct != NULL, FALSE);
1725 priv = stream->priv;
1727 transport = ct->lower_transport;
1729 g_mutex_lock (&priv->lock);
1731 if (transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
1732 if (family == G_SOCKET_FAMILY_IPV4 && priv->mcast_socket_v4[0])
1734 else if (family == G_SOCKET_FAMILY_IPV6 && priv->mcast_socket_v6[0])
1736 } else if (transport == GST_RTSP_LOWER_TRANS_UDP) {
1737 if (family == G_SOCKET_FAMILY_IPV4 && priv->socket_v4[0])
1739 else if (family == G_SOCKET_FAMILY_IPV6 && priv->socket_v6[0])
1744 GST_DEBUG_OBJECT (stream, "Allocated already");
1745 g_mutex_unlock (&priv->lock);
1749 if (family == G_SOCKET_FAMILY_IPV4) {
1751 if (transport == GST_RTSP_LOWER_TRANS_UDP) {
1753 GST_DEBUG_OBJECT (stream, "GST_RTSP_LOWER_TRANS_UDP, ipv4");
1754 ret = alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4,
1755 priv->socket_v4, &priv->server_addr_v4, FALSE, ct, FALSE);
1758 GST_DEBUG_OBJECT (stream, "GST_RTSP_LOWER_TRANS_MCAST_UDP, ipv4");
1759 ret = alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4,
1760 priv->mcast_socket_v4, &priv->mcast_addr_v4, TRUE, ct,
1761 use_transport_settings);
1765 if (transport == GST_RTSP_LOWER_TRANS_UDP) {
1767 GST_DEBUG_OBJECT (stream, "GST_RTSP_LOWER_TRANS_UDP, ipv6");
1768 ret = alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6,
1769 priv->socket_v6, &priv->server_addr_v6, FALSE, ct, FALSE);
1773 GST_DEBUG_OBJECT (stream, "GST_RTSP_LOWER_TRANS_MCAST_UDP, ipv6");
1774 ret = alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6,
1775 priv->mcast_socket_v6, &priv->mcast_addr_v6, TRUE, ct,
1776 use_transport_settings);
1779 g_mutex_unlock (&priv->lock);
1785 * gst_rtsp_stream_set_client_side:
1786 * @stream: a #GstRTSPStream
1787 * @client_side: TRUE if this #GstRTSPStream is running on the 'client' side of
1788 * an RTSP connection.
1790 * Sets the #GstRTSPStream as a 'client side' stream - used for sending
1791 * streams to an RTSP server via RECORD. This has the practical effect
1792 * of changing which UDP port numbers are used when setting up the local
1793 * side of the stream sending to be either the 'server' or 'client' pair
1794 * of a configured UDP transport.
1797 gst_rtsp_stream_set_client_side (GstRTSPStream * stream, gboolean client_side)
1799 GstRTSPStreamPrivate *priv;
1801 g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1802 priv = stream->priv;
1803 g_mutex_lock (&priv->lock);
1804 priv->client_side = client_side;
1805 g_mutex_unlock (&priv->lock);
1809 * gst_rtsp_stream_is_client_side:
1810 * @stream: a #GstRTSPStream
1812 * See gst_rtsp_stream_set_client_side()
1814 * Returns: TRUE if this #GstRTSPStream is client-side.
1817 gst_rtsp_stream_is_client_side (GstRTSPStream * stream)
1819 GstRTSPStreamPrivate *priv;
1822 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1824 priv = stream->priv;
1825 g_mutex_lock (&priv->lock);
1826 ret = priv->client_side;
1827 g_mutex_unlock (&priv->lock);
1833 * gst_rtsp_stream_get_server_port:
1834 * @stream: a #GstRTSPStream
1835 * @server_port: (out): result server port
1836 * @family: the port family to get
1838 * Fill @server_port with the port pair used by the server. This function can
1839 * only be called when @stream has been joined.
1842 gst_rtsp_stream_get_server_port (GstRTSPStream * stream,
1843 GstRTSPRange * server_port, GSocketFamily family)
1845 GstRTSPStreamPrivate *priv;
1847 g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1848 priv = stream->priv;
1849 g_return_if_fail (priv->joined_bin != NULL);
1852 server_port->min = 0;
1853 server_port->max = 0;
1856 g_mutex_lock (&priv->lock);
1857 if (family == G_SOCKET_FAMILY_IPV4) {
1858 if (server_port && priv->server_addr_v4) {
1859 server_port->min = priv->server_addr_v4->port;
1861 priv->server_addr_v4->port + priv->server_addr_v4->n_ports - 1;
1864 if (server_port && priv->server_addr_v6) {
1865 server_port->min = priv->server_addr_v6->port;
1867 priv->server_addr_v6->port + priv->server_addr_v6->n_ports - 1;
1870 g_mutex_unlock (&priv->lock);
1874 * gst_rtsp_stream_get_rtpsession:
1875 * @stream: a #GstRTSPStream
1877 * Get the RTP session of this stream.
1879 * Returns: (transfer full): The RTP session of this stream. Unref after usage.
1882 gst_rtsp_stream_get_rtpsession (GstRTSPStream * stream)
1884 GstRTSPStreamPrivate *priv;
1887 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1889 priv = stream->priv;
1891 g_mutex_lock (&priv->lock);
1892 if ((session = priv->session))
1893 g_object_ref (session);
1894 g_mutex_unlock (&priv->lock);
1900 * gst_rtsp_stream_get_srtp_encoder:
1901 * @stream: a #GstRTSPStream
1903 * Get the SRTP encoder for this stream.
1905 * Returns: (transfer full): The SRTP encoder for this stream. Unref after usage.
1908 gst_rtsp_stream_get_srtp_encoder (GstRTSPStream * stream)
1910 GstRTSPStreamPrivate *priv;
1911 GstElement *encoder;
1913 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1915 priv = stream->priv;
1917 g_mutex_lock (&priv->lock);
1918 if ((encoder = priv->srtpenc))
1919 g_object_ref (encoder);
1920 g_mutex_unlock (&priv->lock);
1926 * gst_rtsp_stream_get_ssrc:
1927 * @stream: a #GstRTSPStream
1928 * @ssrc: (out): result ssrc
1930 * Get the SSRC used by the RTP session of this stream. This function can only
1931 * be called when @stream has been joined.
1934 gst_rtsp_stream_get_ssrc (GstRTSPStream * stream, guint * ssrc)
1936 GstRTSPStreamPrivate *priv;
1938 g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1939 priv = stream->priv;
1940 g_return_if_fail (priv->joined_bin != NULL);
1942 g_mutex_lock (&priv->lock);
1943 if (ssrc && priv->session)
1944 g_object_get (priv->session, "internal-ssrc", ssrc, NULL);
1945 g_mutex_unlock (&priv->lock);
1949 * gst_rtsp_stream_set_retransmission_time:
1950 * @stream: a #GstRTSPStream
1951 * @time: a #GstClockTime
1953 * Set the amount of time to store retransmission packets.
1956 gst_rtsp_stream_set_retransmission_time (GstRTSPStream * stream,
1959 GST_DEBUG_OBJECT (stream, "set retransmission time %" G_GUINT64_FORMAT, time);
1961 g_mutex_lock (&stream->priv->lock);
1962 stream->priv->rtx_time = time;
1963 if (stream->priv->rtxsend)
1964 g_object_set (stream->priv->rtxsend, "max-size-time",
1965 GST_TIME_AS_MSECONDS (time), NULL);
1966 g_mutex_unlock (&stream->priv->lock);
1970 * gst_rtsp_stream_get_retransmission_time:
1971 * @stream: a #GstRTSPStream
1973 * Get the amount of time to store retransmission data.
1975 * Returns: the amount of time to store retransmission data.
1978 gst_rtsp_stream_get_retransmission_time (GstRTSPStream * stream)
1982 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
1984 g_mutex_lock (&stream->priv->lock);
1985 ret = stream->priv->rtx_time;
1986 g_mutex_unlock (&stream->priv->lock);
1992 * gst_rtsp_stream_set_retransmission_pt:
1993 * @stream: a #GstRTSPStream
1996 * Set the payload type (pt) for retransmission of this stream.
1999 gst_rtsp_stream_set_retransmission_pt (GstRTSPStream * stream, guint rtx_pt)
2001 g_return_if_fail (GST_IS_RTSP_STREAM (stream));
2003 GST_DEBUG_OBJECT (stream, "set retransmission pt %u", rtx_pt);
2005 g_mutex_lock (&stream->priv->lock);
2006 stream->priv->rtx_pt = rtx_pt;
/* If the rtxsend element already exists, give it a fresh payload-type map
 * whose single field is named after the stream's current payload type and
 * holds @rtx_pt. */
2007 if (stream->priv->rtxsend) {
2008 guint pt = gst_rtsp_stream_get_pt (stream);
/* NOTE(review): pt is a guint but is formatted with "%d";
 * gst_rtsp_stream_request_aux_sender() uses "%u" for the same field name. */
2009 gchar *pt_s = g_strdup_printf ("%d", pt);
2010 GstStructure *rtx_pt_map = gst_structure_new ("application/x-rtp-pt-map",
2011 pt_s, G_TYPE_UINT, rtx_pt, NULL);
2012 g_object_set (stream->priv->rtxsend, "payload-type-map", rtx_pt_map, NULL);
/* The structure is freed here after being handed to g_object_set(). */
2014 gst_structure_free (rtx_pt_map);
2016 g_mutex_unlock (&stream->priv->lock);
2020 * gst_rtsp_stream_get_retransmission_pt:
2021 * @stream: a #GstRTSPStream
2023 * Get the payload-type used for retransmission of this stream
2025 * Returns: The retransmission PT.
2028 gst_rtsp_stream_get_retransmission_pt (GstRTSPStream * stream)
2032 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
2034 g_mutex_lock (&stream->priv->lock);
2035 rtx_pt = stream->priv->rtx_pt;
2036 g_mutex_unlock (&stream->priv->lock);
2042 * gst_rtsp_stream_set_buffer_size:
2043 * @stream: a #GstRTSPStream
2044 * @size: the buffer size
2046 * Set the size of the UDP transmission buffer (in bytes)
2047 * Needs to be set before the stream is joined to a bin.
2052 gst_rtsp_stream_set_buffer_size (GstRTSPStream * stream, guint size)
2054 g_mutex_lock (&stream->priv->lock);
2055 stream->priv->buffer_size = size;
2056 g_mutex_unlock (&stream->priv->lock);
2060 * gst_rtsp_stream_get_buffer_size:
2061 * @stream: a #GstRTSPStream
2063 * Get the size of the UDP transmission buffer (in bytes)
2065 * Returns: the size of the UDP TX buffer
2070 gst_rtsp_stream_get_buffer_size (GstRTSPStream * stream)
2074 g_mutex_lock (&stream->priv->lock);
2075 buffer_size = stream->priv->buffer_size;
2076 g_mutex_unlock (&stream->priv->lock);
2082 * gst_rtsp_stream_set_max_mcast_ttl:
2083 * @stream: a #GstRTSPStream
2084 * @ttl: the new multicast ttl value
2086 * Set the maximum time-to-live value of outgoing multicast packets.
2088 * Returns: %TRUE if the requested ttl has been set successfully.
2092 gst_rtsp_stream_set_max_mcast_ttl (GstRTSPStream * stream, guint ttl)
2094 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2096 g_mutex_lock (&stream->priv->lock);
2097 if (ttl == 0 || ttl > DEFAULT_MAX_MCAST_TTL) {
2098 GST_WARNING_OBJECT (stream, "The reqested mcast TTL value is not valid.");
2099 g_mutex_unlock (&stream->priv->lock);
2102 stream->priv->max_mcast_ttl = ttl;
2103 g_mutex_unlock (&stream->priv->lock);
2109 * gst_rtsp_stream_get_max_mcast_ttl:
2110 * @stream: a #GstRTSPStream
2112 * Get the maximum time-to-live value of outgoing multicast packets.
2114 * Returns: the maximum time-to-live value of outgoing multicast packets.
2118 gst_rtsp_stream_get_max_mcast_ttl (GstRTSPStream * stream)
2122 g_mutex_lock (&stream->priv->lock);
2123 ttl = stream->priv->max_mcast_ttl;
2124 g_mutex_unlock (&stream->priv->lock);
2130 * gst_rtsp_stream_verify_mcast_ttl:
2131 * @stream: a #GstRTSPStream
2132 * @ttl: a requested multicast ttl
2134 * Check if the requested multicast ttl value is allowed.
2136 * Returns: TRUE if the requested ttl value is allowed.
2140 gst_rtsp_stream_verify_mcast_ttl (GstRTSPStream * stream, guint ttl)
2142 gboolean res = FALSE;
2144 g_mutex_lock (&stream->priv->lock);
2145 if ((ttl > 0) && (ttl <= stream->priv->max_mcast_ttl))
2147 g_mutex_unlock (&stream->priv->lock);
2152 /* executed from streaming thread */
2154 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream)
2156 GstRTSPStreamPrivate *priv = stream->priv;
2157 GstCaps *newcaps, *oldcaps;
2159 newcaps = gst_pad_get_current_caps (pad);
2161 GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps,
2164 g_mutex_lock (&priv->lock);
2165 oldcaps = priv->caps;
2166 priv->caps = newcaps;
2167 g_mutex_unlock (&priv->lock);
2170 gst_caps_unref (oldcaps);
2174 dump_structure (const GstStructure * s)
2178 sstr = gst_structure_to_string (s);
2179 GST_INFO ("structure: %s", sstr);
/* Searches priv->transports for the transport whose destination matches the
 * host part of @rtcp_from (case-insensitively) and whose port range has the
 * port part as either its min or its max. @rtcp_from is split at its last
 * ':'. A reference is taken on the result before priv->lock is released.
 *
 * In client-side mode the server_port range of each transport is compared,
 * otherwise the client_port range.
 *
 * NOTE(review): tr->destination is passed to g_ascii_strcasecmp() unchecked;
 * confirm it can never be NULL for transports in this list. */
2183 static GstRTSPStreamTransport *
2184 find_transport (GstRTSPStream * stream, const gchar * rtcp_from)
2186 GstRTSPStreamPrivate *priv = stream->priv;
2188 GstRTSPStreamTransport *result = NULL;
2193 if (rtcp_from == NULL)
/* Split at the last ':' so that the host part may itself contain colons. */
2196 tmp = g_strrstr (rtcp_from, ":");
/* NOTE(review): atoi() gives no error indication, so a malformed port
 * silently becomes 0. */
2200 port = atoi (tmp + 1);
2201 dest = g_strndup (rtcp_from, tmp - rtcp_from);
2203 g_mutex_lock (&priv->lock);
2204 GST_INFO ("finding %s:%d in %d transports", dest, port,
2205 g_list_length (priv->transports));
2207 for (walk = priv->transports; walk; walk = g_list_next (walk)) {
2208 GstRTSPStreamTransport *trans = walk->data;
2209 const GstRTSPTransport *tr;
2212 tr = gst_rtsp_stream_transport_get_transport (trans);
2214 if (priv->client_side) {
2215 /* In client side mode the 'destination' is the RTSP server, so send
2217 min = tr->server_port.min;
2218 max = tr->server_port.max;
2220 min = tr->client_port.min;
2221 max = tr->client_port.max;
2224 if ((g_ascii_strcasecmp (tr->destination, dest) == 0) &&
2225 (min == port || max == port)) {
2231 g_object_ref (result);
2232 g_mutex_unlock (&priv->lock);
2239 static GstRTSPStreamTransport *
2240 check_transport (GObject * source, GstRTSPStream * stream)
2242 GstStructure *stats;
2243 GstRTSPStreamTransport *trans;
2245 /* see if we have a stream to match with the origin of the RTCP packet */
2246 trans = g_object_get_qdata (source, ssrc_stream_map_key);
2247 if (trans == NULL) {
2248 g_object_get (source, "stats", &stats, NULL);
2250 const gchar *rtcp_from;
2252 dump_structure (stats);
2254 rtcp_from = gst_structure_get_string (stats, "rtcp-from");
2255 if ((trans = find_transport (stream, rtcp_from))) {
2256 GST_INFO ("%p: found transport %p for source %p", stream, trans,
2258 g_object_set_qdata_full (source, ssrc_stream_map_key, trans,
2261 gst_structure_free (stats);
2269 on_new_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
2271 GstRTSPStreamTransport *trans;
2273 GST_INFO ("%p: new source %p", stream, source);
2275 trans = check_transport (source, stream);
2278 GST_INFO ("%p: source %p for transport %p", stream, source, trans);
2282 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPStream * stream)
2284 GST_INFO ("%p: new SDES %p", stream, source);
2288 on_ssrc_active (GObject * session, GObject * source, GstRTSPStream * stream)
2290 GstRTSPStreamTransport *trans;
2292 trans = check_transport (source, stream);
2295 GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
2296 gst_rtsp_stream_transport_keep_alive (trans);
2300 GstStructure *stats;
2301 g_object_get (source, "stats", &stats, NULL);
2303 dump_structure (stats);
2304 gst_structure_free (stats);
2311 on_bye_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
2313 GST_INFO ("%p: source %p bye", stream, source);
2317 on_bye_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
2319 GstRTSPStreamTransport *trans;
2321 GST_INFO ("%p: source %p bye timeout", stream, source);
2323 if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
2324 gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
2325 g_object_set_qdata (source, ssrc_stream_map_key, NULL);
2330 on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
2332 GstRTSPStreamTransport *trans;
2334 GST_INFO ("%p: source %p timeout", stream, source);
2336 if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
2337 gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
2338 g_object_set_qdata (source, ssrc_stream_map_key, NULL);
2343 on_new_sender_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
2345 GST_INFO ("%p: new sender source %p", stream, source);
2348 GstStructure *stats;
2349 g_object_get (source, "stats", &stats, NULL);
2351 dump_structure (stats);
2352 gst_structure_free (stats);
2359 on_sender_ssrc_active (GObject * session, GObject * source,
2360 GstRTSPStream * stream)
2364 GstStructure *stats;
2365 g_object_get (source, "stats", &stats, NULL);
2367 dump_structure (stats);
2368 gst_structure_free (stats);
2375 clear_tr_cache (GstRTSPStreamPrivate * priv, gboolean is_rtp)
2378 g_list_foreach (priv->tr_cache_rtp, (GFunc) g_object_unref, NULL);
2379 g_list_free (priv->tr_cache_rtp);
2380 priv->tr_cache_rtp = NULL;
2382 g_list_foreach (priv->tr_cache_rtcp, (GFunc) g_object_unref, NULL);
2383 g_list_free (priv->tr_cache_rtcp);
2384 priv->tr_cache_rtcp = NULL;
2388 /* Must be called with priv->lock
 *
 * Pulls the pending sample from priv->appsink[idx] and hands its buffer to
 * the cached TCP transports. idx 0 is RTP (is_rtp = (idx == 0)). The lock is
 * released while the buffer is being sent and taken again afterwards. */
2390 send_tcp_message (GstRTSPStream * stream, gint idx)
2392 GstRTSPStreamPrivate *priv = stream->priv;
/* Guard: earlier messages still outstanding, or nothing pending for this
 * index. */
2399 if (priv->n_outstanding > 0 || !priv->have_buffer[idx]) {
2403 priv->have_buffer[idx] = FALSE;
2405 if (priv->appsink[idx] == NULL) {
2406 /* session expired */
2410 sink = GST_APP_SINK (priv->appsink[idx]);
2411 sample = gst_app_sink_pull_sample (sink);
2416 buffer = gst_sample_get_buffer (sample);
2418 is_rtp = (idx == 0);
/* A cookie mismatch means priv->transports changed since the RTP cache was
 * built: rebuild it. Each cached entry holds its own reference. */
2421 if (priv->tr_cache_cookie_rtp != priv->transports_cookie) {
2422 clear_tr_cache (priv, is_rtp);
2423 for (walk = priv->transports; walk; walk = g_list_next (walk)) {
2424 GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2425 const GstRTSPTransport *t =
2426 gst_rtsp_stream_transport_get_transport (tr);
2428 if (t->lower_transport != GST_RTSP_LOWER_TRANS_TCP)
2431 priv->tr_cache_rtp =
2432 g_list_prepend (priv->tr_cache_rtp, g_object_ref (tr));
2434 priv->tr_cache_cookie_rtp = priv->transports_cookie;
/* Same rebuild for the RTCP cache. */
2437 if (priv->tr_cache_cookie_rtcp != priv->transports_cookie) {
2438 clear_tr_cache (priv, is_rtp);
2439 for (walk = priv->transports; walk; walk = g_list_next (walk)) {
2440 GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2441 const GstRTSPTransport *t =
2442 gst_rtsp_stream_transport_get_transport (tr);
2444 if (t->lower_transport != GST_RTSP_LOWER_TRANS_TCP)
2447 priv->tr_cache_rtcp =
2448 g_list_prepend (priv->tr_cache_rtcp, g_object_ref (tr));
2450 priv->tr_cache_cookie_rtcp = priv->transports_cookie;
/* Account for one outstanding message per TCP transport before sending. */
2454 priv->n_outstanding += priv->n_tcp_transports;
/* The sends below run without priv->lock.
 * NOTE(review): the cache lists are walked while unlocked; confirm nothing
 * can rebuild them concurrently. */
2456 g_mutex_unlock (&priv->lock);
2459 for (walk = priv->tr_cache_rtp; walk; walk = g_list_next (walk)) {
2460 GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2461 if (!gst_rtsp_stream_transport_send_rtp (tr, buffer)) {
2462 /* remove transport on send error */
2463 g_mutex_lock (&priv->lock);
2464 priv->n_outstanding--;
2465 update_transport (stream, tr, FALSE);
2466 g_mutex_unlock (&priv->lock);
2470 for (walk = priv->tr_cache_rtcp; walk; walk = g_list_next (walk)) {
2471 GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2472 if (!gst_rtsp_stream_transport_send_rtcp (tr, buffer)) {
2473 /* remove transport on send error */
2474 g_mutex_lock (&priv->lock);
2475 priv->n_outstanding--;
2476 update_transport (stream, tr, FALSE);
2477 g_mutex_unlock (&priv->lock);
2481 gst_sample_unref (sample);
/* Re-take the lock so the caller's locking state is unchanged on return. */
2483 g_mutex_lock (&priv->lock);
2486 static GstFlowReturn
2487 handle_new_sample (GstAppSink * sink, gpointer user_data)
2489 GstRTSPStream *stream = user_data;
2490 GstRTSPStreamPrivate *priv = stream->priv;
2494 g_mutex_lock (&priv->lock);
2496 for (i = 0; i < 2; i++)
2497 if (GST_ELEMENT_CAST (sink) == priv->appsink[i]) {
2498 priv->have_buffer[i] = TRUE;
2499 if (priv->n_outstanding == 0) {
2507 send_tcp_message (stream, idx);
2509 g_mutex_unlock (&priv->lock);
2514 static GstAppSinkCallbacks sink_cb = {
2515 NULL, /* not interested in EOS */
2516 NULL, /* not interested in preroll samples */
2521 get_rtp_encoder (GstRTSPStream * stream, guint session)
2523 GstRTSPStreamPrivate *priv = stream->priv;
2525 if (priv->srtpenc == NULL) {
2528 name = g_strdup_printf ("srtpenc_%u", session);
2529 priv->srtpenc = gst_element_factory_make ("srtpenc", name);
2532 g_object_set (priv->srtpenc, "random-key", TRUE, NULL);
2534 return gst_object_ref (priv->srtpenc);
2538 request_rtp_encoder (GstElement * rtpbin, guint session, GstRTSPStream * stream)
2540 GstRTSPStreamPrivate *priv = stream->priv;
2541 GstElement *oldenc, *enc;
2545 if (priv->idx != session)
2548 GST_DEBUG_OBJECT (stream, "make RTP encoder for session %u", session);
2550 oldenc = priv->srtpenc;
2551 enc = get_rtp_encoder (stream, session);
2552 name = g_strdup_printf ("rtp_sink_%d", session);
2553 pad = gst_element_get_request_pad (enc, name);
2555 gst_object_unref (pad);
2558 g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER], 0,
2565 request_rtcp_encoder (GstElement * rtpbin, guint session,
2566 GstRTSPStream * stream)
2568 GstRTSPStreamPrivate *priv = stream->priv;
2569 GstElement *oldenc, *enc;
2573 if (priv->idx != session)
2576 GST_DEBUG_OBJECT (stream, "make RTCP encoder for session %u", session);
2578 oldenc = priv->srtpenc;
2579 enc = get_rtp_encoder (stream, session);
2580 name = g_strdup_printf ("rtcp_sink_%d", session);
2581 pad = gst_element_get_request_pad (enc, name);
2583 gst_object_unref (pad);
2586 g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER], 0,
2593 request_key (GstElement * srtpdec, guint ssrc, GstRTSPStream * stream)
2595 GstRTSPStreamPrivate *priv = stream->priv;
2598 GST_DEBUG ("request key %08x", ssrc);
2600 g_mutex_lock (&priv->lock);
2601 if ((caps = g_hash_table_lookup (priv->keys, GINT_TO_POINTER (ssrc))))
2602 gst_caps_ref (caps);
2603 g_mutex_unlock (&priv->lock);
2609 request_rtp_rtcp_decoder (GstElement * rtpbin, guint session,
2610 GstRTSPStream * stream)
2612 GstRTSPStreamPrivate *priv = stream->priv;
2614 if (priv->idx != session)
2617 if (priv->srtpdec == NULL) {
2620 name = g_strdup_printf ("srtpdec_%u", session);
2621 priv->srtpdec = gst_element_factory_make ("srtpdec", name);
2624 g_signal_connect (priv->srtpdec, "request-key",
2625 (GCallback) request_key, stream);
2627 g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTP_RTCP_DECODER],
2631 return gst_object_ref (priv->srtpdec);
2635 * gst_rtsp_stream_request_aux_sender:
2636 * @stream: a #GstRTSPStream
2637 * @sessid: the session id
2639 * Creates a bin containing an rtprtxsend element.
2641 * Returns: (transfer full) (nullable): a #GstElement.
2646 gst_rtsp_stream_request_aux_sender (GstRTSPStream * stream, guint sessid)
2650 GstStructure *pt_map;
2655 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2657 pt = gst_rtsp_stream_get_pt (stream);
2658 pt_s = g_strdup_printf ("%u", pt);
2659 rtx_pt = stream->priv->rtx_pt;
2661 GST_INFO ("creating rtxsend with pt %u to %u", pt, rtx_pt);
2663 bin = gst_bin_new (NULL);
2664 stream->priv->rtxsend = gst_element_factory_make ("rtprtxsend", NULL);
2665 pt_map = gst_structure_new ("application/x-rtp-pt-map",
2666 pt_s, G_TYPE_UINT, rtx_pt, NULL);
2667 g_object_set (stream->priv->rtxsend, "payload-type-map", pt_map,
2668 "max-size-time", GST_TIME_AS_MSECONDS (stream->priv->rtx_time), NULL);
2670 gst_structure_free (pt_map);
2671 gst_bin_add (GST_BIN (bin), gst_object_ref (stream->priv->rtxsend));
2673 pad = gst_element_get_static_pad (stream->priv->rtxsend, "src");
2674 name = g_strdup_printf ("src_%u", sessid);
2675 gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
2677 gst_object_unref (pad);
2679 pad = gst_element_get_static_pad (stream->priv->rtxsend, "sink");
2680 name = g_strdup_printf ("sink_%u", sessid);
2681 gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
2683 gst_object_unref (pad);
2689 add_rtx_pt (gpointer key, GstCaps * caps, GstStructure * pt_map)
2691 guint pt = GPOINTER_TO_INT (key);
2692 const GstStructure *s = gst_caps_get_structure (caps, 0);
2695 if (!g_strcmp0 (gst_structure_get_string (s, "encoding-name"), "RTX") &&
2696 (apt = gst_structure_get_string (s, "apt"))) {
2697 gst_structure_set (pt_map, apt, G_TYPE_UINT, pt, NULL);
2701 /* Call with priv->lock taken */
2703 update_rtx_receive_pt_map (GstRTSPStream * stream)
2705 GstStructure *pt_map;
2707 if (!stream->priv->rtxreceive)
2710 pt_map = gst_structure_new_empty ("application/x-rtp-pt-map");
2711 g_hash_table_foreach (stream->priv->ptmap, (GHFunc) add_rtx_pt, pt_map);
2712 g_object_set (stream->priv->rtxreceive, "payload-type-map", pt_map, NULL);
2713 gst_structure_free (pt_map);
2720 retrieve_ulpfec_pt (gpointer key, GstCaps * caps, GstElement * ulpfec_decoder)
2722 guint pt = GPOINTER_TO_INT (key);
2723 const GstStructure *s = gst_caps_get_structure (caps, 0);
2725 if (!g_strcmp0 (gst_structure_get_string (s, "encoding-name"), "ULPFEC"))
2726 g_object_set (ulpfec_decoder, "pt", pt, NULL);
2730 update_ulpfec_decoder_pt (GstRTSPStream * stream)
2732 if (!stream->priv->ulpfec_decoder)
2735 g_hash_table_foreach (stream->priv->ptmap, (GHFunc) retrieve_ulpfec_pt,
2736 stream->priv->ulpfec_decoder);
2743 * gst_rtsp_stream_request_aux_receiver:
2744 * @stream: a #GstRTSPStream
2745 * @sessid: the session id
2747 * Creates a bin containing an rtprtxreceive element.
2749 * Returns: (transfer full) (nullable): a #GstElement.
2754 gst_rtsp_stream_request_aux_receiver (GstRTSPStream * stream, guint sessid)
2760 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2762 bin = gst_bin_new (NULL);
2763 stream->priv->rtxreceive = gst_element_factory_make ("rtprtxreceive", NULL);
2764 update_rtx_receive_pt_map (stream);
2765 update_ulpfec_decoder_pt (stream);
2766 gst_bin_add (GST_BIN (bin), gst_object_ref (stream->priv->rtxreceive));
2768 pad = gst_element_get_static_pad (stream->priv->rtxreceive, "src");
2769 name = g_strdup_printf ("src_%u", sessid);
2770 gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
2772 gst_object_unref (pad);
2774 pad = gst_element_get_static_pad (stream->priv->rtxreceive, "sink");
2775 name = g_strdup_printf ("sink_%u", sessid);
2776 gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
2778 gst_object_unref (pad);
2784 * gst_rtsp_stream_set_pt_map:
2785 * @stream: a #GstRTSPStream
2789 * Configure a pt map between @pt and @caps.
2792 gst_rtsp_stream_set_pt_map (GstRTSPStream * stream, guint pt, GstCaps * caps)
2794 GstRTSPStreamPrivate *priv = stream->priv;
2796 if (!GST_IS_CAPS (caps))
2799 g_mutex_lock (&priv->lock);
2800 g_hash_table_insert (priv->ptmap, GINT_TO_POINTER (pt), gst_caps_ref (caps));
2801 update_rtx_receive_pt_map (stream);
2802 g_mutex_unlock (&priv->lock);
2806 * gst_rtsp_stream_set_publish_clock_mode:
2807 * @stream: a #GstRTSPStream
2808 * @mode: the clock publish mode
2810 * Sets if and how the stream clock should be published according to RFC7273.
2815 gst_rtsp_stream_set_publish_clock_mode (GstRTSPStream * stream,
2816 GstRTSPPublishClockMode mode)
2818 GstRTSPStreamPrivate *priv;
2820 priv = stream->priv;
2821 g_mutex_lock (&priv->lock);
2822 priv->publish_clock_mode = mode;
2823 g_mutex_unlock (&priv->lock);
2827 * gst_rtsp_stream_get_publish_clock_mode:
2828 * @stream: a #GstRTSPStream
2830 * Gets if and how the stream clock should be published according to RFC7273.
2832 * Returns: The GstRTSPPublishClockMode
2836 GstRTSPPublishClockMode
2837 gst_rtsp_stream_get_publish_clock_mode (GstRTSPStream * stream)
2839 GstRTSPStreamPrivate *priv;
2840 GstRTSPPublishClockMode ret;
2842 priv = stream->priv;
2843 g_mutex_lock (&priv->lock);
2844 ret = priv->publish_clock_mode;
2845 g_mutex_unlock (&priv->lock);
/* Handler for the rtpbin "request-pt-map" signal (connected in
 * gst_rtsp_stream_join_bin when the stream has a sinkpad). Under priv->lock,
 * and only when @session equals the stream index priv->idx, the caps stored
 * for @pt are looked up in priv->ptmap. The branch that logs the caps also
 * takes a reference on them; the other branch logs "no caps".
 * NOTE(review): the return statement is not visible in this view --
 * presumably the (possibly NULL) caps are returned to rtpbin; confirm. */
2851 request_pt_map (GstElement * rtpbin, guint session, guint pt,
2852 GstRTSPStream * stream)
2854 GstRTSPStreamPrivate *priv = stream->priv;
2855 GstCaps *caps = NULL;
2857 g_mutex_lock (&priv->lock);
2859 if (priv->idx == session) {
2860 caps = g_hash_table_lookup (priv->ptmap, GINT_TO_POINTER (pt));
2862 GST_DEBUG ("Stream %p, pt %u: caps %" GST_PTR_FORMAT, stream, pt, caps);
2863 gst_caps_ref (caps);
2865 GST_DEBUG ("Stream %p, pt %u: no caps", stream, pt);
2869 g_mutex_unlock (&priv->lock);
/* Handler for the rtpbin "pad-added" signal (connected in
 * gst_rtsp_stream_join_bin for the RECORD case). The session id is parsed out
 * of pad names of the form "recv_rtp_src_<n>" and compared with priv->idx.
 * A warning is logged when priv->sinkpad is already linked. Otherwise @pad is
 * linked to priv->sinkpad and a reference to it is kept in
 * priv->recv_rtp_src; a failed link is logged as an error.
 * NOTE(review): the early-exit statements under the checks are not visible
 * in this view. */
2875 pad_added (GstElement * rtpbin, GstPad * pad, GstRTSPStream * stream)
2877 GstRTSPStreamPrivate *priv = stream->priv;
2879 GstPadLinkReturn ret;
2882 GST_DEBUG ("Stream %p added pad %s:%s for pad %s:%s", stream,
2883 GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (priv->sinkpad));
2885 name = gst_pad_get_name (pad);
2886 if (sscanf (name, "recv_rtp_src_%u", &sessid) != 1) {
2892 if (priv->idx != sessid)
2895 if (gst_pad_is_linked (priv->sinkpad)) {
2896 GST_WARNING ("Stream %p: Pad %s:%s is linked already", stream,
2897 GST_DEBUG_PAD_NAME (priv->sinkpad));
2901 /* link the RTP pad to the session manager, it should not really fail unless
2902 * this is not really an RTP pad */
2903 ret = gst_pad_link (pad, priv->sinkpad);
2904 if (ret != GST_PAD_LINK_OK)
2906 priv->recv_rtp_src = gst_object_ref (pad);
2913 GST_ERROR ("Stream %p: Failed to link pads %s:%s and %s:%s", stream,
2914 GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (priv->sinkpad));
/* Handler for the rtpbin "on-npt-stop" signal (connected in
 * gst_rtsp_stream_join_bin for the RECORD case): logs the event and sends an
 * EOS event to the stream's sinkpad. @rtpbin, @session and @ssrc are not used
 * in the visible body. */
2919 on_npt_stop (GstElement * rtpbin, guint session, guint ssrc,
2920 GstRTSPStream * stream)
2922 /* TODO: What to do here other than this? */
2923 GST_DEBUG ("Stream %p: Got EOS", stream);
2924 gst_pad_send_event (stream->priv->sinkpad, gst_event_new_eos ());
/* Context handed to create_and_plug_queue_to_linked_stream_probe_cb. It is
 * allocated in create_and_plug_queue_to_linked_stream and released by
 * free_cb_data. queue1 and queue2 point at the slots where the probe callback
 * stores the queue elements it creates.
 * NOTE(review): the sink1, sink2, sink_pad, tee_pad and index members are
 * used by those functions but their declarations are not visible here. */
2927 typedef struct _ProbeData ProbeData;
2931 GstRTSPStream *stream;
2932 /* existing sink, already linked to tee */
2934 /* new sink, about to be linked */
2936 /* new queue element, that will be linked to tee and sink1 */
2937 GstElement **queue1;
2938 /* new queue element, that will be linked to tee and sink2 */
2939 GstElement **queue2;
/* Destroy notify for the ProbeData passed to gst_pad_add_probe(): releases
 * the references taken in create_and_plug_queue_to_linked_stream on the
 * stream, both sinks and both pads.
 * NOTE(review): freeing of the ProbeData allocation itself is not visible in
 * this view -- confirm it happens after the unrefs. */
2946 free_cb_data (gpointer user_data)
2948 ProbeData *data = user_data;
2950 gst_object_unref (data->stream);
2951 gst_object_unref (data->sink1);
2952 gst_object_unref (data->sink2);
2953 gst_object_unref (data->sink_pad);
2954 gst_object_unref (data->tee_pad);
/* Creates a queue, stores it in *@queue, adds it to priv->joined_bin and
 * wires tee -> queue -> @sink: a new "src_%u" request pad of @tee is linked
 * to the queue's sink pad and the queue's src pad to @sink's sink pad. The
 * queue is limited to one buffer, with the byte and time limits set to 0.
 * Finally @sink and the queue are synced to their parent's state.
 * The results of both gst_pad_link() calls are not checked. */
2960 create_and_plug_queue_to_unlinked_stream (GstRTSPStream * stream,
2961 GstElement * tee, GstElement * sink, GstElement ** queue)
2963 GstRTSPStreamPrivate *priv = stream->priv;
2968 /* create queue for the new stream */
2969 *queue = gst_element_factory_make ("queue", NULL);
2970 g_object_set (*queue, "max-size-buffers", 1, "max-size-bytes", 0,
2971 "max-size-time", G_GINT64_CONSTANT (0), NULL);
2972 gst_bin_add (priv->joined_bin, *queue);
2974 /* link tee to queue */
2975 tee_pad = gst_element_get_request_pad (tee, "src_%u");
2976 queue_pad = gst_element_get_static_pad (*queue, "sink");
2977 gst_pad_link (tee_pad, queue_pad);
2978 gst_object_unref (queue_pad);
2979 gst_object_unref (tee_pad);
2981 /* link queue to sink */
2982 queue_pad = gst_element_get_static_pad (*queue, "src");
2983 sink_pad = gst_element_get_static_pad (sink, "sink");
2984 gst_pad_link (queue_pad, sink_pad);
2985 gst_object_unref (queue_pad);
2986 gst_object_unref (sink_pad);
2988 gst_element_sync_state_with_parent (sink);
2989 gst_element_sync_state_with_parent (*queue);
2992 static GstPadProbeReturn
2993 create_and_plug_queue_to_linked_stream_probe_cb (GstPad * inpad,
2994 GstPadProbeInfo * info, gpointer user_data)
2996 GstRTSPStreamPrivate *priv;
2997 ProbeData *data = user_data;
2998 GstRTSPStream *stream;
2999 GstElement **queue1;
3000 GstElement **queue2;
3006 stream = data->stream;
3007 priv = stream->priv;
3008 queue1 = data->queue1;
3009 queue2 = data->queue2;
3010 sink_pad = data->sink_pad;
3011 tee_pad = data->tee_pad;
3012 index = data->index;
3014 /* unlink tee and the existing sink:
3015 * .-----. .---------.
3018 * '-----' '---------'
3020 g_assert (gst_pad_unlink (tee_pad, sink_pad));
3022 /* add queue to the already existing stream */
3023 *queue1 = gst_element_factory_make ("queue", NULL);
3024 g_object_set (*queue1, "max-size-buffers", 1, "max-size-bytes", 0,
3025 "max-size-time", G_GINT64_CONSTANT (0), NULL);
3026 gst_bin_add (priv->joined_bin, *queue1);
3028 /* link tee, queue and sink:
3029 * .-----. .---------. .---------.
3030 * | tee | | queue1 | | sink1 |
3031 * sink src->sink src->sink |
3032 * '-----' '---------' '---------'
3034 queue_pad = gst_element_get_static_pad (*queue1, "sink");
3035 gst_pad_link (tee_pad, queue_pad);
3036 gst_object_unref (queue_pad);
3037 queue_pad = gst_element_get_static_pad (*queue1, "src");
3038 gst_pad_link (queue_pad, sink_pad);
3039 gst_object_unref (queue_pad);
3041 gst_element_sync_state_with_parent (*queue1);
3043 /* create queue and link it to tee and the new sink */
3044 create_and_plug_queue_to_unlinked_stream (stream,
3045 priv->tee[index], data->sink2, queue2);
3047 /* the final stream:
3049 * .-----. .---------. .---------.
3050 * | tee | | queue1 | | sink1 |
3051 * sink src->sink src->sink |
3052 * | | '---------' '---------'
3053 * | | .---------. .---------.
3054 * | | | queue2 | | sink2 |
3055 * | src->sink src->sink |
3056 * '-----' '---------' '---------'
3059 return GST_PAD_PROBE_REMOVE;
/* Schedules the re-plugging of an already linked sink. A ProbeData is filled
 * with new references to @stream, @sink1 and @sink2, the queue slots and
 * @index, plus @sink1's sink pad and that pad's peer (the tee src pad). An
 * idle probe on the tee pad then runs
 * create_and_plug_queue_to_linked_stream_probe_cb; free_cb_data releases the
 * ProbeData when the probe goes away. Both pads are asserted to exist. */
3063 create_and_plug_queue_to_linked_stream (GstRTSPStream * stream,
3064 GstElement * sink1, GstElement * sink2, guint index, GstElement ** queue1,
3065 GstElement ** queue2)
3069 data = g_new0 (ProbeData, 1);
3070 data->stream = gst_object_ref (stream);
3071 data->sink1 = gst_object_ref (sink1);
3072 data->sink2 = gst_object_ref (sink2);
3073 data->queue1 = queue1;
3074 data->queue2 = queue2;
3075 data->index = index;
3077 data->sink_pad = gst_element_get_static_pad (sink1, "sink");
3078 g_assert (data->sink_pad);
3079 data->tee_pad = gst_pad_get_peer (data->sink_pad);
3080 g_assert (data->tee_pad);
3082 gst_pad_add_probe (data->tee_pad, GST_PAD_PROBE_TYPE_IDLE,
3083 create_and_plug_queue_to_linked_stream_probe_cb, data, free_cb_data);
/* Adds @sink_to_plug to priv->joined_bin and attaches it to priv->tee[index].
 * Three situations are handled:
 *  - an appsink and another UDP sink both exist: the existing branches
 *    already have queues, only a queue for the new sink is created;
 *  - exactly one of them exists: that branch gets a queue as well, through
 *    create_and_plug_queue_to_linked_stream;
 *  - otherwise ("creating first stream"): the sink is linked straight to a
 *    new tee request pad, without a queue.
 * The sink is synced to the parent state at the end.
 * NOTE(review): existing_sink is taken from priv->udpsink[index] or
 * priv->mcast_udpsink[index]; the selecting condition is not visible in this
 * view -- presumably it depends on @is_mcast, confirm which way round. */
3087 plug_udp_sink (GstRTSPStream * stream, GstElement * sink_to_plug,
3088 GstElement ** queue_to_plug, guint index, gboolean is_mcast)
3090 GstRTSPStreamPrivate *priv = stream->priv;
3091 GstElement *existing_sink;
3094 existing_sink = priv->udpsink[index];
3096 existing_sink = priv->mcast_udpsink[index];
3098 GST_DEBUG_OBJECT (stream, "plug %s sink", is_mcast ? "mcast" : "udp");
3100 /* add sink to the bin */
3101 gst_bin_add (priv->joined_bin, sink_to_plug);
3103 if (priv->appsink[index] && existing_sink) {
3105 /* queues are already added for the existing stream, add one for
3106 the newly added udp stream */
3107 create_and_plug_queue_to_unlinked_stream (stream, priv->tee[index],
3108 sink_to_plug, queue_to_plug);
3110 } else if (priv->appsink[index] || existing_sink) {
3112 GstElement *element;
3114 /* add queue to the already existing stream plus the newly created udp
3116 if (priv->appsink[index]) {
3117 element = priv->appsink[index];
3118 queue = &priv->appqueue[index];
3120 element = existing_sink;
3122 queue = &priv->udpqueue[index];
3124 queue = &priv->mcast_udpqueue[index];
3127 create_and_plug_queue_to_linked_stream (stream, element, sink_to_plug,
3128 index, queue, queue_to_plug);
3134 GST_DEBUG_OBJECT (stream, "creating first stream");
3136 /* no need to add queues */
3137 tee_pad = gst_element_get_request_pad (priv->tee[index], "src_%u");
3138 sink_pad = gst_element_get_static_pad (sink_to_plug, "sink");
3139 gst_pad_link (tee_pad, sink_pad);
3140 gst_object_unref (tee_pad);
3141 gst_object_unref (sink_pad);
3144 gst_element_sync_state_with_parent (sink_to_plug);
/* TCP counterpart of plug_udp_sink: adds priv->appsink[index] to
 * priv->joined_bin and attaches it to priv->tee[index].
 *  - both the multicast and the unicast UDP sink exist: only a queue for the
 *    appsink (priv->appqueue[index]) is created;
 *  - exactly one UDP sink exists: that branch is re-plugged with a queue as
 *    well, through create_and_plug_queue_to_linked_stream;
 *  - no UDP sink exists: the appsink is linked straight to a new tee request
 *    pad, without a queue.
 * The appsink is synced to the parent state at the end. */
3148 plug_tcp_sink (GstRTSPStream * stream, guint index)
3150 GstRTSPStreamPrivate *priv = stream->priv;
3152 GST_DEBUG_OBJECT (stream, "plug tcp sink");
3154 /* add sink to the bin */
3155 gst_bin_add (priv->joined_bin, priv->appsink[index]);
3157 if (priv->mcast_udpsink[index] && priv->udpsink[index]) {
3159 /* queues are already added for the existing stream, add one for
3160 the newly added tcp stream */
3161 create_and_plug_queue_to_unlinked_stream (stream,
3162 priv->tee[index], priv->appsink[index], &priv->appqueue[index]);
3164 } else if (priv->mcast_udpsink[index] || priv->udpsink[index]) {
3166 GstElement *element;
3168 /* add queue to the already existing stream plus the newly created tcp
3170 if (priv->mcast_udpsink[index]) {
3171 element = priv->mcast_udpsink[index];
3172 queue = &priv->mcast_udpqueue[index];
3174 element = priv->udpsink[index];
3175 queue = &priv->udpqueue[index];
3178 create_and_plug_queue_to_linked_stream (stream, element,
3179 priv->appsink[index], index, queue, &priv->appqueue[index]);
3185 /* no need to add queues */
3186 tee_pad = gst_element_get_request_pad (priv->tee[index], "src_%u");
3187 sink_pad = gst_element_get_static_pad (priv->appsink[index], "sink");
3188 gst_pad_link (tee_pad, sink_pad);
3189 gst_object_unref (tee_pad);
3190 gst_object_unref (sink_pad);
3193 gst_element_sync_state_with_parent (priv->appsink[index]);
/* Dispatches on the lower transport of @transport: unicast UDP and multicast
 * UDP go through plug_udp_sink with the matching sink and queue slot (the
 * last argument tells multicast apart), TCP goes through plug_tcp_sink.
 * NOTE(review): the index parameter and the if/else lines selecting the
 * branch are not visible in this view. */
3197 plug_sink (GstRTSPStream * stream, const GstRTSPTransport * transport,
3200 GstRTSPStreamPrivate *priv;
3201 gboolean is_tcp, is_udp, is_mcast;
3202 priv = stream->priv;
3204 is_tcp = transport->lower_transport == GST_RTSP_LOWER_TRANS_TCP;
3205 is_udp = transport->lower_transport == GST_RTSP_LOWER_TRANS_UDP;
3206 is_mcast = transport->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST;
3209 plug_udp_sink (stream, priv->udpsink[index],
3210 &priv->udpqueue[index], index, FALSE);
3213 plug_udp_sink (stream, priv->mcast_udpsink[index],
3214 &priv->mcast_udpqueue[index], index, TRUE);
3217 plug_tcp_sink (stream, index);
3220 /* must be called with lock */
/* Builds the sending side of the pipeline inside priv->joined_bin for the
 * lower transport of @transport. A warning is logged when UDP is requested
 * without any server address, or multicast without any multicast address.
 * The loop runs twice; index 0 is guarded by the presence of priv->srcpad
 * (the RTP branch) and index 1 is always handled. Per index:
 *  - a tee is created and added to the bin if priv->tee[i] is not set yet;
 *  - depending on the transport, a udpsink, a multicast udpsink or an appsink
 *    is created when missing and attached with plug_sink; the appsink gets
 *    the sink_cb callbacks with the stream as user data;
 *  - the tee is synced to the parent state and priv->send_src[i] is linked to
 *    the tee's sink pad.
 * NOTE(review): return statements, the conditions around mcast_ttl, around
 * the "async"/"sync" appsink settings and around the final tee link are not
 * visible in this view. */
3222 create_sender_part (GstRTSPStream * stream, const GstRTSPTransport * transport)
3224 GstRTSPStreamPrivate *priv;
3227 gboolean is_tcp, is_udp, is_mcast;
3231 GST_DEBUG_OBJECT (stream, "create sender part");
3232 priv = stream->priv;
3233 bin = priv->joined_bin;
3235 is_tcp = transport->lower_transport == GST_RTSP_LOWER_TRANS_TCP;
3236 is_udp = transport->lower_transport == GST_RTSP_LOWER_TRANS_UDP;
3237 is_mcast = transport->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST;
3240 mcast_ttl = transport->ttl;
3242 GST_DEBUG_OBJECT (stream, "tcp: %d, udp: %d, mcast: %d (ttl: %d)", is_tcp,
3243 is_udp, is_mcast, mcast_ttl);
3245 if (is_udp && !priv->server_addr_v4 && !priv->server_addr_v6) {
3246 GST_WARNING_OBJECT (stream, "no sockets assigned for UDP");
3250 if (is_mcast && !priv->mcast_addr_v4 && !priv->mcast_addr_v6) {
3251 GST_WARNING_OBJECT (stream, "no sockets assigned for UDP multicast");
3255 for (i = 0; i < 2; i++) {
3256 gboolean link_tee = FALSE;
3257 /* For the sender we create this bit of pipeline for both
3259 * Initially there will be only one active transport for
3260 * the stream, so the pipeline will look like this:
3262 * .--------. .-----. .---------.
3263 * | rtpbin | | tee | | sink |
3264 * | send->sink src->sink |
3265 * '--------' '-----' '---------'
3267 * For each new transport, the already existing branch will
3268 * be reconfigured by adding a queue element:
3270 * .--------. .-----. .---------. .---------.
3271 * | rtpbin | | tee | | queue | | udpsink |
3272 * | send->sink src->sink src->sink |
3273 * '--------' | | '---------' '---------'
3274 * | | .---------. .---------.
3275 * | | | queue | | udpsink |
3276 * | src->sink src->sink |
3277 * | | '---------' '---------'
3278 * | | .---------. .---------.
3279 * | | | queue | | appsink |
3280 * | src->sink src->sink |
3281 * '-----' '---------' '---------'
3284 /* Only link the RTP send src if we're going to send RTP, link
3285 * the RTCP send src always */
3286 if (!priv->srcpad && i == 0)
3289 if (!priv->tee[i]) {
3290 /* make tee for RTP/RTCP */
3291 priv->tee[i] = gst_element_factory_make ("tee", NULL);
3292 gst_bin_add (bin, priv->tee[i]);
3296 if (is_udp && !priv->udpsink[i]) {
3297 /* we create only one pair of udpsinks for IPv4 and IPv6 */
3298 create_and_configure_udpsink (stream, &priv->udpsink[i],
3299 priv->socket_v4[i], priv->socket_v6[i], FALSE, (i == 0), mcast_ttl);
3300 plug_sink (stream, transport, i);
3301 } else if (is_mcast && !priv->mcast_udpsink[i]) {
3302 /* we create only one pair of mcast-udpsinks for IPv4 and IPv6 */
3303 create_and_configure_udpsink (stream, &priv->mcast_udpsink[i],
3304 priv->mcast_socket_v4[i], priv->mcast_socket_v6[i], TRUE, (i == 0),
3306 plug_sink (stream, transport, i);
3307 } else if (is_tcp && !priv->appsink[i]) {
3309 priv->appsink[i] = gst_element_factory_make ("appsink", NULL);
3310 g_object_set (priv->appsink[i], "emit-signals", FALSE, "max-buffers", 1,
3313 /* we need to set sync and preroll to FALSE for the sink to avoid
3314 * deadlock. This is only needed for sink sending RTCP data. */
3316 g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
3318 gst_app_sink_set_callbacks (GST_APP_SINK_CAST (priv->appsink[i]),
3319 &sink_cb, stream, NULL);
3320 plug_sink (stream, transport, i);
3324 /* and link to rtpbin send pad */
3325 gst_element_sync_state_with_parent (priv->tee[i]);
3326 pad = gst_element_get_static_pad (priv->tee[i], "sink");
3327 gst_pad_link (priv->send_src[i], pad);
3328 gst_object_unref (pad);
3335 /* must be called with lock */
/* Adds @src to @bin and links its "src" pad to a new "sink_%u" request pad of
 * @funnel. A blocking buffer probe is kept on the src pad while the link is
 * made so that no data is pushed before it exists; the probe is removed again
 * afterwards. @src is set to PLAYING and its state is locked before it is
 * added to the bin. The result of gst_pad_link() is not checked. */
3337 plug_src (GstRTSPStream * stream, GstBin * bin, GstElement * src,
3338 GstElement * funnel)
3340 GstRTSPStreamPrivate *priv;
3341 GstPad *pad, *selpad;
3344 priv = stream->priv;
3346 pad = gst_element_get_static_pad (src, "src");
3348 /* block pad so src can't push data while it's not yet linked */
3349 id = gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_BLOCK |
3350 GST_PAD_PROBE_TYPE_BUFFER, NULL, NULL, NULL);
3351 /* we set and keep these to playing so that they don't cause NO_PREROLL return
3352 * values. This is only relevant for PLAY pipelines */
3353 gst_element_set_state (src, GST_STATE_PLAYING);
3354 gst_element_set_locked_state (src, TRUE);
3358 gst_bin_add (bin, src);
3360 /* and link to the funnel */
3361 selpad = gst_element_get_request_pad (funnel, "sink_%u");
3362 gst_pad_link (pad, selpad);
3364 gst_pad_remove_probe (pad, id);
3365 gst_object_unref (pad);
3366 gst_object_unref (selpad);
3369 /* must be called with lock */
/* Builds the receiving side of the pipeline inside priv->joined_bin for the
 * lower transport of @transport. The loop runs twice; index 0 is guarded by
 * the presence of priv->sinkpad (the RTP branch) and index 1 is always
 * handled. Per index:
 *  - a funnel is created when missing, added to the bin and its src pad is
 *    linked to priv->recv_sink[i];
 *  - for UDP, an IPv4 and/or IPv6 udpsrc is created on the matching socket
 *    when a server address of that family exists; for multicast the same is
 *    done with the multicast sockets and addresses; for TCP an appsrc is
 *    created with its base time reset to -1. Each new source is attached to
 *    the funnel with plug_src;
 *  - the funnel is synced to the parent state.
 * NOTE(review): the error labels reached when a udpsrc cannot be created and
 * the return statements are not visible in this view. */
3371 create_receiver_part (GstRTSPStream * stream, const GstRTSPTransport *
3374 GstRTSPStreamPrivate *priv;
3382 GST_DEBUG_OBJECT (stream, "create receiver part");
3383 priv = stream->priv;
3384 bin = priv->joined_bin;
3386 tcp = transport->lower_transport == GST_RTSP_LOWER_TRANS_TCP;
3387 udp = transport->lower_transport == GST_RTSP_LOWER_TRANS_UDP;
3388 mcast = transport->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST;
3390 for (i = 0; i < 2; i++) {
3391 /* For the receiver we create this bit of pipeline for both
3392 * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc
3393 * and it is all funneled into the rtpbin receive pad.
3396 * .--------. .--------. .--------.
3397 * | udpsrc | | funnel | | rtpbin |
3398 * | RTP src->sink src->sink |
3399 * '--------' | | | |
3400 * .--------. | | | |
3401 * | appsrc | | | | |
3402 * | RTP src->sink | | |
3403 * '--------' '--------' | |
3405 * .--------. .--------. | |
3406 * | udpsrc | | funnel | | |
3407 * | RTCP src->sink src->sink |
3408 * '--------' | | '--------'
3411 * | RTCP src->sink |
3412 * '--------' '--------'
3415 if (!priv->sinkpad && i == 0) {
3416 /* Only connect recv RTP sink if we expect to receive RTP. Connect recv
3417 * RTCP sink always */
3421 /* make funnel for the RTP/RTCP receivers */
3422 if (!priv->funnel[i]) {
3423 priv->funnel[i] = gst_element_factory_make ("funnel", NULL);
3424 gst_bin_add (bin, priv->funnel[i]);
3426 pad = gst_element_get_static_pad (priv->funnel[i], "src");
3427 gst_pad_link (pad, priv->recv_sink[i]);
3428 gst_object_unref (pad);
3431 if (udp && !priv->udpsrc_v4[i] && priv->server_addr_v4) {
3432 GST_DEBUG_OBJECT (stream, "udp IPv4, create and configure udpsources");
3433 if (!create_and_configure_udpsource (&priv->udpsrc_v4[i],
3434 priv->socket_v4[i]))
3437 plug_src (stream, bin, priv->udpsrc_v4[i], priv->funnel[i]);
3440 if (udp && !priv->udpsrc_v6[i] && priv->server_addr_v6) {
3441 GST_DEBUG_OBJECT (stream, "udp IPv6, create and configure udpsources");
3442 if (!create_and_configure_udpsource (&priv->udpsrc_v6[i],
3443 priv->socket_v6[i]))
3446 plug_src (stream, bin, priv->udpsrc_v6[i], priv->funnel[i]);
3449 if (mcast && !priv->mcast_udpsrc_v4[i] && priv->mcast_addr_v4) {
3450 GST_DEBUG_OBJECT (stream, "mcast IPv4, create and configure udpsources");
3451 if (!create_and_configure_udpsource (&priv->mcast_udpsrc_v4[i],
3452 priv->mcast_socket_v4[i]))
3453 goto mcast_udpsrc_error;
3454 plug_src (stream, bin, priv->mcast_udpsrc_v4[i], priv->funnel[i]);
3457 if (mcast && !priv->mcast_udpsrc_v6[i] && priv->mcast_addr_v6) {
3458 GST_DEBUG_OBJECT (stream, "mcast IPv6, create and configure udpsources");
3459 if (!create_and_configure_udpsource (&priv->mcast_udpsrc_v6[i],
3460 priv->mcast_socket_v6[i]))
3461 goto mcast_udpsrc_error;
3462 plug_src (stream, bin, priv->mcast_udpsrc_v6[i], priv->funnel[i]);
3465 if (tcp && !priv->appsrc[i]) {
3466 /* make and add appsrc */
3467 priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
3468 priv->appsrc_base_time[i] = -1;
3469 g_object_set (priv->appsrc[i], "format", GST_FORMAT_TIME, "is-live",
3471 plug_src (stream, bin, priv->appsrc[i], priv->funnel[i]);
3474 gst_element_sync_state_with_parent (priv->funnel[i]);
/* Checks a multicast transport against the multicast client addresses
 * recorded in priv->mcast_clients. An entry matches when its address string
 * equals tr->destination and its rtp_port equals tr->port.min. Three failure
 * cases are logged as warnings: no address has been reserved (empty list),
 * the transport carries no destination, and no entry matches.
 * NOTE(review): the return statements and labels are not visible in this
 * view -- presumably TRUE is returned on a match and FALSE otherwise. */
3485 check_mcast_client_addr (GstRTSPStream * stream, const GstRTSPTransport * tr)
3487 GstRTSPStreamPrivate *priv = stream->priv;
3490 if (priv->mcast_clients == NULL)
3496 if (tr->destination == NULL)
3497 goto no_destination;
3499 for (walk = priv->mcast_clients; walk; walk = g_list_next (walk)) {
3500 UdpClientAddrInfo *cli = walk->data;
3502 if ((g_strcmp0 (cli->address, tr->destination) == 0) &&
3503 (cli->rtp_port == tr->port.min))
3511 GST_WARNING_OBJECT (stream, "Adding mcast transport, but no mcast address "
3512 "has been reserved");
3517 GST_WARNING_OBJECT (stream, "Adding mcast transport, but no transport "
3518 "has been provided");
3523 GST_WARNING_OBJECT (stream, "Adding mcast transport, but it doesn't match "
3524 "the reserved address");
3530 * gst_rtsp_stream_join_bin:
3531 * @stream: a #GstRTSPStream
3532 * @bin: (transfer none): a #GstBin to join
3533 * @rtpbin: (transfer none): a rtpbin element in @bin
3534 * @state: the target state of the new elements
3536 * Join the #GstBin @bin that contains the element @rtpbin.
3538 * @stream will link to @rtpbin, which must be inside @bin. The elements
3539 * added to @bin will be set to the state given in @state.
3541 * Returns: %TRUE on success.
/* Implementation notes. Everything after the argument checks runs under
 * priv->lock. The rtpbin session id is the stream index.
 *  - For the SAVP/SAVPF profiles the rtpbin encoder/decoder request signals
 *    are connected to the stream's SRTP handlers.
 *  - With a sinkpad, "request-pt-map" is connected to request_pt_map.
 *  - Sending case: a "send_rtp_sink_<idx>" pad is requested from rtpbin and
 *    linked to priv->srcpad, and the static "send_rtp_src_<idx>" pad is kept
 *    in priv->send_src[0]. A failed link logs a warning and drops the
 *    reference to the requested pad.
 *  - RECORD case: "pad-added" and "on-npt-stop" are connected and a
 *    "recv_rtp_sink_<idx>" pad is requested into priv->recv_sink[0].
 *  - The RTCP pads "send_rtcp_src_<idx>" and "recv_rtcp_sink_<idx>" are
 *    always requested.
 *  - The internal RTP session is fetched with "get-internal-session" and its
 *    SSRC related signals are connected to the stream's handlers.
 *  - priv->caps_sig tracks "notify::caps" on priv->send_src[0] and the
 *    current caps of that pad are cached in priv->caps.
 * On success priv->joined_bin is set to @bin.
 * NOTE(review): @state is not referenced in the visible lines; the branch
 * conditions and return statements are not visible in this view. */
3544 gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
3545 GstElement * rtpbin, GstState state)
3547 GstRTSPStreamPrivate *priv;
3550 GstPadLinkReturn ret;
3552 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3553 g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
3554 g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
3556 priv = stream->priv;
3558 g_mutex_lock (&priv->lock);
3559 if (priv->joined_bin != NULL)
3562 /* create a session with the same index as the stream */
3565 GST_INFO ("stream %p joining bin as session %u", stream, idx);
3567 if (priv->profiles & GST_RTSP_PROFILE_SAVP
3568 || priv->profiles & GST_RTSP_PROFILE_SAVPF) {
3570 g_signal_connect (rtpbin, "request-rtp-encoder",
3571 (GCallback) request_rtp_encoder, stream);
3572 g_signal_connect (rtpbin, "request-rtcp-encoder",
3573 (GCallback) request_rtcp_encoder, stream);
3574 g_signal_connect (rtpbin, "request-rtp-decoder",
3575 (GCallback) request_rtp_rtcp_decoder, stream);
3576 g_signal_connect (rtpbin, "request-rtcp-decoder",
3577 (GCallback) request_rtp_rtcp_decoder, stream);
3580 if (priv->sinkpad) {
3581 g_signal_connect (rtpbin, "request-pt-map",
3582 (GCallback) request_pt_map, stream);
3585 /* get pads from the RTP session element for sending and receiving
3588 /* get a pad for sending RTP */
3589 name = g_strdup_printf ("send_rtp_sink_%u", idx);
3590 priv->send_rtp_sink = gst_element_get_request_pad (rtpbin, name);
3593 /* link the RTP pad to the session manager, it should not really fail unless
3594 * this is not really an RTP pad */
3595 ret = gst_pad_link (priv->srcpad, priv->send_rtp_sink);
3596 if (ret != GST_PAD_LINK_OK)
3599 name = g_strdup_printf ("send_rtp_src_%u", idx);
3600 priv->send_src[0] = gst_element_get_static_pad (rtpbin, name);
3603 /* RECORD case: need to connect our sinkpad from here */
3604 g_signal_connect (rtpbin, "pad-added", (GCallback) pad_added, stream);
3606 g_signal_connect (rtpbin, "on-npt-stop", (GCallback) on_npt_stop, stream);
3608 name = g_strdup_printf ("recv_rtp_sink_%u", idx);
3609 priv->recv_sink[0] = gst_element_get_request_pad (rtpbin, name);
3613 name = g_strdup_printf ("send_rtcp_src_%u", idx);
3614 priv->send_src[1] = gst_element_get_request_pad (rtpbin, name);
3616 name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
3617 priv->recv_sink[1] = gst_element_get_request_pad (rtpbin, name);
3620 /* get the session */
3621 g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &priv->session);
3623 g_signal_connect (priv->session, "on-new-ssrc", (GCallback) on_new_ssrc,
3625 g_signal_connect (priv->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
3627 g_signal_connect (priv->session, "on-ssrc-active",
3628 (GCallback) on_ssrc_active, stream);
3629 g_signal_connect (priv->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
3631 g_signal_connect (priv->session, "on-bye-timeout",
3632 (GCallback) on_bye_timeout, stream);
3633 g_signal_connect (priv->session, "on-timeout", (GCallback) on_timeout,
3636 /* signal for sender ssrc */
3637 g_signal_connect (priv->session, "on-new-sender-ssrc",
3638 (GCallback) on_new_sender_ssrc, stream);
3639 g_signal_connect (priv->session, "on-sender-ssrc-active",
3640 (GCallback) on_sender_ssrc_active, stream);
3643 /* be notified of caps changes */
3644 priv->caps_sig = g_signal_connect (priv->send_src[0], "notify::caps",
3645 (GCallback) caps_notify, stream);
3646 priv->caps = gst_pad_get_current_caps (priv->send_src[0]);
3649 priv->joined_bin = bin;
3650 GST_DEBUG_OBJECT (stream, "successfully joined bin");
3651 g_mutex_unlock (&priv->lock);
3658 g_mutex_unlock (&priv->lock);
3663 GST_WARNING ("failed to link stream %u", idx);
3664 gst_object_unref (priv->send_rtp_sink);
3665 priv->send_rtp_sink = NULL;
3666 g_mutex_unlock (&priv->lock);
/* Tears down the element stored in *@elementptr: its state lock is lifted, it
 * is brought to GST_STATE_NULL, removed from @bin when it still has a parent,
 * and the caller's reference is dropped.
 * NOTE(review): the NULL guard around the body and the reset of the slot are
 * not visible in this view -- gst_rtsp_stream_leave_bin calls this for slots
 * that may be unset, so a guard is presumably present; confirm. */
3672 clear_element (GstBin * bin, GstElement ** elementptr)
3675 gst_element_set_locked_state (*elementptr, FALSE);
3676 gst_element_set_state (*elementptr, GST_STATE_NULL);
3677 if (GST_ELEMENT_PARENT (*elementptr))
3678 gst_bin_remove (bin, *elementptr);
3680 gst_object_unref (*elementptr);
3686 * gst_rtsp_stream_leave_bin:
3687 * @stream: a #GstRTSPStream
3688 * @bin: (transfer none): a #GstBin
3689 * @rtpbin: (transfer none): a rtpbin #GstElement
3691 * Remove the elements of @stream from @bin.
3693 * Return: %TRUE on success.
3696 gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
3697 GstElement * rtpbin)
3699 GstRTSPStreamPrivate *priv;
3702 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3703 g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
3704 g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
3706 priv = stream->priv;
3708 g_mutex_lock (&priv->lock);
3709 if (priv->joined_bin == NULL)
3710 goto was_not_joined;
3711 if (priv->joined_bin != bin)
3714 priv->joined_bin = NULL;
3716 /* all transports must be removed by now */
3717 if (priv->transports != NULL)
3718 goto transports_not_removed;
3720 clear_tr_cache (priv, TRUE);
3721 clear_tr_cache (priv, FALSE);
3723 GST_INFO ("stream %p leaving bin", stream);
3726 gst_pad_unlink (priv->srcpad, priv->send_rtp_sink);
3728 g_signal_handler_disconnect (priv->send_src[0], priv->caps_sig);
3729 gst_element_release_request_pad (rtpbin, priv->send_rtp_sink);
3730 gst_object_unref (priv->send_rtp_sink);
3731 priv->send_rtp_sink = NULL;
3732 } else if (priv->recv_rtp_src) {
3733 gst_pad_unlink (priv->recv_rtp_src, priv->sinkpad);
3734 gst_object_unref (priv->recv_rtp_src);
3735 priv->recv_rtp_src = NULL;
3738 for (i = 0; i < 2; i++) {
3739 clear_element (bin, &priv->udpsrc_v4[i]);
3740 clear_element (bin, &priv->udpsrc_v6[i]);
3741 clear_element (bin, &priv->udpqueue[i]);
3742 clear_element (bin, &priv->udpsink[i]);
3744 clear_element (bin, &priv->mcast_udpsrc_v4[i]);
3745 clear_element (bin, &priv->mcast_udpsrc_v6[i]);
3746 clear_element (bin, &priv->mcast_udpqueue[i]);
3747 clear_element (bin, &priv->mcast_udpsink[i]);
3749 clear_element (bin, &priv->appsrc[i]);
3750 clear_element (bin, &priv->appqueue[i]);
3751 clear_element (bin, &priv->appsink[i]);
3753 clear_element (bin, &priv->tee[i]);
3754 clear_element (bin, &priv->funnel[i]);
3756 if (priv->sinkpad || i == 1) {
3757 gst_element_release_request_pad (rtpbin, priv->recv_sink[i]);
3758 gst_object_unref (priv->recv_sink[i]);
3759 priv->recv_sink[i] = NULL;
3764 gst_object_unref (priv->send_src[0]);
3765 priv->send_src[0] = NULL;
3768 gst_element_release_request_pad (rtpbin, priv->send_src[1]);
3769 gst_object_unref (priv->send_src[1]);
3770 priv->send_src[1] = NULL;
3772 g_object_unref (priv->session);
3773 priv->session = NULL;
3775 gst_caps_unref (priv->caps);
3779 gst_object_unref (priv->srtpenc);
3781 gst_object_unref (priv->srtpdec);
3783 if (priv->mcast_addr_v4)
3784 gst_rtsp_address_free (priv->mcast_addr_v4);
3785 priv->mcast_addr_v4 = NULL;
3786 if (priv->mcast_addr_v6)
3787 gst_rtsp_address_free (priv->mcast_addr_v6);
3788 priv->mcast_addr_v6 = NULL;
3789 if (priv->server_addr_v4)
3790 gst_rtsp_address_free (priv->server_addr_v4);
3791 priv->server_addr_v4 = NULL;
3792 if (priv->server_addr_v6)
3793 gst_rtsp_address_free (priv->server_addr_v6);
3794 priv->server_addr_v6 = NULL;
3796 g_mutex_unlock (&priv->lock);
3802 g_mutex_unlock (&priv->lock);
3805 transports_not_removed:
3807 GST_ERROR_OBJECT (stream, "can't leave bin (transports not removed)");
3808 g_mutex_unlock (&priv->lock);
3813 GST_ERROR_OBJECT (stream, "leaving the wrong bin");
3814 g_mutex_unlock (&priv->lock);
3820 * gst_rtsp_stream_get_joined_bin:
3821 * @stream: a #GstRTSPStream
3823 * Get the previous joined bin with gst_rtsp_stream_join_bin() or NULL.
3825 * Return: (transfer full) (nullable): the joined bin or NULL.
3828 gst_rtsp_stream_get_joined_bin (GstRTSPStream * stream)
3830 GstRTSPStreamPrivate *priv;
3833 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3835 priv = stream->priv;
3837 g_mutex_lock (&priv->lock);
3838 bin = priv->joined_bin ? gst_object_ref (priv->joined_bin) : NULL;
3839 g_mutex_unlock (&priv->lock);
3845 * gst_rtsp_stream_get_rtpinfo:
3846 * @stream: a #GstRTSPStream
3847 * @rtptime: (allow-none) (out caller-allocates): result RTP timestamp
3848 * @seq: (allow-none) (out caller-allocates): result RTP seqnum
3849 * @clock_rate: (allow-none) (out caller-allocates): the clock rate
3850 * @running_time: (out caller-allocates): result running-time
3852 * Retrieve the current rtptime, seq and running-time. This is used to
3853 * construct a RTPInfo reply header.
3855 * Returns: %TRUE when rtptime, seq and running-time could be determined.
/* Implementation notes. The payloader's class is looked up before
 * priv->lock is taken; everything else runs under the lock.
 *  1. When an RTP udpsink or appsink exists (index 0), its "last-sample" is
 *     read, the udpsink being tried first. The sample's buffer is mapped as
 *     an RTP buffer. If the sample caps carry a uint "ssrc" field that
 *     differs from the buffer's SSRC, the buffer is reported as not coming
 *     from the payloader and is unmapped and released. Otherwise the
 *     sequence number and RTP timestamp come from the RTP header, the
 *     running time from the sample segment and the buffer timestamp, and the
 *     clock rate from the "clock-rate" caps field.
 *  2. Otherwise the payloader is queried: through its "stats" property when
 *     the class has one ("seqnum", "timestamp", "running-time" and
 *     "clock-rate" fields), else through its "seqnum" and "timestamp"
 *     properties, in which case the running time is GST_CLOCK_TIME_NONE.
 * A clock rate of 0 resets the running time to GST_CLOCK_TIME_NONE. The
 * failure path logs "Could not get payloader stats".
 * NOTE(review): the NULL checks on the out parameters, the jumps between the
 * two strategies and the return statements are not visible in this view. */
3858 gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream,
3859 guint * rtptime, guint * seq, guint * clock_rate,
3860 GstClockTime * running_time)
3862 GstRTSPStreamPrivate *priv;
3863 GstStructure *stats;
3864 GObjectClass *payobjclass;
3866 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3868 priv = stream->priv;
3870 payobjclass = G_OBJECT_GET_CLASS (priv->payloader);
3872 g_mutex_lock (&priv->lock);
3874 /* First try to extract the information from the last buffer on the sinks.
3875 * This will have a more accurate sequence number and timestamp, as between
3876 * the payloader and the sink there can be some queues
3878 if (priv->udpsink[0] || priv->appsink[0]) {
3879 GstSample *last_sample;
3881 if (priv->udpsink[0])
3882 g_object_get (priv->udpsink[0], "last-sample", &last_sample, NULL);
3884 g_object_get (priv->appsink[0], "last-sample", &last_sample, NULL);
3889 GstSegment *segment;
3891 GstRTPBuffer rtp_buffer = GST_RTP_BUFFER_INIT;
3893 caps = gst_sample_get_caps (last_sample);
3894 buffer = gst_sample_get_buffer (last_sample);
3895 segment = gst_sample_get_segment (last_sample);
3896 s = gst_caps_get_structure (caps, 0);
3898 if (gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp_buffer)) {
3899 guint ssrc_buf = gst_rtp_buffer_get_ssrc (&rtp_buffer);
3900 guint ssrc_stream = 0;
3901 if (gst_structure_has_field_typed (s, "ssrc", G_TYPE_UINT) &&
3902 gst_structure_get_uint (s, "ssrc", &ssrc_stream) &&
3903 ssrc_buf != ssrc_stream) {
3904 /* Skip buffers from auxiliary streams. */
3905 GST_DEBUG_OBJECT (stream,
3906 "not a buffer from the payloader, SSRC: %08x", ssrc_buf);
3908 gst_rtp_buffer_unmap (&rtp_buffer);
3909 gst_sample_unref (last_sample);
3914 *seq = gst_rtp_buffer_get_seq (&rtp_buffer);
3918 *rtptime = gst_rtp_buffer_get_timestamp (&rtp_buffer);
3921 gst_rtp_buffer_unmap (&rtp_buffer);
3925 gst_segment_to_running_time (segment, GST_FORMAT_TIME,
3926 GST_BUFFER_TIMESTAMP (buffer));
3930 gst_structure_get_int (s, "clock-rate", (gint *) clock_rate);
3932 if (*clock_rate == 0 && running_time)
3933 *running_time = GST_CLOCK_TIME_NONE;
3935 gst_sample_unref (last_sample);
3939 gst_sample_unref (last_sample);
3945 if (g_object_class_find_property (payobjclass, "stats")) {
3946 g_object_get (priv->payloader, "stats", &stats, NULL);
3951 gst_structure_get_uint (stats, "seqnum", seq);
3954 gst_structure_get_uint (stats, "timestamp", rtptime);
3957 gst_structure_get_clock_time (stats, "running-time", running_time);
3960 gst_structure_get_uint (stats, "clock-rate", clock_rate);
3961 if (*clock_rate == 0 && running_time)
3962 *running_time = GST_CLOCK_TIME_NONE;
3964 gst_structure_free (stats);
3966 if (!g_object_class_find_property (payobjclass, "seqnum") ||
3967 !g_object_class_find_property (payobjclass, "timestamp"))
3971 g_object_get (priv->payloader, "seqnum", seq, NULL);
3974 g_object_get (priv->payloader, "timestamp", rtptime, NULL);
3977 *running_time = GST_CLOCK_TIME_NONE;
3981 g_mutex_unlock (&priv->lock);
3988 GST_WARNING ("Could not get payloader stats");
3989 g_mutex_unlock (&priv->lock);
3995 * gst_rtsp_stream_get_caps:
3996 * @stream: a #GstRTSPStream
3998 * Retrieve the current caps of @stream.
4000 * Returns: (transfer full) (nullable): the #GstCaps of @stream.
4001 * use gst_caps_unref() after usage.
4004 gst_rtsp_stream_get_caps (GstRTSPStream * stream)
4006 GstRTSPStreamPrivate *priv;
4009 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
4011 priv = stream->priv;
4013 g_mutex_lock (&priv->lock);
4014 if ((result = priv->caps))
4015 gst_caps_ref (result);
4016 g_mutex_unlock (&priv->lock);
4022 * gst_rtsp_stream_recv_rtp:
4023 * @stream: a #GstRTSPStream
4024 * @buffer: (transfer full): a #GstBuffer
4026 * Handle an RTP buffer for the stream. This method is usually called when a
4027 * message has been received from a client using the TCP transport.
4029 * This function takes ownership of @buffer.
4031 * Returns: a GstFlowReturn.
4034 gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer)
4036 GstRTSPStreamPrivate *priv;
4038 GstElement *element;
4040 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
4041 priv = stream->priv;
4042 g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
4043 g_return_val_if_fail (priv->joined_bin != NULL, FALSE);
4045 g_mutex_lock (&priv->lock);
4046 if (priv->appsrc[0])
4047 element = gst_object_ref (priv->appsrc[0]);
4050 g_mutex_unlock (&priv->lock);
4053 if (priv->appsrc_base_time[0] == -1) {
4054 /* Take current running_time. This timestamp will be put on
4055 * the first buffer of each stream because we are a live source and so we
4056 * timestamp with the running_time. When we are dealing with TCP, we also
4057 * only timestamp the first buffer (using the DISCONT flag) because a server
4058 * typically bursts data, for which we don't want to compensate by speeding
4059 * up the media. The other timestamps will be interpollated from this one
4060 * using the RTP timestamps. */
4061 GST_OBJECT_LOCK (element);
4062 if (GST_ELEMENT_CLOCK (element)) {
4064 GstClockTime base_time;
4066 now = gst_clock_get_time (GST_ELEMENT_CLOCK (element));
4067 base_time = GST_ELEMENT_CAST (element)->base_time;
4069 priv->appsrc_base_time[0] = now - base_time;
4070 GST_BUFFER_TIMESTAMP (buffer) = priv->appsrc_base_time[0];
4071 GST_DEBUG ("stream %p: first buffer at time %" GST_TIME_FORMAT
4072 ", base %" GST_TIME_FORMAT, stream, GST_TIME_ARGS (now),
4073 GST_TIME_ARGS (base_time));
4075 GST_OBJECT_UNLOCK (element);
4078 ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
4079 gst_object_unref (element);
4087 * gst_rtsp_stream_recv_rtcp:
4088 * @stream: a #GstRTSPStream
4089 * @buffer: (transfer full): a #GstBuffer
4091 * Handle an RTCP buffer for the stream. This method is usually called when a
4092 * message has been received from a client using the TCP transport.
4094 * This function takes ownership of @buffer.
4096 * Returns: a GstFlowReturn.
4099 gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
4101 GstRTSPStreamPrivate *priv;
4103 GstElement *element;
4105 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
4106 priv = stream->priv;
4107 g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
4109 if (priv->joined_bin == NULL) {
4110 gst_buffer_unref (buffer);
4111 return GST_FLOW_NOT_LINKED;
4113 g_mutex_lock (&priv->lock);
4114 if (priv->appsrc[1])
4115 element = gst_object_ref (priv->appsrc[1]);
4118 g_mutex_unlock (&priv->lock);
4121 if (priv->appsrc_base_time[1] == -1) {
4122 /* Take current running_time. This timestamp will be put on
4123 * the first buffer of each stream because we are a live source and so we
4124 * timestamp with the running_time. When we are dealing with TCP, we also
4125 * only timestamp the first buffer (using the DISCONT flag) because a server
4126 * typically bursts data, for which we don't want to compensate by speeding
4127 * up the media. The other timestamps will be interpollated from this one
4128 * using the RTP timestamps. */
4129 GST_OBJECT_LOCK (element);
4130 if (GST_ELEMENT_CLOCK (element)) {
4132 GstClockTime base_time;
4134 now = gst_clock_get_time (GST_ELEMENT_CLOCK (element));
4135 base_time = GST_ELEMENT_CAST (element)->base_time;
4137 priv->appsrc_base_time[1] = now - base_time;
4138 GST_BUFFER_TIMESTAMP (buffer) = priv->appsrc_base_time[1];
4139 GST_DEBUG ("stream %p: first buffer at time %" GST_TIME_FORMAT
4140 ", base %" GST_TIME_FORMAT, stream, GST_TIME_ARGS (now),
4141 GST_TIME_ARGS (base_time));
4143 GST_OBJECT_UNLOCK (element);
4146 ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
4147 gst_object_unref (element);
4150 gst_buffer_unref (buffer);
4155 /* must be called with lock */
4157 add_client (GstElement * rtp_sink, GstElement * rtcp_sink, const gchar * host,
4158 gint rtp_port, gint rtcp_port)
4160 if (rtp_sink != NULL)
4161 g_signal_emit_by_name (rtp_sink, "add", host, rtp_port, NULL);
4162 if (rtcp_sink != NULL)
4163 g_signal_emit_by_name (rtcp_sink, "add", host, rtcp_port, NULL);
4166 /* must be called with lock */
4168 remove_client (GstElement * rtp_sink, GstElement * rtcp_sink,
4169 const gchar * host, gint rtp_port, gint rtcp_port)
4171 if (rtp_sink != NULL)
4172 g_signal_emit_by_name (rtp_sink, "remove", host, rtp_port, NULL);
4173 if (rtcp_sink != NULL)
4174 g_signal_emit_by_name (rtcp_sink, "remove", host, rtcp_port, NULL);
4177 /* must be called with lock */
4179 update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
4182 GstRTSPStreamPrivate *priv = stream->priv;
4183 const GstRTSPTransport *tr;
4187 tr = gst_rtsp_stream_transport_get_transport (trans);
4188 dest = tr->destination;
4190 switch (tr->lower_transport) {
4191 case GST_RTSP_LOWER_TRANS_UDP_MCAST:
4197 GST_INFO ("adding %s:%d-%d", dest, min, max);
4198 if (!check_mcast_client_addr (stream, tr))
4200 add_client (priv->mcast_udpsink[0], priv->mcast_udpsink[1], dest, min,
4204 GST_INFO ("setting ttl-mc %d", tr->ttl);
4205 if (priv->mcast_udpsink[0])
4206 g_object_set (G_OBJECT (priv->mcast_udpsink[0]), "ttl-mc", tr->ttl,
4208 if (priv->mcast_udpsink[1])
4209 g_object_set (G_OBJECT (priv->mcast_udpsink[1]), "ttl-mc", tr->ttl,
4212 priv->transports = g_list_prepend (priv->transports, trans);
4214 GST_INFO ("removing %s:%d-%d", dest, min, max);
4215 if (!remove_mcast_client_addr (stream, dest, min, max))
4216 GST_WARNING_OBJECT (stream,
4217 "Failed to remove multicast address: %s:%d-%d", dest, min, max);
4218 remove_client (priv->mcast_udpsink[0], priv->mcast_udpsink[1], dest,
4220 priv->transports = g_list_remove (priv->transports, trans);
4224 case GST_RTSP_LOWER_TRANS_UDP:
4226 if (priv->client_side) {
4227 /* In client side mode the 'destination' is the RTSP server, so send
4229 min = tr->server_port.min;
4230 max = tr->server_port.max;
4232 min = tr->client_port.min;
4233 max = tr->client_port.max;
4237 GST_INFO ("adding %s:%d-%d", dest, min, max);
4238 add_client (priv->udpsink[0], priv->udpsink[1], dest, min, max);
4239 priv->transports = g_list_prepend (priv->transports, trans);
4241 GST_INFO ("removing %s:%d-%d", dest, min, max);
4242 remove_client (priv->udpsink[0], priv->udpsink[1], dest, min, max);
4243 priv->transports = g_list_remove (priv->transports, trans);
4245 priv->transports_cookie++;
4248 case GST_RTSP_LOWER_TRANS_TCP:
4250 GST_INFO ("adding TCP %s", tr->destination);
4251 priv->transports = g_list_prepend (priv->transports, trans);
4252 priv->n_tcp_transports++;
4254 GST_INFO ("removing TCP %s", tr->destination);
4255 priv->transports = g_list_remove (priv->transports, trans);
4256 priv->n_tcp_transports--;
4258 priv->transports_cookie++;
4261 goto unknown_transport;
4268 GST_INFO ("Unknown transport %d", tr->lower_transport);
4278 on_message_sent (gpointer user_data)
4280 GstRTSPStream *stream = user_data;
4281 GstRTSPStreamPrivate *priv = stream->priv;
4284 GST_DEBUG_OBJECT (stream, "message send complete");
4286 g_mutex_lock (&priv->lock);
4288 g_assert (priv->n_outstanding >= 0);
4290 if (priv->n_outstanding == 0)
4291 goto no_outstanding;
4293 priv->n_outstanding--;
4294 if (priv->n_outstanding == 0) {
4297 /* iterate from 1 and down, so we prioritize RTCP over RTP */
4298 for (i = 1; i >= 0; i--) {
4299 if (priv->have_buffer[i]) {
4308 send_tcp_message (stream, idx);
4310 g_mutex_unlock (&priv->lock);
4317 GST_INFO ("no outstanding messages");
4318 g_mutex_unlock (&priv->lock);
4324 * gst_rtsp_stream_add_transport:
4325 * @stream: a #GstRTSPStream
4326 * @trans: (transfer none): a #GstRTSPStreamTransport
4328 * Add the transport in @trans to @stream. The media of @stream will
4329 * then also be send to the values configured in @trans.
4331 * @stream must be joined to a bin.
4333 * @trans must contain a valid #GstRTSPTransport.
4335 * Returns: %TRUE if @trans was added
4338 gst_rtsp_stream_add_transport (GstRTSPStream * stream,
4339 GstRTSPStreamTransport * trans)
4341 GstRTSPStreamPrivate *priv;
4344 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
4345 priv = stream->priv;
4346 g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
4347 g_return_val_if_fail (priv->joined_bin != NULL, FALSE);
4349 g_mutex_lock (&priv->lock);
4350 res = update_transport (stream, trans, TRUE);
4352 gst_rtsp_stream_transport_set_message_sent (trans, on_message_sent, stream,
4354 g_mutex_unlock (&priv->lock);
4360 * gst_rtsp_stream_remove_transport:
4361 * @stream: a #GstRTSPStream
4362 * @trans: (transfer none): a #GstRTSPStreamTransport
4364 * Remove the transport in @trans from @stream. The media of @stream will
4365 * not be sent to the values configured in @trans.
4367 * @stream must be joined to a bin.
4369 * @trans must contain a valid #GstRTSPTransport.
4371 * Returns: %TRUE if @trans was removed
4374 gst_rtsp_stream_remove_transport (GstRTSPStream * stream,
4375 GstRTSPStreamTransport * trans)
4377 GstRTSPStreamPrivate *priv;
4380 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
4381 priv = stream->priv;
4382 g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
4383 g_return_val_if_fail (priv->joined_bin != NULL, FALSE);
4385 g_mutex_lock (&priv->lock);
4386 res = update_transport (stream, trans, FALSE);
4387 g_mutex_unlock (&priv->lock);
4393 * gst_rtsp_stream_update_crypto:
4394 * @stream: a #GstRTSPStream
4396 * @crypto: (transfer none) (allow-none): a #GstCaps with crypto info
4398 * Update the new crypto information for @ssrc in @stream. If information
4399 * for @ssrc did not exist, it will be added. If information
4400 * for @ssrc existed, it will be replaced. If @crypto is %NULL, it will
4401 * be removed from @stream.
4403 * Returns: %TRUE if @crypto could be updated
4406 gst_rtsp_stream_update_crypto (GstRTSPStream * stream,
4407 guint ssrc, GstCaps * crypto)
4409 GstRTSPStreamPrivate *priv;
4411 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
4412 g_return_val_if_fail (crypto == NULL || GST_IS_CAPS (crypto), FALSE);
4414 priv = stream->priv;
4416 GST_DEBUG_OBJECT (stream, "update key for %08x", ssrc);
4418 g_mutex_lock (&priv->lock);
4420 g_hash_table_insert (priv->keys, GINT_TO_POINTER (ssrc),
4421 gst_caps_ref (crypto));
4423 g_hash_table_remove (priv->keys, GINT_TO_POINTER (ssrc));
4424 g_mutex_unlock (&priv->lock);
4430 * gst_rtsp_stream_get_rtp_socket:
4431 * @stream: a #GstRTSPStream
4432 * @family: the socket family
4434 * Get the RTP socket from @stream for a @family.
4436 * @stream must be joined to a bin.
4438 * Returns: (transfer full) (nullable): the RTP socket or %NULL if no
4439 * socket could be allocated for @family. Unref after usage
4442 gst_rtsp_stream_get_rtp_socket (GstRTSPStream * stream, GSocketFamily family)
4444 GstRTSPStreamPrivate *priv = stream->priv;
4447 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
4448 g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
4449 family == G_SOCKET_FAMILY_IPV6, NULL);
4451 g_mutex_lock (&priv->lock);
4452 if (family == G_SOCKET_FAMILY_IPV6)
4453 socket = priv->socket_v6[0];
4455 socket = priv->socket_v4[0];
4458 socket = g_object_ref (socket);
4459 g_mutex_unlock (&priv->lock);
4465 * gst_rtsp_stream_get_rtcp_socket:
4466 * @stream: a #GstRTSPStream
4467 * @family: the socket family
4469 * Get the RTCP socket from @stream for a @family.
4471 * @stream must be joined to a bin.
4473 * Returns: (transfer full) (nullable): the RTCP socket or %NULL if no
4474 * socket could be allocated for @family. Unref after usage
4477 gst_rtsp_stream_get_rtcp_socket (GstRTSPStream * stream, GSocketFamily family)
4479 GstRTSPStreamPrivate *priv = stream->priv;
4482 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
4483 g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
4484 family == G_SOCKET_FAMILY_IPV6, NULL);
4486 g_mutex_lock (&priv->lock);
4487 if (family == G_SOCKET_FAMILY_IPV6)
4488 socket = priv->socket_v6[1];
4490 socket = priv->socket_v4[1];
4493 socket = g_object_ref (socket);
4494 g_mutex_unlock (&priv->lock);
4500 * gst_rtsp_stream_get_rtp_multicast_socket:
4501 * @stream: a #GstRTSPStream
4502 * @family: the socket family
4504 * Get the multicast RTP socket from @stream for a @family.
4506 * Returns: (transfer full) (nullable): the multicast RTP socket or %NULL if no
4507 * socket could be allocated for @family. Unref after usage
4510 gst_rtsp_stream_get_rtp_multicast_socket (GstRTSPStream * stream,
4511 GSocketFamily family)
4513 GstRTSPStreamPrivate *priv = stream->priv;
4516 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
4517 g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
4518 family == G_SOCKET_FAMILY_IPV6, NULL);
4520 g_mutex_lock (&priv->lock);
4521 if (family == G_SOCKET_FAMILY_IPV6)
4522 socket = priv->mcast_socket_v6[0];
4524 socket = priv->mcast_socket_v4[0];
4527 socket = g_object_ref (socket);
4528 g_mutex_unlock (&priv->lock);
4534 * gst_rtsp_stream_get_rtcp_multicast_socket:
4535 * @stream: a #GstRTSPStream
4536 * @family: the socket family
4538 * Get the multicast RTCP socket from @stream for a @family.
4540 * Returns: (transfer full) (nullable): the multicast RTCP socket or %NULL if no
4541 * socket could be allocated for @family. Unref after usage
4544 gst_rtsp_stream_get_rtcp_multicast_socket (GstRTSPStream * stream,
4545 GSocketFamily family)
4547 GstRTSPStreamPrivate *priv = stream->priv;
4550 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
4551 g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
4552 family == G_SOCKET_FAMILY_IPV6, NULL);
4554 g_mutex_lock (&priv->lock);
4555 if (family == G_SOCKET_FAMILY_IPV6)
4556 socket = priv->mcast_socket_v6[1];
4558 socket = priv->mcast_socket_v4[1];
4561 socket = g_object_ref (socket);
4562 g_mutex_unlock (&priv->lock);
4568 * gst_rtsp_stream_add_multicast_client_address:
4569 * @stream: a #GstRTSPStream
4570 * @destination: (transfer none): a multicast address to add
4571 * @rtp_port: RTP port
4572 * @rtcp_port: RTCP port
4573 * @family: socket family
4575 * Add multicast client address to stream. At this point, the sockets that
4576 * will stream RTP and RTCP data to @destination are supposed to be
4579 * Returns: %TRUE if @destination can be addedd and handled by @stream.
4582 gst_rtsp_stream_add_multicast_client_address (GstRTSPStream * stream,
4583 const gchar * destination, guint rtp_port, guint rtcp_port,
4584 GSocketFamily family)
4586 GstRTSPStreamPrivate *priv;
4588 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
4589 g_return_val_if_fail (destination != NULL, FALSE);
4591 priv = stream->priv;
4592 g_mutex_lock (&priv->lock);
4593 if ((family == G_SOCKET_FAMILY_IPV4) && (priv->mcast_socket_v4[0] == NULL))
4595 else if ((family == G_SOCKET_FAMILY_IPV6) &&
4596 (priv->mcast_socket_v6[0] == NULL))
4599 if (!add_mcast_client_addr (stream, destination, rtp_port, rtcp_port))
4600 goto add_addr_error;
4601 g_mutex_unlock (&priv->lock);
4607 GST_WARNING_OBJECT (stream,
4608 "Failed to add multicast address: no udp socket");
4609 g_mutex_unlock (&priv->lock);
4614 GST_WARNING_OBJECT (stream,
4615 "Failed to add multicast address: invalid address");
4616 g_mutex_unlock (&priv->lock);
4622 * gst_rtsp_stream_get_multicast_client_addresses
4623 * @stream: a #GstRTSPStream
4625 * Get all multicast client addresses that RTP data will be sent to
4627 * Returns: A comma separated list of host:port pairs with destinations
4630 gst_rtsp_stream_get_multicast_client_addresses (GstRTSPStream * stream)
4632 GstRTSPStreamPrivate *priv;
4636 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
4638 priv = stream->priv;
4639 str = g_string_new ("");
4641 g_mutex_lock (&priv->lock);
4642 clients = priv->mcast_clients;
4643 while (clients != NULL) {
4644 UdpClientAddrInfo *client;
4646 client = (UdpClientAddrInfo *) clients->data;
4647 clients = g_list_next (clients);
4648 g_string_append_printf (str, "%s:%d%s", client->address, client->rtp_port,
4649 (clients != NULL ? "," : ""));
4651 g_mutex_unlock (&priv->lock);
4653 return g_string_free (str, FALSE);
4657 * gst_rtsp_stream_set_seqnum:
4658 * @stream: a #GstRTSPStream
4659 * @seqnum: a new sequence number
4661 * Configure the sequence number in the payloader of @stream to @seqnum.
4664 gst_rtsp_stream_set_seqnum_offset (GstRTSPStream * stream, guint16 seqnum)
4666 GstRTSPStreamPrivate *priv;
4668 g_return_if_fail (GST_IS_RTSP_STREAM (stream));
4670 priv = stream->priv;
4672 g_object_set (G_OBJECT (priv->payloader), "seqnum-offset", seqnum, NULL);
4676 * gst_rtsp_stream_get_seqnum:
4677 * @stream: a #GstRTSPStream
4679 * Get the configured sequence number in the payloader of @stream.
4681 * Returns: the sequence number of the payloader.
4684 gst_rtsp_stream_get_current_seqnum (GstRTSPStream * stream)
4686 GstRTSPStreamPrivate *priv;
4689 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
4691 priv = stream->priv;
4693 g_object_get (G_OBJECT (priv->payloader), "seqnum", &seqnum, NULL);
4699 * gst_rtsp_stream_transport_filter:
4700 * @stream: a #GstRTSPStream
4701 * @func: (scope call) (allow-none): a callback
4702 * @user_data: (closure): user data passed to @func
4704 * Call @func for each transport managed by @stream. The result value of @func
4705 * determines what happens to the transport. @func will be called with @stream
4706 * locked so no further actions on @stream can be performed from @func.
4708 * If @func returns #GST_RTSP_FILTER_REMOVE, the transport will be removed from
4711 * If @func returns #GST_RTSP_FILTER_KEEP, the transport will remain in @stream.
4713 * If @func returns #GST_RTSP_FILTER_REF, the transport will remain in @stream but
4714 * will also be added with an additional ref to the result #GList of this
4717 * When @func is %NULL, #GST_RTSP_FILTER_REF will be assumed for each transport.
4719 * Returns: (element-type GstRTSPStreamTransport) (transfer full): a #GList with all
4720 * transports for which @func returned #GST_RTSP_FILTER_REF. After usage, each
4721 * element in the #GList should be unreffed before the list is freed.
4724 gst_rtsp_stream_transport_filter (GstRTSPStream * stream,
4725 GstRTSPStreamTransportFilterFunc func, gpointer user_data)
4727 GstRTSPStreamPrivate *priv;
4728 GList *result, *walk, *next;
4729 GHashTable *visited = NULL;
4732 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
4734 priv = stream->priv;
4738 visited = g_hash_table_new_full (NULL, NULL, g_object_unref, NULL);
4740 g_mutex_lock (&priv->lock);
4742 cookie = priv->transports_cookie;
4743 for (walk = priv->transports; walk; walk = next) {
4744 GstRTSPStreamTransport *trans = walk->data;
4745 GstRTSPFilterResult res;
4748 next = g_list_next (walk);
4751 /* only visit each transport once */
4752 if (g_hash_table_contains (visited, trans))
4755 g_hash_table_add (visited, g_object_ref (trans));
4756 g_mutex_unlock (&priv->lock);
4758 res = func (stream, trans, user_data);
4760 g_mutex_lock (&priv->lock);
4762 res = GST_RTSP_FILTER_REF;
4764 changed = (cookie != priv->transports_cookie);
4767 case GST_RTSP_FILTER_REMOVE:
4768 update_transport (stream, trans, FALSE);
4770 case GST_RTSP_FILTER_REF:
4771 result = g_list_prepend (result, g_object_ref (trans));
4773 case GST_RTSP_FILTER_KEEP:
4780 g_mutex_unlock (&priv->lock);
4783 g_hash_table_unref (visited);
4788 static GstPadProbeReturn
4789 pad_blocking (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
4791 GstRTSPStreamPrivate *priv;
4792 GstRTSPStream *stream;
4793 GstBuffer *buffer = NULL;
4796 priv = stream->priv;
4798 GST_DEBUG_OBJECT (pad, "now blocking");
4800 g_mutex_lock (&priv->lock);
4801 priv->blocking = TRUE;
4803 if ((info->type & GST_PAD_PROBE_TYPE_BUFFER)) {
4804 buffer = gst_pad_probe_info_get_buffer (info);
4805 } else if ((info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST)) {
4806 GstBufferList *list = gst_pad_probe_info_get_buffer_list (info);
4807 buffer = gst_buffer_list_get (list, 0);
4809 g_assert_not_reached ();
4813 priv->position = GST_BUFFER_TIMESTAMP (buffer);
4814 GST_DEBUG_OBJECT (stream, "buffer position: %" GST_TIME_FORMAT,
4815 GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)));
4816 g_mutex_unlock (&priv->lock);
4818 gst_element_post_message (priv->payloader,
4819 gst_message_new_element (GST_OBJECT_CAST (priv->payloader),
4820 gst_structure_new_empty ("GstRTSPStreamBlocking")));
4822 return GST_PAD_PROBE_OK;
4826 set_blocked (GstRTSPStream * stream, gboolean blocked)
4828 GstRTSPStreamPrivate *priv;
4831 GST_DEBUG_OBJECT (stream, "blocked: %d", blocked);
4833 priv = stream->priv;
4836 for (i = 0; i < 2; i++) {
4837 if (priv->blocked_id[i] != 0)
4839 if (priv->send_src[i]) {
4840 priv->blocking = FALSE;
4841 priv->blocked_id[i] = gst_pad_add_probe (priv->send_src[i],
4842 GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER |
4843 GST_PAD_PROBE_TYPE_BUFFER_LIST, pad_blocking,
4844 g_object_ref (stream), g_object_unref);
4848 for (i = 0; i < 2; i++) {
4849 if (priv->blocked_id[i] != 0) {
4850 gst_pad_remove_probe (priv->send_src[i], priv->blocked_id[i]);
4851 priv->blocked_id[i] = 0;
4854 priv->blocking = FALSE;
4859 * gst_rtsp_stream_set_blocked:
4860 * @stream: a #GstRTSPStream
4861 * @blocked: boolean indicating we should block or unblock
4863 * Blocks or unblocks the dataflow on @stream.
4865 * Returns: %TRUE on success
4868 gst_rtsp_stream_set_blocked (GstRTSPStream * stream, gboolean blocked)
4870 GstRTSPStreamPrivate *priv;
4872 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
4874 priv = stream->priv;
4875 g_mutex_lock (&priv->lock);
4876 set_blocked (stream, blocked);
4877 g_mutex_unlock (&priv->lock);
4883 * gst_rtsp_stream_ublock_linked:
4884 * @stream: a #GstRTSPStream
4886 * Unblocks the dataflow on @stream if it is linked.
4888 * Returns: %TRUE on success
4891 gst_rtsp_stream_unblock_linked (GstRTSPStream * stream)
4893 GstRTSPStreamPrivate *priv;
4895 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
4897 priv = stream->priv;
4898 g_mutex_lock (&priv->lock);
4899 if (priv->send_src[0] && gst_pad_is_linked (priv->send_src[0]))
4900 set_blocked (stream, FALSE);
4901 g_mutex_unlock (&priv->lock);
4907 * gst_rtsp_stream_is_blocking:
4908 * @stream: a #GstRTSPStream
4910 * Check if @stream is blocking on a #GstBuffer.
4912 * Returns: %TRUE if @stream is blocking
4915 gst_rtsp_stream_is_blocking (GstRTSPStream * stream)
4917 GstRTSPStreamPrivate *priv;
4920 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
4922 priv = stream->priv;
4924 g_mutex_lock (&priv->lock);
4925 result = priv->blocking;
4926 g_mutex_unlock (&priv->lock);
4932 * gst_rtsp_stream_query_position:
4933 * @stream: a #GstRTSPStream
4934 * @position: (out): current position of a #GstRTSPStream
4936 * Query the position of the stream in %GST_FORMAT_TIME. This only considers
4937 * the RTP parts of the pipeline and not the RTCP parts.
4939 * Returns: %TRUE if the position could be queried
4942 gst_rtsp_stream_query_position (GstRTSPStream * stream, gint64 * position)
4944 GstRTSPStreamPrivate *priv;
4948 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
4950 /* query position: if no sinks have been added yet,
4951 * we obtain the position from the pad otherwise we query the sinks */
4953 priv = stream->priv;
4955 g_mutex_lock (&priv->lock);
4956 /* depending on the transport type, it should query corresponding sink */
4957 if (priv->configured_protocols & GST_RTSP_LOWER_TRANS_UDP)
4958 sink = priv->udpsink[0];
4959 else if (priv->configured_protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST)
4960 sink = priv->mcast_udpsink[0];
4962 sink = priv->appsink[0];
4965 gst_object_ref (sink);
4966 } else if (priv->send_src[0]) {
4967 pad = gst_object_ref (priv->send_src[0]);
4969 g_mutex_unlock (&priv->lock);
4970 GST_WARNING_OBJECT (stream, "Couldn't obtain postion: erroneous pipeline");
4973 g_mutex_unlock (&priv->lock);
4976 if (!gst_element_query_position (sink, GST_FORMAT_TIME, position)) {
4977 GST_WARNING_OBJECT (stream,
4978 "Couldn't obtain postion: position query failed");
4979 gst_object_unref (sink);
4982 gst_object_unref (sink);
4985 const GstSegment *segment;
4987 event = gst_pad_get_sticky_event (pad, GST_EVENT_SEGMENT, 0);
4989 GST_WARNING_OBJECT (stream, "Couldn't obtain postion: no segment event");
4990 gst_object_unref (pad);
4994 gst_event_parse_segment (event, &segment);
4995 if (segment->format != GST_FORMAT_TIME) {
4998 g_mutex_lock (&priv->lock);
4999 *position = priv->position;
5000 g_mutex_unlock (&priv->lock);
5002 gst_segment_to_stream_time (segment, GST_FORMAT_TIME, *position);
5004 gst_event_unref (event);
5005 gst_object_unref (pad);
5012 * gst_rtsp_stream_query_stop:
5013 * @stream: a #GstRTSPStream
5014 * @stop: (out): current stop of a #GstRTSPStream
5016 * Query the stop of the stream in %GST_FORMAT_TIME. This only considers
5017 * the RTP parts of the pipeline and not the RTCP parts.
5019 * Returns: %TRUE if the stop could be queried
5022 gst_rtsp_stream_query_stop (GstRTSPStream * stream, gint64 * stop)
5024 GstRTSPStreamPrivate *priv;
5028 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
5030 /* query stop position: if no sinks have been added yet,
5031 * we obtain the stop position from the pad otherwise we query the sinks */
5033 priv = stream->priv;
5035 g_mutex_lock (&priv->lock);
5036 /* depending on the transport type, it should query corresponding sink */
5037 if (priv->configured_protocols & GST_RTSP_LOWER_TRANS_UDP)
5038 sink = priv->udpsink[0];
5039 else if (priv->configured_protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST)
5040 sink = priv->mcast_udpsink[0];
5042 sink = priv->appsink[0];
5045 gst_object_ref (sink);
5046 } else if (priv->send_src[0]) {
5047 pad = gst_object_ref (priv->send_src[0]);
5049 g_mutex_unlock (&priv->lock);
5050 GST_WARNING_OBJECT (stream, "Couldn't obtain stop: erroneous pipeline");
5053 g_mutex_unlock (&priv->lock);
5059 query = gst_query_new_segment (GST_FORMAT_TIME);
5060 if (!gst_element_query (sink, query)) {
5061 GST_WARNING_OBJECT (stream, "Couldn't obtain stop: element query failed");
5062 gst_query_unref (query);
5063 gst_object_unref (sink);
5066 gst_query_parse_segment (query, NULL, &format, NULL, stop);
5067 if (format != GST_FORMAT_TIME)
5069 gst_query_unref (query);
5070 gst_object_unref (sink);
5073 const GstSegment *segment;
5075 event = gst_pad_get_sticky_event (pad, GST_EVENT_SEGMENT, 0);
5077 GST_WARNING_OBJECT (stream, "Couldn't obtain stop: no segment event");
5078 gst_object_unref (pad);
5081 gst_event_parse_segment (event, &segment);
5082 if (segment->format != GST_FORMAT_TIME) {
5085 *stop = segment->stop;
5087 *stop = segment->duration;
5089 *stop = gst_segment_to_stream_time (segment, GST_FORMAT_TIME, *stop);
5091 gst_event_unref (event);
5092 gst_object_unref (pad);
5099 * gst_rtsp_stream_seekable:
5100 * @stream: a #GstRTSPStream
5102 * Checks whether the individual @stream is seekable.
5104 * Returns: %TRUE if @stream is seekable, else %FALSE.
5107 gst_rtsp_stream_seekable (GstRTSPStream * stream)
5109 GstRTSPStreamPrivate *priv;
5111 GstQuery *query = NULL;
5112 gboolean seekable = FALSE;
5114 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
5116 /* query stop position: if no sinks have been added yet,
5117 * we obtain the stop position from the pad otherwise we query the sinks */
5119 priv = stream->priv;
5121 g_mutex_lock (&priv->lock);
5122 /* depending on the transport type, it should query corresponding sink */
5124 pad = gst_object_ref (priv->srcpad);
5126 g_mutex_unlock (&priv->lock);
5127 GST_WARNING_OBJECT (stream, "Pad not available, can't query seekability");
5130 g_mutex_unlock (&priv->lock);
5132 query = gst_query_new_seeking (GST_FORMAT_TIME);
5133 if (!gst_pad_query (pad, query)) {
5134 GST_WARNING_OBJECT (stream, "seeking query failed");
5137 gst_query_parse_seeking (query, NULL, &seekable, NULL, NULL);
5141 gst_object_unref (pad);
5143 gst_query_unref (query);
5145 GST_DEBUG_OBJECT (stream, "Returning %d", seekable);
5151 * gst_rtsp_stream_complete_stream:
5152 * @stream: a #GstRTSPStream
5153 * @transport: a #GstRTSPTransport
5155 * Add a receiver and sender part to the pipeline based on the transport from
5158 * Returns: %TRUE if the stream has been sucessfully updated.
5161 gst_rtsp_stream_complete_stream (GstRTSPStream * stream,
5162 const GstRTSPTransport * transport)
5164 GstRTSPStreamPrivate *priv;
5166 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
5168 priv = stream->priv;
5169 GST_DEBUG_OBJECT (stream, "complete stream");
5171 g_mutex_lock (&priv->lock);
5173 if (!(priv->allowed_protocols & transport->lower_transport))
5174 goto unallowed_transport;
5176 if (!create_receiver_part (stream, transport))
5177 goto create_receiver_error;
5179 /* in the RECORD case, we only add RTCP sender part */
5180 if (!create_sender_part (stream, transport))
5181 goto create_sender_error;
5183 priv->configured_protocols |= transport->lower_transport;
5185 priv->is_complete = TRUE;
5186 g_mutex_unlock (&priv->lock);
5188 GST_DEBUG_OBJECT (stream, "pipeline sucsessfully updated");
5191 create_receiver_error:
5192 create_sender_error:
5193 unallowed_transport:
5195 g_mutex_unlock (&priv->lock);
5201 * gst_rtsp_stream_is_complete:
5202 * @stream: a #GstRTSPStream
5204 * Checks whether the stream is complete, contains the receiver and the sender
5205 * parts. As the stream contains sink(s) element(s), it's possible to perform
5206 * seek operations on it.
5208 * Returns: %TRUE if the stream contains at least one sink element.
5211 gst_rtsp_stream_is_complete (GstRTSPStream * stream)
5213 GstRTSPStreamPrivate *priv;
5214 gboolean ret = FALSE;
5216 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
5218 priv = stream->priv;
5219 g_mutex_lock (&priv->lock);
5220 ret = priv->is_complete;
5221 g_mutex_unlock (&priv->lock);
5227 * gst_rtsp_stream_is_sender:
5228 * @stream: a #GstRTSPStream
5230 * Checks whether the stream is a sender.
5232 * Returns: %TRUE if the stream is a sender and %FALSE otherwise.
5235 gst_rtsp_stream_is_sender (GstRTSPStream * stream)
5237 GstRTSPStreamPrivate *priv;
5238 gboolean ret = FALSE;
5240 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
5242 priv = stream->priv;
5243 g_mutex_lock (&priv->lock);
5244 ret = (priv->srcpad != NULL);
5245 g_mutex_unlock (&priv->lock);
5251 * gst_rtsp_stream_is_receiver:
5252 * @stream: a #GstRTSPStream
5254 * Checks whether the stream is a receiver.
5256 * Returns: %TRUE if the stream is a receiver and %FALSE otherwise.
5259 gst_rtsp_stream_is_receiver (GstRTSPStream * stream)
5261 GstRTSPStreamPrivate *priv;
5262 gboolean ret = FALSE;
5264 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
5266 priv = stream->priv;
5267 g_mutex_lock (&priv->lock);
5268 ret = (priv->sinkpad != NULL);
5269 g_mutex_unlock (&priv->lock);
/* Values of the MIKEY GST_MIKEY_SP_SRTP_ENC_KEY_LEN parameter that
 * mikey_apply_policy() maps to the "aes-128-icm" and "aes-256-icm" ciphers.
 * NOTE(review): presumably key lengths in bytes (16 * 8 = 128 bits,
 * 32 * 8 = 256 bits) -- confirm against the MIKEY specification. */
#define AES_128_KEY_LEN 16
#define AES_256_KEY_LEN 32

/* Values of the MIKEY GST_MIKEY_SP_SRTP_AUTH_KEY_LEN parameter that
 * mikey_apply_policy() maps to "hmac-sha1-32" and "hmac-sha1-80".
 * NOTE(review): presumably lengths in bytes (4 * 8 = 32 bits,
 * 10 * 8 = 80 bits) -- confirm against the MIKEY specification. */
#define HMAC_32_KEY_LEN 4
#define HMAC_80_KEY_LEN 10
5281 mikey_apply_policy (GstCaps * caps, GstMIKEYMessage * msg, guint8 policy)
5283 const gchar *srtp_cipher;
5284 const gchar *srtp_auth;
5285 const GstMIKEYPayload *sp;
5288 /* loop over Security policy until we find one containing policy */
5290 if ((sp = gst_mikey_message_find_payload (msg, GST_MIKEY_PT_SP, i)) == NULL)
5293 if (((GstMIKEYPayloadSP *) sp)->policy == policy)
5297 /* the default ciphers */
5298 srtp_cipher = "aes-128-icm";
5299 srtp_auth = "hmac-sha1-80";
5301 /* now override the defaults with what is in the Security Policy */
5305 /* collect all the params and go over them */
5306 len = gst_mikey_payload_sp_get_n_params (sp);
5307 for (i = 0; i < len; i++) {
5308 const GstMIKEYPayloadSPParam *param =
5309 gst_mikey_payload_sp_get_param (sp, i);
5311 switch (param->type) {
5312 case GST_MIKEY_SP_SRTP_ENC_ALG:
5313 switch (param->val[0]) {
5315 srtp_cipher = "null";
5319 srtp_cipher = "aes-128-icm";
5325 case GST_MIKEY_SP_SRTP_ENC_KEY_LEN:
5326 switch (param->val[0]) {
5327 case AES_128_KEY_LEN:
5328 srtp_cipher = "aes-128-icm";
5330 case AES_256_KEY_LEN:
5331 srtp_cipher = "aes-256-icm";
5337 case GST_MIKEY_SP_SRTP_AUTH_ALG:
5338 switch (param->val[0]) {
5344 srtp_auth = "hmac-sha1-80";
5350 case GST_MIKEY_SP_SRTP_AUTH_KEY_LEN:
5351 switch (param->val[0]) {
5352 case HMAC_32_KEY_LEN:
5353 srtp_auth = "hmac-sha1-32";
5355 case HMAC_80_KEY_LEN:
5356 srtp_auth = "hmac-sha1-80";
5362 case GST_MIKEY_SP_SRTP_SRTP_ENC:
5364 case GST_MIKEY_SP_SRTP_SRTCP_ENC:
5371 /* now configure the SRTP parameters */
5372 gst_caps_set_simple (caps,
5373 "srtp-cipher", G_TYPE_STRING, srtp_cipher,
5374 "srtp-auth", G_TYPE_STRING, srtp_auth,
5375 "srtcp-cipher", G_TYPE_STRING, srtp_cipher,
5376 "srtcp-auth", G_TYPE_STRING, srtp_auth, NULL);
5382 handle_mikey_data (GstRTSPStream * stream, guint8 * data, gsize size)
5384 GstMIKEYMessage *msg;
5386 GstCaps *caps = NULL;
5387 GstMIKEYPayloadKEMAC *kemac;
5388 const GstMIKEYPayloadKeyData *pkd;
5391 /* the MIKEY message contains a CSB or crypto session bundle. It is a
5392 * set of Crypto Sessions protected with the same master key.
5393 * In the context of SRTP, an RTP and its RTCP stream is part of a
5395 if ((msg = gst_mikey_message_new_from_data (data, size, NULL, NULL)) == NULL)
5398 /* we can only handle SRTP crypto sessions for now */
5399 if (msg->map_type != GST_MIKEY_MAP_TYPE_SRTP)
5400 goto invalid_map_type;
5402 /* get the number of crypto sessions. This maps SSRC to its
5403 * security parameters */
5404 n_cs = gst_mikey_message_get_n_cs (msg);
5406 goto no_crypto_sessions;
5408 /* we also need keys */
5409 if (!(kemac = (GstMIKEYPayloadKEMAC *) gst_mikey_message_find_payload
5410 (msg, GST_MIKEY_PT_KEMAC, 0)))
5413 /* we don't support encrypted keys */
5414 if (kemac->enc_alg != GST_MIKEY_ENC_NULL
5415 || kemac->mac_alg != GST_MIKEY_MAC_NULL)
5416 goto unsupported_encryption;
5418 /* get Key data sub-payload */
5419 pkd = (const GstMIKEYPayloadKeyData *)
5420 gst_mikey_payload_kemac_get_sub (&kemac->pt, 0);
5423 gst_buffer_new_wrapped (g_memdup (pkd->key_data, pkd->key_len),
5426 /* go over all crypto sessions and create the security policy for each
5428 for (i = 0; i < n_cs; i++) {
5429 const GstMIKEYMapSRTP *map = gst_mikey_message_get_cs_srtp (msg, i);
5431 caps = gst_caps_new_simple ("application/x-srtp",
5432 "ssrc", G_TYPE_UINT, map->ssrc,
5433 "roc", G_TYPE_UINT, map->roc, "srtp-key", GST_TYPE_BUFFER, key, NULL);
5434 mikey_apply_policy (caps, msg, map->policy);
5436 gst_rtsp_stream_update_crypto (stream, map->ssrc, caps);
5437 gst_caps_unref (caps);
5439 gst_mikey_message_unref (msg);
5440 gst_buffer_unref (key);
5447 GST_DEBUG_OBJECT (stream, "failed to parse MIKEY message");
5452 GST_DEBUG_OBJECT (stream, "invalid map type %d", msg->map_type);
5453 goto cleanup_message;
5457 GST_DEBUG_OBJECT (stream, "no crypto sessions");
5458 goto cleanup_message;
5462 GST_DEBUG_OBJECT (stream, "no keys found");
5463 goto cleanup_message;
5465 unsupported_encryption:
5467 GST_DEBUG_OBJECT (stream, "unsupported key encryption");
5468 goto cleanup_message;
5472 gst_mikey_message_unref (msg);
5477 #define IS_STRIP_CHAR(c) (g_ascii_isspace ((guchar)(c)) || ((c) == '\"'))
5480 strip_chars (gchar * str)
5487 if (!IS_STRIP_CHAR (str[len]))
5491 for (s = str; *s && IS_STRIP_CHAR (*s); s++);
5492 memmove (str, s, len + 1);
5496 * gst_rtsp_stream_handle_keymgmt:
5497 * @stream: a #GstRTSPStream
5498 * @keymgmt: a keymgmt header
5500 * Parse and handle a KeyMgmt header.
5504 /* KeyMgmt = "KeyMgmt" ":" key-mgmt-spec 0*("," key-mgmt-spec)
5505 * key-mgmt-spec = "prot" "=" KMPID ";" ["uri" "=" %x22 URI %x22 ";"]
5508 gst_rtsp_stream_handle_keymgmt (GstRTSPStream * stream, const gchar * keymgmt)
5513 specs = g_strsplit (keymgmt, ",", 0);
5514 for (i = 0; specs[i]; i++) {
5517 split = g_strsplit (specs[i], ";", 0);
5518 for (j = 0; split[j]; j++) {
5519 g_strstrip (split[j]);
5520 if (g_str_has_prefix (split[j], "prot=")) {
5521 g_strstrip (split[j] + 5);
5522 if (!g_str_equal (split[j] + 5, "mikey"))
5524 GST_DEBUG ("found mikey");
5525 } else if (g_str_has_prefix (split[j], "uri=")) {
5526 strip_chars (split[j] + 4);
5527 GST_DEBUG ("found uri '%s'", split[j] + 4);
5528 } else if (g_str_has_prefix (split[j], "data=")) {
5531 strip_chars (split[j] + 5);
5532 GST_DEBUG ("found data '%s'", split[j] + 5);
5533 data = g_base64_decode_inplace (split[j] + 5, &size);
5534 handle_mikey_data (stream, data, size);
5545 * gst_rtsp_stream_get_ulpfec_pt:
5547 * Returns: the payload type used for ULPFEC protection packets
5552 gst_rtsp_stream_get_ulpfec_pt (GstRTSPStream * stream)
5556 g_mutex_lock (&stream->priv->lock);
5557 res = stream->priv->ulpfec_pt;
5558 g_mutex_unlock (&stream->priv->lock);
5564 * gst_rtsp_stream_set_ulpfec_pt:
5566 * Set the payload type to be used for ULPFEC protection packets
5571 gst_rtsp_stream_set_ulpfec_pt (GstRTSPStream * stream, guint pt)
5573 g_return_if_fail (GST_IS_RTSP_STREAM (stream));
5575 g_mutex_lock (&stream->priv->lock);
5576 stream->priv->ulpfec_pt = pt;
5577 if (stream->priv->ulpfec_encoder) {
5578 g_object_set (stream->priv->ulpfec_encoder, "pt", pt, NULL);
5580 g_mutex_unlock (&stream->priv->lock);
5584 * gst_rtsp_stream_request_ulpfec_decoder:
5586 * Creating a rtpulpfecdec element
5588 * Returns: (transfer full) (nullable): a #GstElement.
5593 gst_rtsp_stream_request_ulpfec_decoder (GstRTSPStream * stream,
5594 GstElement * rtpbin, guint sessid)
5596 GObject *internal_storage = NULL;
5598 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
5599 stream->priv->ulpfec_decoder =
5600 gst_object_ref (gst_element_factory_make ("rtpulpfecdec", NULL));
5602 g_signal_emit_by_name (G_OBJECT (rtpbin), "get-internal-storage", sessid,
5604 g_object_set (stream->priv->ulpfec_decoder, "storage", internal_storage,
5606 g_object_unref (internal_storage);
5607 update_ulpfec_decoder_pt (stream);
5609 return stream->priv->ulpfec_decoder;
5613 * gst_rtsp_stream_request_ulpfec_encoder:
5615 * Creating a rtpulpfecenc element
5617 * Returns: (transfer full) (nullable): a #GstElement.
5622 gst_rtsp_stream_request_ulpfec_encoder (GstRTSPStream * stream, guint sessid)
5624 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
5626 if (!stream->priv->ulpfec_percentage)
5629 stream->priv->ulpfec_encoder =
5630 gst_object_ref (gst_element_factory_make ("rtpulpfecenc", NULL));
5632 g_object_set (stream->priv->ulpfec_encoder, "pt", stream->priv->ulpfec_pt,
5633 "percentage", stream->priv->ulpfec_percentage, NULL);
5635 return stream->priv->ulpfec_encoder;
5639 * gst_rtsp_stream_set_ulpfec_percentage:
5641 * Sets the amount of redundancy to apply when creating ULPFEC
5642 * protection packets.
5647 gst_rtsp_stream_set_ulpfec_percentage (GstRTSPStream * stream, guint percentage)
5649 g_return_if_fail (GST_IS_RTSP_STREAM (stream));
5651 g_mutex_lock (&stream->priv->lock);
5652 stream->priv->ulpfec_percentage = percentage;
5653 if (stream->priv->ulpfec_encoder) {
5654 g_object_set (stream->priv->ulpfec_encoder, "percentage", percentage, NULL);
5656 g_mutex_unlock (&stream->priv->lock);
5660 * gst_rtsp_stream_get_ulpfec_percentage:
5662 * Returns: the amount of redundancy applied when creating ULPFEC
5663 * protection packets.
5668 gst_rtsp_stream_get_ulpfec_percentage (GstRTSPStream * stream)
5672 g_mutex_lock (&stream->priv->lock);
5673 res = stream->priv->ulpfec_percentage;
5674 g_mutex_unlock (&stream->priv->lock);