2 * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3 * Copyright (C) 2015 Centricular Ltd
4 * Author: Sebastian Dröge <sebastian@centricular.com>
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Library General Public
8 * License as published by the Free Software Foundation; either
9 * version 2 of the License, or (at your option) any later version.
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Library General Public License for more details.
16 * You should have received a copy of the GNU Library General Public
17 * License along with this library; if not, write to the
18 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19 * Boston, MA 02110-1301, USA.
23 * @short_description: A media stream
24 * @see_also: #GstRTSPMedia
26 * The #GstRTSPStream object manages the data transport for one stream. It
27 * is created from a payloader element and a source pad that produce the RTP
28 * packets for the stream.
30 * With gst_rtsp_stream_join_bin() the streaming elements are added to the bin
31 * and rtpbin. gst_rtsp_stream_leave_bin() removes the elements again.
33 * The #GstRTSPStream will use the configured addresspool, as set with
34 * gst_rtsp_stream_set_address_pool(), to allocate multicast addresses for the
35 * stream. With gst_rtsp_stream_get_multicast_address() you can get the
38 * With gst_rtsp_stream_get_server_port () you can get the port that the server
39 * will use to receive RTCP. This is the part that the clients will use to send
42 * With gst_rtsp_stream_add_transport() destinations can be added where the
43 * stream should be sent to. Use gst_rtsp_stream_remove_transport() to remove
44 * the destination again.
46 * Each #GstRTSPStreamTransport spawns one queue that will serve as a backlog of a
47 * controllable maximum size when the reflux from the TCP connection's backpressure
48 * starts spilling all over.
50 * Unlike the backlog in rtspconnection, which we have decided should only contain
51 * at most one RTP and one RTCP data message in order to allow control messages to
52 * go through unobstructed, this backlog only consists of data messages, allowing
53 * us to fill it up without concern.
55 * When multiple TCP transports exist, for example in the context of a shared media,
56 * we only pop samples from our appsinks when at least one of the transports doesn't
57 * experience back pressure: this allows us to pace our sample popping to the speed
58 * of the fastest client.
60 * When a sample is popped, it is either sent directly on transports that don't
61 * experience backpressure, or queued on the transport's backlog otherwise. Samples
62 * are then popped from that backlog when the transport reports it has sent the message.
64 * Once the backlog reaches an overly large duration, the transport is dropped as
65 * the client was deemed too slow.
77 #include <gst/app/gstappsrc.h>
78 #include <gst/app/gstappsink.h>
80 #include <gst/rtp/gstrtpbuffer.h>
82 #include "rtsp-stream.h"
83 #include "rtsp-server-internal.h"
85 struct _GstRTSPStreamPrivate
89 /* Only one pad is ever set */
90 GstPad *srcpad, *sinkpad;
91 GstElement *payloader;
95 /* TRUE if this stream is running on
96 * the client side of an RTSP link (for RECORD) */
100 /* TRUE if stream is complete. This means that the receiver and the sender
101 * parts are present in the stream. */
102 gboolean is_complete;
103 GstRTSPProfile profiles;
104 GstRTSPLowerTrans allowed_protocols;
105 GstRTSPLowerTrans configured_protocols;
107 /* pads on the rtpbin */
108 GstPad *send_rtp_sink;
109 GstPad *recv_rtp_src;
110 GstPad *recv_sink[2];
113 /* the RTPSession object */
116 /* SRTP encoder/decoder */
121 /* for UDP unicast */
122 GstElement *udpsrc_v4[2];
123 GstElement *udpsrc_v6[2];
124 GstElement *udpqueue[2];
125 GstElement *udpsink[2];
126 GSocket *socket_v4[2];
127 GSocket *socket_v6[2];
129 /* for UDP multicast */
130 GstElement *mcast_udpsrc_v4[2];
131 GstElement *mcast_udpsrc_v6[2];
132 GstElement *mcast_udpqueue[2];
133 GstElement *mcast_udpsink[2];
134 GSocket *mcast_socket_v4[2];
135 GSocket *mcast_socket_v6[2];
136 GList *mcast_clients;
138 /* for TCP transport */
139 GstElement *appsrc[2];
140 GstClockTime appsrc_base_time[2];
141 GstElement *appqueue[2];
142 GstElement *appsink[2];
145 GstElement *funnel[2];
149 GstElement *rtxreceive;
151 GstClockTime rtx_time;
154 gboolean do_rate_control;
156 /* Forward Error Correction with RFC 5109 */
157 GstElement *ulpfec_decoder;
158 GstElement *ulpfec_encoder;
160 gboolean ulpfec_enabled;
161 guint ulpfec_percentage;
163 /* pool used to manage unicast and multicast addresses */
164 GstRTSPAddressPool *pool;
166 /* unicast server addr/port */
167 GstRTSPAddress *server_addr_v4;
168 GstRTSPAddress *server_addr_v6;
170 /* multicast addresses */
171 GstRTSPAddress *mcast_addr_v4;
172 GstRTSPAddress *mcast_addr_v6;
174 gchar *multicast_iface;
176 gboolean bind_mcast_address;
178 /* the caps of the stream */
182 /* transports we stream to */
185 guint transports_cookie;
187 guint tr_cache_cookie;
188 guint n_tcp_transports;
189 gboolean have_buffer[2];
193 /* Sending logic for TCP */
194 GThread *send_thread;
197 /* @send_lock is released when pushing data out, we use
198 * a cookie to decide whether we should wait on @send_cond
199 * before checking the transports' backlogs again
202 /* Used to control shutdown of @send_thread */
203 gboolean continue_sending;
205 /* stream blocking */
206 gulong blocked_id[2];
209 /* current stream postion */
210 GstClockTime position;
212 /* pt->caps map for RECORD streams */
215 GstRTSPPublishClockMode publish_clock_mode;
216 GThreadPool *send_pool;
218 /* Used to provide accurate rtpinfo when the stream is blocking */
219 gboolean blocked_buffer;
220 guint32 blocked_seqnum;
221 guint32 blocked_rtptime;
222 GstClockTime blocked_running_time;
223 gint blocked_clock_rate;
225 /* Whether we should send and receive RTCP */
226 gboolean enable_rtcp;
228 /* blocking early rtcp packets */
229 GstPad *block_early_rtcp_pad;
230 gulong block_early_rtcp_probe;
231 GstPad *block_early_rtcp_pad_ipv6;
232 gulong block_early_rtcp_probe_ipv6;
235 #define DEFAULT_CONTROL NULL
236 #define DEFAULT_PROFILES GST_RTSP_PROFILE_AVP
237 #define DEFAULT_PROTOCOLS GST_RTSP_LOWER_TRANS_UDP | GST_RTSP_LOWER_TRANS_UDP_MCAST | \
238 GST_RTSP_LOWER_TRANS_TCP
239 #define DEFAULT_MAX_MCAST_TTL 255
240 #define DEFAULT_BIND_MCAST_ADDRESS FALSE
241 #define DEFAULT_DO_RATE_CONTROL TRUE
242 #define DEFAULT_ENABLE_RTCP TRUE
255 SIGNAL_NEW_RTP_ENCODER,
256 SIGNAL_NEW_RTCP_ENCODER,
257 SIGNAL_NEW_RTP_RTCP_DECODER,
262 GST_DEBUG_CATEGORY_STATIC (rtsp_stream_debug);
263 #define GST_CAT_DEFAULT rtsp_stream_debug
265 static GQuark ssrc_stream_map_key;
267 static void gst_rtsp_stream_get_property (GObject * object, guint propid,
268 GValue * value, GParamSpec * pspec);
269 static void gst_rtsp_stream_set_property (GObject * object, guint propid,
270 const GValue * value, GParamSpec * pspec);
272 static void gst_rtsp_stream_finalize (GObject * obj);
275 update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
278 static guint gst_rtsp_stream_signals[SIGNAL_LAST] = { 0 };
280 G_DEFINE_TYPE_WITH_PRIVATE (GstRTSPStream, gst_rtsp_stream, G_TYPE_OBJECT);
283 gst_rtsp_stream_class_init (GstRTSPStreamClass * klass)
285 GObjectClass *gobject_class;
287 gobject_class = G_OBJECT_CLASS (klass);
289 gobject_class->get_property = gst_rtsp_stream_get_property;
290 gobject_class->set_property = gst_rtsp_stream_set_property;
291 gobject_class->finalize = gst_rtsp_stream_finalize;
293 g_object_class_install_property (gobject_class, PROP_CONTROL,
294 g_param_spec_string ("control", "Control",
295 "The control string for this stream", DEFAULT_CONTROL,
296 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
298 g_object_class_install_property (gobject_class, PROP_PROFILES,
299 g_param_spec_flags ("profiles", "Profiles",
300 "Allowed transfer profiles", GST_TYPE_RTSP_PROFILE,
301 DEFAULT_PROFILES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
303 g_object_class_install_property (gobject_class, PROP_PROTOCOLS,
304 g_param_spec_flags ("protocols", "Protocols",
305 "Allowed lower transport protocols", GST_TYPE_RTSP_LOWER_TRANS,
306 DEFAULT_PROTOCOLS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
308 gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER] =
309 g_signal_new ("new-rtp-encoder", G_TYPE_FROM_CLASS (klass),
310 G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
312 gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER] =
313 g_signal_new ("new-rtcp-encoder", G_TYPE_FROM_CLASS (klass),
314 G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
316 gst_rtsp_stream_signals[SIGNAL_NEW_RTP_RTCP_DECODER] =
317 g_signal_new ("new-rtp-rtcp-decoder", G_TYPE_FROM_CLASS (klass),
318 G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
320 gst_rtsp_stream_signals[SIGNAL_RTCP_STATS] =
321 g_signal_new ("rtcp-statistics", G_TYPE_FROM_CLASS (klass),
322 G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
323 G_TYPE_NONE, 1, GST_TYPE_STRUCTURE);
325 GST_DEBUG_CATEGORY_INIT (rtsp_stream_debug, "rtspstream", 0, "GstRTSPStream");
327 ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
331 gst_rtsp_stream_init (GstRTSPStream * stream)
333 GstRTSPStreamPrivate *priv = gst_rtsp_stream_get_instance_private (stream);
335 GST_DEBUG ("new stream %p", stream);
340 priv->control = g_strdup (DEFAULT_CONTROL);
341 priv->profiles = DEFAULT_PROFILES;
342 priv->allowed_protocols = DEFAULT_PROTOCOLS;
343 priv->configured_protocols = 0;
344 priv->publish_clock_mode = GST_RTSP_PUBLISH_CLOCK_MODE_CLOCK;
345 priv->max_mcast_ttl = DEFAULT_MAX_MCAST_TTL;
346 priv->bind_mcast_address = DEFAULT_BIND_MCAST_ADDRESS;
347 priv->do_rate_control = DEFAULT_DO_RATE_CONTROL;
348 priv->enable_rtcp = DEFAULT_ENABLE_RTCP;
350 g_mutex_init (&priv->lock);
352 priv->continue_sending = TRUE;
353 priv->send_cookie = 0;
354 g_cond_init (&priv->send_cond);
355 g_mutex_init (&priv->send_lock);
357 priv->keys = g_hash_table_new_full (g_direct_hash, g_direct_equal,
358 NULL, (GDestroyNotify) gst_caps_unref);
359 priv->ptmap = g_hash_table_new_full (NULL, NULL, NULL,
360 (GDestroyNotify) gst_caps_unref);
361 priv->send_pool = NULL;
362 priv->block_early_rtcp_pad = NULL;
363 priv->block_early_rtcp_probe = 0;
364 priv->block_early_rtcp_pad_ipv6 = NULL;
365 priv->block_early_rtcp_probe_ipv6 = 0;
368 typedef struct _UdpClientAddrInfo UdpClientAddrInfo;
370 struct _UdpClientAddrInfo
374 guint add_count; /* how often this address has been added */
378 free_mcast_client (gpointer data)
380 UdpClientAddrInfo *client = data;
382 g_free (client->address);
387 gst_rtsp_stream_finalize (GObject * obj)
389 GstRTSPStream *stream;
390 GstRTSPStreamPrivate *priv;
393 stream = GST_RTSP_STREAM (obj);
396 GST_DEBUG ("finalize stream %p", stream);
398 /* we really need to be unjoined now */
399 g_return_if_fail (priv->joined_bin == NULL);
402 g_thread_pool_free (priv->send_pool, TRUE, TRUE);
403 if (priv->mcast_addr_v4)
404 gst_rtsp_address_free (priv->mcast_addr_v4);
405 if (priv->mcast_addr_v6)
406 gst_rtsp_address_free (priv->mcast_addr_v6);
407 if (priv->server_addr_v4)
408 gst_rtsp_address_free (priv->server_addr_v4);
409 if (priv->server_addr_v6)
410 gst_rtsp_address_free (priv->server_addr_v6);
412 g_object_unref (priv->pool);
414 g_object_unref (priv->rtxsend);
415 if (priv->rtxreceive)
416 g_object_unref (priv->rtxreceive);
417 if (priv->ulpfec_encoder)
418 gst_object_unref (priv->ulpfec_encoder);
419 if (priv->ulpfec_decoder)
420 gst_object_unref (priv->ulpfec_decoder);
422 for (i = 0; i < 2; i++) {
423 if (priv->socket_v4[i])
424 g_object_unref (priv->socket_v4[i]);
425 if (priv->socket_v6[i])
426 g_object_unref (priv->socket_v6[i]);
427 if (priv->mcast_socket_v4[i])
428 g_object_unref (priv->mcast_socket_v4[i]);
429 if (priv->mcast_socket_v6[i])
430 g_object_unref (priv->mcast_socket_v6[i]);
433 g_free (priv->multicast_iface);
434 g_list_free_full (priv->mcast_clients, (GDestroyNotify) free_mcast_client);
436 gst_object_unref (priv->payloader);
438 gst_object_unref (priv->srcpad);
440 gst_object_unref (priv->sinkpad);
441 g_free (priv->control);
442 g_mutex_clear (&priv->lock);
444 g_hash_table_unref (priv->keys);
445 g_hash_table_destroy (priv->ptmap);
447 g_mutex_clear (&priv->send_lock);
448 g_cond_clear (&priv->send_cond);
450 if (priv->block_early_rtcp_probe != 0) {
452 (priv->block_early_rtcp_pad, priv->block_early_rtcp_probe);
453 gst_object_unref (priv->block_early_rtcp_pad);
456 if (priv->block_early_rtcp_probe_ipv6 != 0) {
458 (priv->block_early_rtcp_pad_ipv6, priv->block_early_rtcp_probe_ipv6);
459 gst_object_unref (priv->block_early_rtcp_pad_ipv6);
462 G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
466 gst_rtsp_stream_get_property (GObject * object, guint propid,
467 GValue * value, GParamSpec * pspec)
469 GstRTSPStream *stream = GST_RTSP_STREAM (object);
473 g_value_take_string (value, gst_rtsp_stream_get_control (stream));
476 g_value_set_flags (value, gst_rtsp_stream_get_profiles (stream));
479 g_value_set_flags (value, gst_rtsp_stream_get_protocols (stream));
482 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
487 gst_rtsp_stream_set_property (GObject * object, guint propid,
488 const GValue * value, GParamSpec * pspec)
490 GstRTSPStream *stream = GST_RTSP_STREAM (object);
494 gst_rtsp_stream_set_control (stream, g_value_get_string (value));
497 gst_rtsp_stream_set_profiles (stream, g_value_get_flags (value));
500 gst_rtsp_stream_set_protocols (stream, g_value_get_flags (value));
503 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
508 * gst_rtsp_stream_new:
511 * @payloader: a #GstElement
513 * Create a new media stream with index @idx that handles RTP data on
514 * @pad and has a payloader element @payloader if @pad is a source pad
515 * or a depayloader element @payloader if @pad is a sink pad.
517 * Returns: (transfer full): a new #GstRTSPStream
520 gst_rtsp_stream_new (guint idx, GstElement * payloader, GstPad * pad)
522 GstRTSPStreamPrivate *priv;
523 GstRTSPStream *stream;
525 g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL);
526 g_return_val_if_fail (GST_IS_PAD (pad), NULL);
528 stream = g_object_new (GST_TYPE_RTSP_STREAM, NULL);
531 priv->payloader = gst_object_ref (payloader);
532 if (GST_PAD_IS_SRC (pad))
533 priv->srcpad = gst_object_ref (pad);
535 priv->sinkpad = gst_object_ref (pad);
541 * gst_rtsp_stream_get_index:
542 * @stream: a #GstRTSPStream
544 * Get the stream index.
546 * Return: the stream index.
549 gst_rtsp_stream_get_index (GstRTSPStream * stream)
551 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
553 return stream->priv->idx;
557 * gst_rtsp_stream_get_pt:
558 * @stream: a #GstRTSPStream
560 * Get the stream payload type.
562 * Return: the stream payload type.
565 gst_rtsp_stream_get_pt (GstRTSPStream * stream)
567 GstRTSPStreamPrivate *priv;
570 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
574 g_object_get (G_OBJECT (priv->payloader), "pt", &pt, NULL);
580 * gst_rtsp_stream_get_srcpad:
581 * @stream: a #GstRTSPStream
583 * Get the srcpad associated with @stream.
585 * Returns: (transfer full) (nullable): the srcpad. Unref after usage.
588 gst_rtsp_stream_get_srcpad (GstRTSPStream * stream)
590 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
592 if (!stream->priv->srcpad)
595 return gst_object_ref (stream->priv->srcpad);
599 * gst_rtsp_stream_get_sinkpad:
600 * @stream: a #GstRTSPStream
602 * Get the sinkpad associated with @stream.
604 * Returns: (transfer full) (nullable): the sinkpad. Unref after usage.
607 gst_rtsp_stream_get_sinkpad (GstRTSPStream * stream)
609 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
611 if (!stream->priv->sinkpad)
614 return gst_object_ref (stream->priv->sinkpad);
618 * gst_rtsp_stream_get_control:
619 * @stream: a #GstRTSPStream
621 * Get the control string to identify this stream.
623 * Returns: (transfer full) (nullable): the control string. g_free() after usage.
626 gst_rtsp_stream_get_control (GstRTSPStream * stream)
628 GstRTSPStreamPrivate *priv;
631 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
635 g_mutex_lock (&priv->lock);
636 if ((result = g_strdup (priv->control)) == NULL)
637 result = g_strdup_printf ("stream=%u", priv->idx);
638 g_mutex_unlock (&priv->lock);
644 * gst_rtsp_stream_set_control:
645 * @stream: a #GstRTSPStream
646 * @control: (nullable): a control string
648 * Set the control string in @stream.
651 gst_rtsp_stream_set_control (GstRTSPStream * stream, const gchar * control)
653 GstRTSPStreamPrivate *priv;
655 g_return_if_fail (GST_IS_RTSP_STREAM (stream));
659 g_mutex_lock (&priv->lock);
660 g_free (priv->control);
661 priv->control = g_strdup (control);
662 g_mutex_unlock (&priv->lock);
666 * gst_rtsp_stream_has_control:
667 * @stream: a #GstRTSPStream
668 * @control: (nullable): a control string
670 * Check if @stream has the control string @control.
672 * Returns: %TRUE is @stream has @control as the control string
675 gst_rtsp_stream_has_control (GstRTSPStream * stream, const gchar * control)
677 GstRTSPStreamPrivate *priv;
680 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
684 g_mutex_lock (&priv->lock);
686 res = (g_strcmp0 (priv->control, control) == 0);
690 if (sscanf (control, "stream=%u", &streamid) > 0)
691 res = (streamid == priv->idx);
695 g_mutex_unlock (&priv->lock);
701 * gst_rtsp_stream_set_mtu:
702 * @stream: a #GstRTSPStream
705 * Configure the mtu in the payloader of @stream to @mtu.
708 gst_rtsp_stream_set_mtu (GstRTSPStream * stream, guint mtu)
710 GstRTSPStreamPrivate *priv;
712 g_return_if_fail (GST_IS_RTSP_STREAM (stream));
716 GST_LOG_OBJECT (stream, "set MTU %u", mtu);
718 g_object_set (G_OBJECT (priv->payloader), "mtu", mtu, NULL);
722 * gst_rtsp_stream_get_mtu:
723 * @stream: a #GstRTSPStream
725 * Get the configured MTU in the payloader of @stream.
727 * Returns: the MTU of the payloader.
730 gst_rtsp_stream_get_mtu (GstRTSPStream * stream)
732 GstRTSPStreamPrivate *priv;
735 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
739 g_object_get (G_OBJECT (priv->payloader), "mtu", &mtu, NULL);
744 /* Update the dscp qos property on the udp sinks */
746 update_dscp_qos (GstRTSPStream * stream, GstElement ** udpsink)
748 GstRTSPStreamPrivate *priv;
753 g_object_set (G_OBJECT (*udpsink), "qos-dscp", priv->dscp_qos, NULL);
758 * gst_rtsp_stream_set_dscp_qos:
759 * @stream: a #GstRTSPStream
760 * @dscp_qos: a new dscp qos value (0-63, or -1 to disable)
762 * Configure the dscp qos of the outgoing sockets to @dscp_qos.
765 gst_rtsp_stream_set_dscp_qos (GstRTSPStream * stream, gint dscp_qos)
767 GstRTSPStreamPrivate *priv;
769 g_return_if_fail (GST_IS_RTSP_STREAM (stream));
773 GST_LOG_OBJECT (stream, "set DSCP QoS %d", dscp_qos);
775 if (dscp_qos < -1 || dscp_qos > 63) {
776 GST_WARNING_OBJECT (stream, "trying to set illegal dscp qos %d", dscp_qos);
780 priv->dscp_qos = dscp_qos;
782 update_dscp_qos (stream, priv->udpsink);
786 * gst_rtsp_stream_get_dscp_qos:
787 * @stream: a #GstRTSPStream
789 * Get the configured DSCP QoS in of the outgoing sockets.
791 * Returns: the DSCP QoS value of the outgoing sockets, or -1 if disbled.
794 gst_rtsp_stream_get_dscp_qos (GstRTSPStream * stream)
796 GstRTSPStreamPrivate *priv;
798 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
802 return priv->dscp_qos;
806 * gst_rtsp_stream_is_transport_supported:
807 * @stream: a #GstRTSPStream
808 * @transport: (transfer none): a #GstRTSPTransport
810 * Check if @transport can be handled by stream
812 * Returns: %TRUE if @transport can be handled by @stream.
815 gst_rtsp_stream_is_transport_supported (GstRTSPStream * stream,
816 GstRTSPTransport * transport)
818 GstRTSPStreamPrivate *priv;
820 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
821 g_return_val_if_fail (transport != NULL, FALSE);
825 g_mutex_lock (&priv->lock);
826 if (transport->trans != GST_RTSP_TRANS_RTP)
827 goto unsupported_transmode;
829 if (!(transport->profile & priv->profiles))
830 goto unsupported_profile;
832 if (!(transport->lower_transport & priv->allowed_protocols))
833 goto unsupported_ltrans;
835 g_mutex_unlock (&priv->lock);
840 unsupported_transmode:
842 GST_DEBUG ("unsupported transport mode %d", transport->trans);
843 g_mutex_unlock (&priv->lock);
848 GST_DEBUG ("unsupported profile %d", transport->profile);
849 g_mutex_unlock (&priv->lock);
854 GST_DEBUG ("unsupported lower transport %d", transport->lower_transport);
855 g_mutex_unlock (&priv->lock);
861 * gst_rtsp_stream_set_profiles:
862 * @stream: a #GstRTSPStream
863 * @profiles: the new profiles
865 * Configure the allowed profiles for @stream.
868 gst_rtsp_stream_set_profiles (GstRTSPStream * stream, GstRTSPProfile profiles)
870 GstRTSPStreamPrivate *priv;
872 g_return_if_fail (GST_IS_RTSP_STREAM (stream));
876 g_mutex_lock (&priv->lock);
877 priv->profiles = profiles;
878 g_mutex_unlock (&priv->lock);
882 * gst_rtsp_stream_get_profiles:
883 * @stream: a #GstRTSPStream
885 * Get the allowed profiles of @stream.
887 * Returns: a #GstRTSPProfile
890 gst_rtsp_stream_get_profiles (GstRTSPStream * stream)
892 GstRTSPStreamPrivate *priv;
895 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_RTSP_PROFILE_UNKNOWN);
899 g_mutex_lock (&priv->lock);
900 res = priv->profiles;
901 g_mutex_unlock (&priv->lock);
907 * gst_rtsp_stream_set_protocols:
908 * @stream: a #GstRTSPStream
909 * @protocols: the new flags
911 * Configure the allowed lower transport for @stream.
914 gst_rtsp_stream_set_protocols (GstRTSPStream * stream,
915 GstRTSPLowerTrans protocols)
917 GstRTSPStreamPrivate *priv;
919 g_return_if_fail (GST_IS_RTSP_STREAM (stream));
923 g_mutex_lock (&priv->lock);
924 priv->allowed_protocols = protocols;
925 g_mutex_unlock (&priv->lock);
929 * gst_rtsp_stream_get_protocols:
930 * @stream: a #GstRTSPStream
932 * Get the allowed protocols of @stream.
934 * Returns: a #GstRTSPLowerTrans
937 gst_rtsp_stream_get_protocols (GstRTSPStream * stream)
939 GstRTSPStreamPrivate *priv;
940 GstRTSPLowerTrans res;
942 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream),
943 GST_RTSP_LOWER_TRANS_UNKNOWN);
947 g_mutex_lock (&priv->lock);
948 res = priv->allowed_protocols;
949 g_mutex_unlock (&priv->lock);
955 * gst_rtsp_stream_set_address_pool:
956 * @stream: a #GstRTSPStream
957 * @pool: (transfer none) (nullable): a #GstRTSPAddressPool
959 * configure @pool to be used as the address pool of @stream.
962 gst_rtsp_stream_set_address_pool (GstRTSPStream * stream,
963 GstRTSPAddressPool * pool)
965 GstRTSPStreamPrivate *priv;
966 GstRTSPAddressPool *old;
968 g_return_if_fail (GST_IS_RTSP_STREAM (stream));
972 GST_LOG_OBJECT (stream, "set address pool %p", pool);
974 g_mutex_lock (&priv->lock);
975 if ((old = priv->pool) != pool)
976 priv->pool = pool ? g_object_ref (pool) : NULL;
979 g_mutex_unlock (&priv->lock);
982 g_object_unref (old);
986 * gst_rtsp_stream_get_address_pool:
987 * @stream: a #GstRTSPStream
989 * Get the #GstRTSPAddressPool used as the address pool of @stream.
991 * Returns: (transfer full) (nullable): the #GstRTSPAddressPool of @stream.
992 * g_object_unref() after usage.
995 gst_rtsp_stream_get_address_pool (GstRTSPStream * stream)
997 GstRTSPStreamPrivate *priv;
998 GstRTSPAddressPool *result;
1000 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1002 priv = stream->priv;
1004 g_mutex_lock (&priv->lock);
1005 if ((result = priv->pool))
1006 g_object_ref (result);
1007 g_mutex_unlock (&priv->lock);
1013 * gst_rtsp_stream_set_multicast_iface:
1014 * @stream: a #GstRTSPStream
1015 * @multicast_iface: (transfer none) (nullable): a multicast interface name
1017 * configure @multicast_iface to be used for @stream.
1020 gst_rtsp_stream_set_multicast_iface (GstRTSPStream * stream,
1021 const gchar * multicast_iface)
1023 GstRTSPStreamPrivate *priv;
1026 g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1028 priv = stream->priv;
1030 GST_LOG_OBJECT (stream, "set multicast iface %s",
1031 GST_STR_NULL (multicast_iface));
1033 g_mutex_lock (&priv->lock);
1034 if ((old = priv->multicast_iface) != multicast_iface)
1035 priv->multicast_iface = multicast_iface ? g_strdup (multicast_iface) : NULL;
1038 g_mutex_unlock (&priv->lock);
1045 * gst_rtsp_stream_get_multicast_iface:
1046 * @stream: a #GstRTSPStream
1048 * Get the multicast interface used for @stream.
1050 * Returns: (transfer full) (nullable): the multicast interface for @stream.
1051 * g_free() after usage.
1054 gst_rtsp_stream_get_multicast_iface (GstRTSPStream * stream)
1056 GstRTSPStreamPrivate *priv;
1059 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1061 priv = stream->priv;
1063 g_mutex_lock (&priv->lock);
1064 if ((result = priv->multicast_iface))
1065 result = g_strdup (result);
1066 g_mutex_unlock (&priv->lock);
1072 * gst_rtsp_stream_get_multicast_address:
1073 * @stream: a #GstRTSPStream
1074 * @family: the #GSocketFamily
1076 * Get the multicast address of @stream for @family. The original
1077 * #GstRTSPAddress is cached and copy is returned, so freeing the return value
1078 * won't release the address from the pool.
1080 * Returns: (transfer full) (nullable): the #GstRTSPAddress of @stream
1081 * or %NULL when no address could be allocated. gst_rtsp_address_free()
1085 gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream,
1086 GSocketFamily family)
1088 GstRTSPStreamPrivate *priv;
1089 GstRTSPAddress *result;
1090 GstRTSPAddress **addrp;
1091 GstRTSPAddressFlags flags;
1093 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1095 priv = stream->priv;
1097 g_mutex_lock (&stream->priv->lock);
1099 if (family == G_SOCKET_FAMILY_IPV6) {
1100 flags = GST_RTSP_ADDRESS_FLAG_IPV6;
1101 addrp = &priv->mcast_addr_v6;
1103 flags = GST_RTSP_ADDRESS_FLAG_IPV4;
1104 addrp = &priv->mcast_addr_v4;
1107 if (*addrp == NULL) {
1108 if (priv->pool == NULL)
1111 flags |= GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_MULTICAST;
1113 *addrp = gst_rtsp_address_pool_acquire_address (priv->pool, flags, 2);
1117 /* FIXME: Also reserve the same port with unicast ANY address, since that's
1118 * where we are going to bind our socket. Probably loop until we find a port
1119 * available in both mcast and unicast pools. Maybe GstRTSPAddressPool
1120 * should do it for us when both GST_RTSP_ADDRESS_FLAG_MULTICAST and
1121 * GST_RTSP_ADDRESS_FLAG_UNICAST are givent. */
1123 result = gst_rtsp_address_copy (*addrp);
1125 g_mutex_unlock (&stream->priv->lock);
1132 GST_ERROR_OBJECT (stream, "no address pool specified");
1133 g_mutex_unlock (&stream->priv->lock);
1138 GST_ERROR_OBJECT (stream, "failed to acquire address from pool");
1139 g_mutex_unlock (&stream->priv->lock);
1145 * gst_rtsp_stream_reserve_address:
1146 * @stream: a #GstRTSPStream
1147 * @address: an address
1152 * Reserve @address and @port as the address and port of @stream. The original
1153 * #GstRTSPAddress is cached and copy is returned, so freeing the return value
1154 * won't release the address from the pool.
1156 * Returns: (nullable): the #GstRTSPAddress of @stream or %NULL when
1157 * the address could not be reserved. gst_rtsp_address_free() after
1161 gst_rtsp_stream_reserve_address (GstRTSPStream * stream,
1162 const gchar * address, guint port, guint n_ports, guint ttl)
1164 GstRTSPStreamPrivate *priv;
1165 GstRTSPAddress *result;
1167 GSocketFamily family;
1168 GstRTSPAddress **addrp;
1170 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1171 g_return_val_if_fail (address != NULL, NULL);
1172 g_return_val_if_fail (port > 0, NULL);
1173 g_return_val_if_fail (n_ports > 0, NULL);
1174 g_return_val_if_fail (ttl > 0, NULL);
1176 priv = stream->priv;
1178 addr = g_inet_address_new_from_string (address);
1180 GST_ERROR ("failed to get inet addr from %s", address);
1181 family = G_SOCKET_FAMILY_IPV4;
1183 family = g_inet_address_get_family (addr);
1184 g_object_unref (addr);
1187 if (family == G_SOCKET_FAMILY_IPV6)
1188 addrp = &priv->mcast_addr_v6;
1190 addrp = &priv->mcast_addr_v4;
1192 g_mutex_lock (&priv->lock);
1193 if (*addrp == NULL) {
1194 GstRTSPAddressPoolResult res;
1196 if (priv->pool == NULL)
1199 res = gst_rtsp_address_pool_reserve_address (priv->pool, address,
1200 port, n_ports, ttl, addrp);
1201 if (res != GST_RTSP_ADDRESS_POOL_OK)
1204 /* FIXME: Also reserve the same port with unicast ANY address, since that's
1205 * where we are going to bind our socket. */
1207 if (g_ascii_strcasecmp ((*addrp)->address, address) ||
1208 (*addrp)->port != port || (*addrp)->n_ports != n_ports ||
1209 (*addrp)->ttl != ttl)
1210 goto different_address;
1212 result = gst_rtsp_address_copy (*addrp);
1213 g_mutex_unlock (&priv->lock);
1220 GST_ERROR_OBJECT (stream, "no address pool specified");
1221 g_mutex_unlock (&priv->lock);
1226 GST_ERROR_OBJECT (stream, "failed to acquire address %s from pool",
1228 g_mutex_unlock (&priv->lock);
1233 GST_ERROR_OBJECT (stream,
1234 "address %s is not the same as %s that was already reserved",
1235 address, (*addrp)->address);
1236 g_mutex_unlock (&priv->lock);
1241 /* must be called with lock */
1243 set_socket_for_udpsink (GstElement * udpsink, GSocket * socket,
1244 GSocketFamily family)
1246 const gchar *multisink_socket;
1248 if (family == G_SOCKET_FAMILY_IPV6)
1249 multisink_socket = "socket-v6";
1251 multisink_socket = "socket";
1253 g_object_set (G_OBJECT (udpsink), multisink_socket, socket, NULL);
1256 /* must be called with lock */
1258 set_multicast_socket_for_udpsink (GstElement * udpsink, GSocket * socket,
1259 GSocketFamily family, const gchar * multicast_iface,
1260 const gchar * addr_str, gint port, gint mcast_ttl)
1262 set_socket_for_udpsink (udpsink, socket, family);
1264 if (multicast_iface) {
1265 GST_INFO ("setting multicast-iface %s", multicast_iface);
1266 g_object_set (G_OBJECT (udpsink), "multicast-iface", multicast_iface, NULL);
1269 if (mcast_ttl > 0) {
1270 GST_INFO ("setting ttl-mc %d", mcast_ttl);
1271 g_object_set (G_OBJECT (udpsink), "ttl-mc", mcast_ttl, NULL);
1276 /* must be called with lock */
1278 set_unicast_socket_for_udpsink (GstElement * udpsink, GSocket * socket,
1279 GSocketFamily family)
1281 set_socket_for_udpsink (udpsink, socket, family);
1285 get_port_from_socket (GSocket * socket)
1288 GSocketAddress *sockaddr;
1291 GST_DEBUG ("socket: %p", socket);
1292 sockaddr = g_socket_get_local_address (socket, &err);
1293 if (sockaddr == NULL || !G_IS_INET_SOCKET_ADDRESS (sockaddr)) {
1294 g_clear_object (&sockaddr);
1295 GST_ERROR ("failed to get sockaddr: %s", err->message);
1300 port = g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (sockaddr));
1301 g_object_unref (sockaddr);
1308 create_and_configure_udpsink (GstRTSPStream * stream, GstElement ** udpsink,
1309 GSocket * socket_v4, GSocket * socket_v6, gboolean multicast,
1310 gboolean is_rtp, gint mcast_ttl)
1312 GstRTSPStreamPrivate *priv = stream->priv;
1314 *udpsink = gst_element_factory_make ("multiudpsink", NULL);
1317 goto no_udp_protocol;
1319 /* configure sinks */
1321 g_object_set (G_OBJECT (*udpsink), "close-socket", FALSE, NULL);
1323 g_object_set (G_OBJECT (*udpsink), "send-duplicates", FALSE, NULL);
1326 g_object_set (G_OBJECT (*udpsink), "buffer-size", priv->buffer_size, NULL);
1328 g_object_set (G_OBJECT (*udpsink), "sync", FALSE, NULL);
1330 /* Needs to be async for RECORD streams, otherwise we will never go to
1331 * PLAYING because the sinks will wait for data while the udpsrc can't
1332 * provide data with timestamps in PAUSED. */
1333 if (!is_rtp || priv->sinkpad)
1334 g_object_set (G_OBJECT (*udpsink), "async", FALSE, NULL);
1337 /* join multicast group when adding clients, so we'll start receiving from it.
1338 * We cannot rely on the udpsrc to join the group since its socket is always a
1339 * local unicast one. */
1340 g_object_set (G_OBJECT (*udpsink), "auto-multicast", TRUE, NULL);
1342 g_object_set (G_OBJECT (*udpsink), "loop", FALSE, NULL);
1345 /* update the dscp qos field in the sinks */
1346 update_dscp_qos (stream, udpsink);
1348 if (priv->server_addr_v4) {
1349 GST_DEBUG_OBJECT (stream, "udp IPv4, configure udpsinks");
1350 set_unicast_socket_for_udpsink (*udpsink, socket_v4, G_SOCKET_FAMILY_IPV4);
1353 if (priv->server_addr_v6) {
1354 GST_DEBUG_OBJECT (stream, "udp IPv6, configure udpsinks");
1355 set_unicast_socket_for_udpsink (*udpsink, socket_v6, G_SOCKET_FAMILY_IPV6);
1360 if (priv->mcast_addr_v4) {
1361 GST_DEBUG_OBJECT (stream, "mcast IPv4, configure udpsinks");
1362 port = get_port_from_socket (socket_v4);
1364 goto get_port_failed;
1365 set_multicast_socket_for_udpsink (*udpsink, socket_v4,
1366 G_SOCKET_FAMILY_IPV4, priv->multicast_iface,
1367 priv->mcast_addr_v4->address, port, mcast_ttl);
1370 if (priv->mcast_addr_v6) {
1371 GST_DEBUG_OBJECT (stream, "mcast IPv6, configure udpsinks");
1372 port = get_port_from_socket (socket_v6);
1374 goto get_port_failed;
1375 set_multicast_socket_for_udpsink (*udpsink, socket_v6,
1376 G_SOCKET_FAMILY_IPV6, priv->multicast_iface,
1377 priv->mcast_addr_v6->address, port, mcast_ttl);
1387 GST_ERROR_OBJECT (stream, "failed to create udpsink element");
1392 GST_ERROR_OBJECT (stream, "failed to get udp port");
1397 /* must be called with lock */
1399 create_and_configure_udpsource (GstElement ** udpsrc, GSocket * socket)
1401 GstStateChangeReturn ret;
1403 g_assert (socket != NULL);
1405 *udpsrc = gst_element_factory_make ("udpsrc", NULL);
1406 if (*udpsrc == NULL)
1409 g_object_set (G_OBJECT (*udpsrc), "socket", socket, NULL);
1411 /* The udpsrc cannot do the join because its socket is always a local unicast
1412 * one. The udpsink sharing the same socket will do it for us. */
1413 g_object_set (G_OBJECT (*udpsrc), "auto-multicast", FALSE, NULL);
1415 g_object_set (G_OBJECT (*udpsrc), "loop", FALSE, NULL);
1417 g_object_set (G_OBJECT (*udpsrc), "close-socket", FALSE, NULL);
1419 ret = gst_element_set_state (*udpsrc, GST_STATE_READY);
1420 if (ret == GST_STATE_CHANGE_FAILURE)
1429 gst_element_set_state (*udpsrc, GST_STATE_NULL);
1430 g_clear_object (udpsrc);
1437 alloc_ports_one_family (GstRTSPStream * stream, GSocketFamily family,
1438 GSocket * socket_out[2], GstRTSPAddress ** server_addr_out,
1439 gboolean multicast, GstRTSPTransport * ct, gboolean use_transport_settings)
1441 GstRTSPStreamPrivate *priv = stream->priv;
1442 GSocket *rtp_socket = NULL;
1443 GSocket *rtcp_socket = NULL;
1444 gint tmp_rtp, tmp_rtcp;
1446 GList *rejected_addresses = NULL;
1447 GstRTSPAddress *addr = NULL;
1448 GInetAddress *inetaddr = NULL;
1449 GSocketAddress *rtp_sockaddr = NULL;
1450 GSocketAddress *rtcp_sockaddr = NULL;
1451 GstRTSPAddressPool *pool;
1452 gboolean transport_settings_defined = FALSE;
1457 /* Start with random port */
1461 if (use_transport_settings) {
1468 /* multicast and transport specific case */
1469 if (ct->destination != NULL) {
1470 tmp_rtp = ct->port.min;
1471 tmp_rtcp = ct->port.max;
1473 /* check if the provided address is a multicast address */
1474 inetaddr = g_inet_address_new_from_string (ct->destination);
1475 if (inetaddr == NULL)
1476 goto destination_error;
1477 if (!g_inet_address_get_is_multicast (inetaddr))
1478 goto destination_no_mcast;
1481 if (!priv->bind_mcast_address) {
1482 g_clear_object (&inetaddr);
1483 inetaddr = g_inet_address_new_any (family);
1486 GST_DEBUG_OBJECT (stream, "use transport settings");
1487 transport_settings_defined = TRUE;
1491 if (priv->enable_rtcp) {
1492 rtcp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1493 G_SOCKET_PROTOCOL_UDP, NULL);
1495 goto no_udp_protocol;
1496 g_socket_set_multicast_loopback (rtcp_socket, FALSE);
1499 /* try to allocate UDP ports, the RTP port should be an even
1500 * number and the RTCP port (if enabled) should be the next (uneven) port */
1503 if (rtp_socket == NULL) {
1504 rtp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1505 G_SOCKET_PROTOCOL_UDP, NULL);
1507 goto no_udp_protocol;
1508 g_socket_set_multicast_loopback (rtp_socket, FALSE);
1511 if (!transport_settings_defined) {
1512 if ((pool && gst_rtsp_address_pool_has_unicast_addresses (pool))
1514 GstRTSPAddressFlags flags;
1517 rejected_addresses = g_list_prepend (rejected_addresses, addr);
1522 flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT;
1524 flags |= GST_RTSP_ADDRESS_FLAG_MULTICAST;
1526 flags |= GST_RTSP_ADDRESS_FLAG_UNICAST;
1528 if (family == G_SOCKET_FAMILY_IPV6)
1529 flags |= GST_RTSP_ADDRESS_FLAG_IPV6;
1531 flags |= GST_RTSP_ADDRESS_FLAG_IPV4;
1533 if (*server_addr_out)
1534 addr = *server_addr_out;
1536 addr = gst_rtsp_address_pool_acquire_address (pool, flags,
1537 priv->enable_rtcp ? 2 : 1);
1542 tmp_rtp = addr->port;
1544 g_clear_object (&inetaddr);
1545 /* FIXME: Does it really work with the IP_MULTICAST_ALL socket option and
1546 * socket control message set in udpsrc? */
1547 if (priv->bind_mcast_address || !multicast)
1548 inetaddr = g_inet_address_new_from_string (addr->address);
1550 inetaddr = g_inet_address_new_any (family);
1558 if (inetaddr == NULL)
1559 inetaddr = g_inet_address_new_any (family);
1563 rtp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtp);
1564 if (!g_socket_bind (rtp_socket, rtp_sockaddr, FALSE, NULL)) {
1565 GST_DEBUG_OBJECT (stream, "rtp bind() failed, will try again");
1566 g_object_unref (rtp_sockaddr);
1567 if (transport_settings_defined)
1568 goto transport_settings_error;
1571 g_object_unref (rtp_sockaddr);
1573 rtp_sockaddr = g_socket_get_local_address (rtp_socket, NULL);
1574 if (rtp_sockaddr == NULL || !G_IS_INET_SOCKET_ADDRESS (rtp_sockaddr)) {
1575 g_clear_object (&rtp_sockaddr);
1579 if (!transport_settings_defined) {
1581 g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (rtp_sockaddr));
1583 /* check if port is even. RFC 3550 encorages the use of an even/odd port
1584 * pair, however it's not a strict requirement so this check is not done
1585 * for the client selected ports. */
1586 if ((tmp_rtp & 1) != 0) {
1587 /* port not even, close and allocate another */
1589 g_object_unref (rtp_sockaddr);
1590 g_clear_object (&rtp_socket);
1594 g_object_unref (rtp_sockaddr);
1597 if (priv->enable_rtcp) {
1598 tmp_rtcp = tmp_rtp + 1;
1600 rtcp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtcp);
1601 if (!g_socket_bind (rtcp_socket, rtcp_sockaddr, FALSE, NULL)) {
1602 GST_DEBUG_OBJECT (stream, "rctp bind() failed, will try again");
1603 g_object_unref (rtcp_sockaddr);
1604 g_clear_object (&rtp_socket);
1605 if (transport_settings_defined)
1606 goto transport_settings_error;
1609 g_object_unref (rtcp_sockaddr);
1613 addr = g_slice_new0 (GstRTSPAddress);
1614 addr->port = tmp_rtp;
1616 if (transport_settings_defined)
1617 addr->address = g_strdup (ct->destination);
1619 addr->address = g_inet_address_to_string (inetaddr);
1620 addr->ttl = ct->ttl;
1623 g_clear_object (&inetaddr);
1625 if (multicast && (ct->ttl > 0) && (ct->ttl <= priv->max_mcast_ttl)) {
1626 GST_DEBUG ("setting mcast ttl to %d", ct->ttl);
1627 g_socket_set_multicast_ttl (rtp_socket, ct->ttl);
1629 g_socket_set_multicast_ttl (rtcp_socket, ct->ttl);
1632 socket_out[0] = rtp_socket;
1633 socket_out[1] = rtcp_socket;
1634 *server_addr_out = addr;
1636 if (priv->enable_rtcp) {
1637 GST_DEBUG_OBJECT (stream, "allocated address: %s and ports: %d, %d",
1638 addr->address, tmp_rtp, tmp_rtcp);
1640 GST_DEBUG_OBJECT (stream, "allocated address: %s and port: %d",
1641 addr->address, tmp_rtp);
1644 g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free);
1651 GST_ERROR_OBJECT (stream, "failed to allocate UDP ports: wrong transport");
1656 GST_ERROR_OBJECT (stream, "failed to allocate UDP ports: no transport");
1661 GST_ERROR_OBJECT (stream,
1662 "failed to allocate UDP ports: destination error");
1665 destination_no_mcast:
1667 GST_ERROR_OBJECT (stream,
1668 "failed to allocate UDP ports: destination not multicast address");
1673 GST_WARNING_OBJECT (stream, "failed to allocate UDP ports: protocol error");
1678 GST_WARNING_OBJECT (stream,
1679 "failed to allocate UDP ports: no address pool specified");
1684 GST_WARNING_OBJECT (stream, "failed to acquire address from pool");
1689 GST_WARNING_OBJECT (stream, "failed to allocate UDP ports: no ports");
1692 transport_settings_error:
1694 GST_ERROR_OBJECT (stream,
1695 "failed to allocate UDP ports with requested transport settings");
1700 GST_WARNING_OBJECT (stream, "failed to allocate UDP ports: socket error");
1706 g_object_unref (inetaddr);
1707 g_list_free_full (rejected_addresses,
1708 (GDestroyNotify) gst_rtsp_address_free);
1710 gst_rtsp_address_free (addr);
1712 g_object_unref (rtp_socket);
1714 g_object_unref (rtcp_socket);
1719 /* must be called with lock */
1721 add_mcast_client_addr (GstRTSPStream * stream, const gchar * destination,
1722 guint rtp_port, guint rtcp_port)
1724 GstRTSPStreamPrivate *priv;
1726 UdpClientAddrInfo *client;
1729 priv = stream->priv;
1731 if (destination == NULL)
1734 inet = g_inet_address_new_from_string (destination);
1736 goto invalid_address;
1738 if (!g_inet_address_get_is_multicast (inet)) {
1739 g_object_unref (inet);
1740 goto invalid_address;
1742 g_object_unref (inet);
1744 for (walk = priv->mcast_clients; walk; walk = g_list_next (walk)) {
1745 UdpClientAddrInfo *cli = walk->data;
1747 if ((g_strcmp0 (cli->address, destination) == 0) &&
1748 (cli->rtp_port == rtp_port)) {
1749 GST_DEBUG ("requested destination already exists: %s:%u-%u",
1750 destination, rtp_port, rtcp_port);
1756 client = g_new0 (UdpClientAddrInfo, 1);
1757 client->address = g_strdup (destination);
1758 client->rtp_port = rtp_port;
1759 client->add_count = 1;
1760 priv->mcast_clients = g_list_prepend (priv->mcast_clients, client);
1762 GST_DEBUG ("added mcast client %s:%u-%u", destination, rtp_port, rtcp_port);
1768 GST_WARNING_OBJECT (stream, "Multicast address is invalid: %s",
1774 /* must be called with lock */
1776 remove_mcast_client_addr (GstRTSPStream * stream, const gchar * destination,
1777 guint rtp_port, guint rtcp_port)
1779 GstRTSPStreamPrivate *priv;
1782 priv = stream->priv;
1784 if (destination == NULL)
1785 goto no_destination;
1787 for (walk = priv->mcast_clients; walk; walk = g_list_next (walk)) {
1788 UdpClientAddrInfo *cli = walk->data;
1790 if ((g_strcmp0 (cli->address, destination) == 0) &&
1791 (cli->rtp_port == rtp_port)) {
1794 if (!cli->add_count) {
1795 priv->mcast_clients = g_list_remove (priv->mcast_clients, cli);
1796 free_mcast_client (cli);
1802 GST_WARNING_OBJECT (stream, "Address not found");
1807 GST_WARNING_OBJECT (stream, "No destination has been provided");
1814 * gst_rtsp_stream_allocate_udp_sockets:
1815 * @stream: a #GstRTSPStream
1816 * @family: protocol family
1817 * @transport: transport method
1818 * @use_client_settings: Whether to use client settings or not
1820 * Allocates RTP and RTCP ports.
1822 * Returns: %TRUE if the RTP and RTCP sockets have been succeccully allocated.
1825 gst_rtsp_stream_allocate_udp_sockets (GstRTSPStream * stream,
1826 GSocketFamily family, GstRTSPTransport * ct,
1827 gboolean use_transport_settings)
1829 GstRTSPStreamPrivate *priv;
1830 gboolean ret = FALSE;
1831 GstRTSPLowerTrans transport;
1832 gboolean allocated = FALSE;
1834 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1835 g_return_val_if_fail (ct != NULL, FALSE);
1836 priv = stream->priv;
1838 transport = ct->lower_transport;
1840 g_mutex_lock (&priv->lock);
1842 if (transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
1843 if (family == G_SOCKET_FAMILY_IPV4 && priv->mcast_socket_v4[0])
1845 else if (family == G_SOCKET_FAMILY_IPV6 && priv->mcast_socket_v6[0])
1847 } else if (transport == GST_RTSP_LOWER_TRANS_UDP) {
1848 if (family == G_SOCKET_FAMILY_IPV4 && priv->socket_v4[0])
1850 else if (family == G_SOCKET_FAMILY_IPV6 && priv->socket_v6[0])
1855 GST_DEBUG_OBJECT (stream, "Allocated already");
1856 g_mutex_unlock (&priv->lock);
1860 if (family == G_SOCKET_FAMILY_IPV4) {
1862 if (transport == GST_RTSP_LOWER_TRANS_UDP) {
1864 GST_DEBUG_OBJECT (stream, "GST_RTSP_LOWER_TRANS_UDP, ipv4");
1865 ret = alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4,
1866 priv->socket_v4, &priv->server_addr_v4, FALSE, ct, FALSE);
1869 GST_DEBUG_OBJECT (stream, "GST_RTSP_LOWER_TRANS_MCAST_UDP, ipv4");
1870 ret = alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4,
1871 priv->mcast_socket_v4, &priv->mcast_addr_v4, TRUE, ct,
1872 use_transport_settings);
1876 if (transport == GST_RTSP_LOWER_TRANS_UDP) {
1878 GST_DEBUG_OBJECT (stream, "GST_RTSP_LOWER_TRANS_UDP, ipv6");
1879 ret = alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6,
1880 priv->socket_v6, &priv->server_addr_v6, FALSE, ct, FALSE);
1884 GST_DEBUG_OBJECT (stream, "GST_RTSP_LOWER_TRANS_MCAST_UDP, ipv6");
1885 ret = alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6,
1886 priv->mcast_socket_v6, &priv->mcast_addr_v6, TRUE, ct,
1887 use_transport_settings);
1890 g_mutex_unlock (&priv->lock);
1896 * gst_rtsp_stream_set_client_side:
1897 * @stream: a #GstRTSPStream
1898 * @client_side: TRUE if this #GstRTSPStream is running on the 'client' side of
1899 * an RTSP connection.
1901 * Sets the #GstRTSPStream as a 'client side' stream - used for sending
1902 * streams to an RTSP server via RECORD. This has the practical effect
1903 * of changing which UDP port numbers are used when setting up the local
1904 * side of the stream sending to be either the 'server' or 'client' pair
1905 * of a configured UDP transport.
1908 gst_rtsp_stream_set_client_side (GstRTSPStream * stream, gboolean client_side)
1910 GstRTSPStreamPrivate *priv;
1912 g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1913 priv = stream->priv;
1914 g_mutex_lock (&priv->lock);
1915 priv->client_side = client_side;
1916 g_mutex_unlock (&priv->lock);
1920 * gst_rtsp_stream_is_client_side:
1921 * @stream: a #GstRTSPStream
1923 * See gst_rtsp_stream_set_client_side()
1925 * Returns: TRUE if this #GstRTSPStream is client-side.
1928 gst_rtsp_stream_is_client_side (GstRTSPStream * stream)
1930 GstRTSPStreamPrivate *priv;
1933 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1935 priv = stream->priv;
1936 g_mutex_lock (&priv->lock);
1937 ret = priv->client_side;
1938 g_mutex_unlock (&priv->lock);
1944 * gst_rtsp_stream_get_server_port:
1945 * @stream: a #GstRTSPStream
1946 * @server_port: (out): result server port
1947 * @family: the port family to get
1949 * Fill @server_port with the port pair used by the server. This function can
1950 * only be called when @stream has been joined.
1953 gst_rtsp_stream_get_server_port (GstRTSPStream * stream,
1954 GstRTSPRange * server_port, GSocketFamily family)
1956 GstRTSPStreamPrivate *priv;
1958 g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1959 priv = stream->priv;
1960 g_return_if_fail (priv->joined_bin != NULL);
1963 server_port->min = 0;
1964 server_port->max = 0;
1967 g_mutex_lock (&priv->lock);
1968 if (family == G_SOCKET_FAMILY_IPV4) {
1969 if (server_port && priv->server_addr_v4) {
1970 server_port->min = priv->server_addr_v4->port;
1971 if (priv->enable_rtcp) {
1973 priv->server_addr_v4->port + priv->server_addr_v4->n_ports - 1;
1977 if (server_port && priv->server_addr_v6) {
1978 server_port->min = priv->server_addr_v6->port;
1979 if (priv->enable_rtcp) {
1981 priv->server_addr_v6->port + priv->server_addr_v6->n_ports - 1;
1985 g_mutex_unlock (&priv->lock);
1989 * gst_rtsp_stream_get_rtpsession:
1990 * @stream: a #GstRTSPStream
1992 * Get the RTP session of this stream.
1994 * Returns: (transfer full): The RTP session of this stream. Unref after usage.
1997 gst_rtsp_stream_get_rtpsession (GstRTSPStream * stream)
1999 GstRTSPStreamPrivate *priv;
2002 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2004 priv = stream->priv;
2006 g_mutex_lock (&priv->lock);
2007 if ((session = priv->session))
2008 g_object_ref (session);
2009 g_mutex_unlock (&priv->lock);
2015 * gst_rtsp_stream_get_srtp_encoder:
2016 * @stream: a #GstRTSPStream
2018 * Get the SRTP encoder for this stream.
2020 * Returns: (transfer full): The SRTP encoder for this stream. Unref after usage.
2023 gst_rtsp_stream_get_srtp_encoder (GstRTSPStream * stream)
2025 GstRTSPStreamPrivate *priv;
2026 GstElement *encoder;
2028 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2030 priv = stream->priv;
2032 g_mutex_lock (&priv->lock);
2033 if ((encoder = priv->srtpenc))
2034 g_object_ref (encoder);
2035 g_mutex_unlock (&priv->lock);
2041 * gst_rtsp_stream_get_ssrc:
2042 * @stream: a #GstRTSPStream
2043 * @ssrc: (out): result ssrc
2045 * Get the SSRC used by the RTP session of this stream. This function can only
2046 * be called when @stream has been joined.
2049 gst_rtsp_stream_get_ssrc (GstRTSPStream * stream, guint * ssrc)
2051 GstRTSPStreamPrivate *priv;
2053 g_return_if_fail (GST_IS_RTSP_STREAM (stream));
2054 priv = stream->priv;
2055 g_return_if_fail (priv->joined_bin != NULL);
2057 g_mutex_lock (&priv->lock);
2058 if (ssrc && priv->session)
2059 g_object_get (priv->session, "internal-ssrc", ssrc, NULL);
2060 g_mutex_unlock (&priv->lock);
2064 * gst_rtsp_stream_set_retransmission_time:
2065 * @stream: a #GstRTSPStream
2066 * @time: a #GstClockTime
2068 * Set the amount of time to store retransmission packets.
2071 gst_rtsp_stream_set_retransmission_time (GstRTSPStream * stream,
2074 GST_DEBUG_OBJECT (stream, "set retransmission time %" G_GUINT64_FORMAT, time);
2076 g_mutex_lock (&stream->priv->lock);
2077 stream->priv->rtx_time = time;
2078 if (stream->priv->rtxsend)
2079 g_object_set (stream->priv->rtxsend, "max-size-time",
2080 GST_TIME_AS_MSECONDS (time), NULL);
2081 g_mutex_unlock (&stream->priv->lock);
2085 * gst_rtsp_stream_get_retransmission_time:
2086 * @stream: a #GstRTSPStream
2088 * Get the amount of time to store retransmission data.
2090 * Returns: the amount of time to store retransmission data.
2093 gst_rtsp_stream_get_retransmission_time (GstRTSPStream * stream)
2097 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
2099 g_mutex_lock (&stream->priv->lock);
2100 ret = stream->priv->rtx_time;
2101 g_mutex_unlock (&stream->priv->lock);
2107 * gst_rtsp_stream_set_retransmission_pt:
2108 * @stream: a #GstRTSPStream
2111 * Set the payload type (pt) for retransmission of this stream.
2114 gst_rtsp_stream_set_retransmission_pt (GstRTSPStream * stream, guint rtx_pt)
2116 g_return_if_fail (GST_IS_RTSP_STREAM (stream));
2118 GST_DEBUG_OBJECT (stream, "set retransmission pt %u", rtx_pt);
2120 g_mutex_lock (&stream->priv->lock);
2121 stream->priv->rtx_pt = rtx_pt;
2122 if (stream->priv->rtxsend) {
2123 guint pt = gst_rtsp_stream_get_pt (stream);
2124 gchar *pt_s = g_strdup_printf ("%d", pt);
2125 GstStructure *rtx_pt_map = gst_structure_new ("application/x-rtp-pt-map",
2126 pt_s, G_TYPE_UINT, rtx_pt, NULL);
2127 g_object_set (stream->priv->rtxsend, "payload-type-map", rtx_pt_map, NULL);
2129 gst_structure_free (rtx_pt_map);
2131 g_mutex_unlock (&stream->priv->lock);
2135 * gst_rtsp_stream_get_retransmission_pt:
2136 * @stream: a #GstRTSPStream
2138 * Get the payload-type used for retransmission of this stream
2140 * Returns: The retransmission PT.
2143 gst_rtsp_stream_get_retransmission_pt (GstRTSPStream * stream)
2147 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
2149 g_mutex_lock (&stream->priv->lock);
2150 rtx_pt = stream->priv->rtx_pt;
2151 g_mutex_unlock (&stream->priv->lock);
2157 * gst_rtsp_stream_set_buffer_size:
2158 * @stream: a #GstRTSPStream
2159 * @size: the buffer size
2161 * Set the size of the UDP transmission buffer (in bytes)
2162 * Needs to be set before the stream is joined to a bin.
2167 gst_rtsp_stream_set_buffer_size (GstRTSPStream * stream, guint size)
2169 g_mutex_lock (&stream->priv->lock);
2170 stream->priv->buffer_size = size;
2171 g_mutex_unlock (&stream->priv->lock);
2175 * gst_rtsp_stream_get_buffer_size:
2176 * @stream: a #GstRTSPStream
2178 * Get the size of the UDP transmission buffer (in bytes)
2180 * Returns: the size of the UDP TX buffer
2185 gst_rtsp_stream_get_buffer_size (GstRTSPStream * stream)
2189 g_mutex_lock (&stream->priv->lock);
2190 buffer_size = stream->priv->buffer_size;
2191 g_mutex_unlock (&stream->priv->lock);
2197 * gst_rtsp_stream_set_max_mcast_ttl:
2198 * @stream: a #GstRTSPStream
2199 * @ttl: the new multicast ttl value
2201 * Set the maximum time-to-live value of outgoing multicast packets.
2203 * Returns: %TRUE if the requested ttl has been set successfully.
2208 gst_rtsp_stream_set_max_mcast_ttl (GstRTSPStream * stream, guint ttl)
2210 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2212 g_mutex_lock (&stream->priv->lock);
2213 if (ttl == 0 || ttl > DEFAULT_MAX_MCAST_TTL) {
2214 GST_WARNING_OBJECT (stream, "The reqested mcast TTL value is not valid.");
2215 g_mutex_unlock (&stream->priv->lock);
2218 stream->priv->max_mcast_ttl = ttl;
2219 g_mutex_unlock (&stream->priv->lock);
2225 * gst_rtsp_stream_get_max_mcast_ttl:
2226 * @stream: a #GstRTSPStream
2228 * Get the the maximum time-to-live value of outgoing multicast packets.
2230 * Returns: the maximum time-to-live value of outgoing multicast packets.
2235 gst_rtsp_stream_get_max_mcast_ttl (GstRTSPStream * stream)
2239 g_mutex_lock (&stream->priv->lock);
2240 ttl = stream->priv->max_mcast_ttl;
2241 g_mutex_unlock (&stream->priv->lock);
2247 * gst_rtsp_stream_verify_mcast_ttl:
2248 * @stream: a #GstRTSPStream
2249 * @ttl: a requested multicast ttl
2251 * Check if the requested multicast ttl value is allowed.
2253 * Returns: TRUE if the requested ttl value is allowed.
2258 gst_rtsp_stream_verify_mcast_ttl (GstRTSPStream * stream, guint ttl)
2260 gboolean res = FALSE;
2262 g_mutex_lock (&stream->priv->lock);
2263 if ((ttl > 0) && (ttl <= stream->priv->max_mcast_ttl))
2265 g_mutex_unlock (&stream->priv->lock);
2271 * gst_rtsp_stream_set_bind_mcast_address:
2272 * @stream: a #GstRTSPStream,
2273 * @bind_mcast_addr: the new value
2275 * Decide whether the multicast socket should be bound to a multicast address or
2281 gst_rtsp_stream_set_bind_mcast_address (GstRTSPStream * stream,
2282 gboolean bind_mcast_addr)
2284 g_return_if_fail (GST_IS_RTSP_STREAM (stream));
2286 g_mutex_lock (&stream->priv->lock);
2287 stream->priv->bind_mcast_address = bind_mcast_addr;
2288 g_mutex_unlock (&stream->priv->lock);
2292 * gst_rtsp_stream_is_bind_mcast_address:
2293 * @stream: a #GstRTSPStream
2295 * Check if multicast sockets are configured to be bound to multicast addresses.
2297 * Returns: %TRUE if multicast sockets are configured to be bound to multicast addresses.
2302 gst_rtsp_stream_is_bind_mcast_address (GstRTSPStream * stream)
2306 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2308 g_mutex_lock (&stream->priv->lock);
2309 result = stream->priv->bind_mcast_address;
2310 g_mutex_unlock (&stream->priv->lock);
2316 gst_rtsp_stream_set_enable_rtcp (GstRTSPStream * stream, gboolean enable)
2318 g_return_if_fail (GST_IS_RTSP_STREAM (stream));
2320 g_mutex_lock (&stream->priv->lock);
2321 stream->priv->enable_rtcp = enable;
2322 g_mutex_unlock (&stream->priv->lock);
2325 /* executed from streaming thread */
2327 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream)
2329 GstRTSPStreamPrivate *priv = stream->priv;
2330 GstCaps *newcaps, *oldcaps;
2332 newcaps = gst_pad_get_current_caps (pad);
2334 GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps,
2337 g_mutex_lock (&priv->lock);
2338 oldcaps = priv->caps;
2339 priv->caps = newcaps;
2340 g_mutex_unlock (&priv->lock);
2343 gst_caps_unref (oldcaps);
2347 dump_structure (const GstStructure * s)
2351 sstr = gst_structure_to_string (s);
2352 GST_INFO ("structure: %s", sstr);
2356 static GstRTSPStreamTransport *
2357 find_transport (GstRTSPStream * stream, const gchar * rtcp_from)
2359 GstRTSPStreamPrivate *priv = stream->priv;
2361 GstRTSPStreamTransport *result = NULL;
2366 if (rtcp_from == NULL)
2369 tmp = g_strrstr (rtcp_from, ":");
2373 port = atoi (tmp + 1);
2374 dest = g_strndup (rtcp_from, tmp - rtcp_from);
2376 g_mutex_lock (&priv->lock);
2377 GST_INFO ("finding %s:%d in %d transports", dest, port,
2378 g_list_length (priv->transports));
2380 for (walk = priv->transports; walk; walk = g_list_next (walk)) {
2381 GstRTSPStreamTransport *trans = walk->data;
2382 const GstRTSPTransport *tr;
2385 tr = gst_rtsp_stream_transport_get_transport (trans);
2387 if (priv->client_side) {
2388 /* In client side mode the 'destination' is the RTSP server, so send
2390 min = tr->server_port.min;
2391 max = tr->server_port.max;
2393 min = tr->client_port.min;
2394 max = tr->client_port.max;
2397 if ((g_ascii_strcasecmp (tr->destination, dest) == 0) &&
2398 (min == port || max == port)) {
2404 g_object_ref (result);
2405 g_mutex_unlock (&priv->lock);
2412 static GstRTSPStreamTransport *
2413 check_transport (GObject * source, GstRTSPStream * stream)
2415 GstStructure *stats;
2416 GstRTSPStreamTransport *trans;
2418 /* see if we have a stream to match with the origin of the RTCP packet */
2419 trans = g_object_get_qdata (source, ssrc_stream_map_key);
2420 if (trans == NULL) {
2421 g_object_get (source, "stats", &stats, NULL);
2423 const gchar *rtcp_from;
2425 dump_structure (stats);
2427 g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_RTCP_STATS], 0,
2430 rtcp_from = gst_structure_get_string (stats, "rtcp-from");
2431 if ((trans = find_transport (stream, rtcp_from))) {
2432 GST_INFO ("%p: found transport %p for source %p", stream, trans,
2434 g_object_set_qdata_full (source, ssrc_stream_map_key, trans,
2437 gst_structure_free (stats);
2445 on_new_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
2447 GstRTSPStreamTransport *trans;
2449 GST_INFO ("%p: new source %p", stream, source);
2451 trans = check_transport (source, stream);
2454 GST_INFO ("%p: source %p for transport %p", stream, source, trans);
2458 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPStream * stream)
2460 GST_INFO ("%p: new SDES %p", stream, source);
2464 on_ssrc_active (GObject * session, GObject * source, GstRTSPStream * stream)
2466 GstRTSPStreamTransport *trans;
2468 trans = check_transport (source, stream);
2471 GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
2472 gst_rtsp_stream_transport_keep_alive (trans);
2476 GstStructure *stats;
2477 g_object_get (source, "stats", &stats, NULL);
2479 g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_RTCP_STATS], 0,
2482 dump_structure (stats);
2483 gst_structure_free (stats);
2490 on_bye_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
2492 GST_INFO ("%p: source %p bye", stream, source);
2496 on_bye_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
2498 GstRTSPStreamTransport *trans;
2500 GST_INFO ("%p: source %p bye timeout", stream, source);
2502 if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
2503 gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
2504 g_object_set_qdata (source, ssrc_stream_map_key, NULL);
2509 on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
2511 GstRTSPStreamTransport *trans;
2513 GST_INFO ("%p: source %p timeout", stream, source);
2515 if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
2516 gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
2517 g_object_set_qdata (source, ssrc_stream_map_key, NULL);
2522 on_new_sender_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
2524 GST_INFO ("%p: new sender source %p", stream, source);
2527 GstStructure *stats;
2528 g_object_get (source, "stats", &stats, NULL);
2530 dump_structure (stats);
2531 gst_structure_free (stats);
2538 on_sender_ssrc_active (GObject * session, GObject * source,
2539 GstRTSPStream * stream)
2543 GstStructure *stats;
2544 g_object_get (source, "stats", &stats, NULL);
2546 dump_structure (stats);
2547 gst_structure_free (stats);
2554 clear_tr_cache (GstRTSPStreamPrivate * priv)
2557 g_ptr_array_unref (priv->tr_cache);
2558 priv->tr_cache = NULL;
2561 /* With lock taken */
2563 any_transport_ready (GstRTSPStream * stream, gboolean is_rtp)
2565 gboolean ret = TRUE;
2566 GstRTSPStreamPrivate *priv = stream->priv;
2567 GPtrArray *transports;
2570 transports = priv->tr_cache;
2575 for (index = 0; index < transports->len; index++) {
2576 GstRTSPStreamTransport *tr = g_ptr_array_index (transports, index);
2577 if (!gst_rtsp_stream_transport_check_back_pressure (tr, is_rtp)) {
2589 /* Must be called *without* priv->lock */
2591 push_data (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
2592 GstBuffer * buffer, GstBufferList * buffer_list, gboolean is_rtp)
2594 gboolean send_ret = TRUE;
2598 send_ret = gst_rtsp_stream_transport_send_rtp (trans, buffer);
2600 send_ret = gst_rtsp_stream_transport_send_rtp_list (trans, buffer_list);
2603 send_ret = gst_rtsp_stream_transport_send_rtcp (trans, buffer);
2605 send_ret = gst_rtsp_stream_transport_send_rtcp_list (trans, buffer_list);
2611 /* With priv->lock */
2613 ensure_cached_transports (GstRTSPStream * stream)
2615 GstRTSPStreamPrivate *priv = stream->priv;
2618 if (priv->tr_cache_cookie != priv->transports_cookie) {
2619 clear_tr_cache (priv);
2621 g_ptr_array_new_full (priv->n_tcp_transports, g_object_unref);
2623 for (walk = priv->transports; walk; walk = g_list_next (walk)) {
2624 GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2625 const GstRTSPTransport *t = gst_rtsp_stream_transport_get_transport (tr);
2627 if (t->lower_transport != GST_RTSP_LOWER_TRANS_TCP)
2630 g_ptr_array_add (priv->tr_cache, g_object_ref (tr));
2632 priv->tr_cache_cookie = priv->transports_cookie;
2636 /* Must be called *without* priv->lock */
2638 check_transport_backlog (GstRTSPStream * stream, GstRTSPStreamTransport * trans)
2640 GstRTSPStreamPrivate *priv = stream->priv;
2641 gboolean send_ret = TRUE;
2643 gst_rtsp_stream_transport_lock_backlog (trans);
2645 if (!gst_rtsp_stream_transport_backlog_is_empty (trans)) {
2647 GstBufferList *buffer_list;
2652 gst_rtsp_stream_transport_backlog_pop (trans, &buffer, &buffer_list,
2655 g_assert (popped == TRUE);
2657 send_ret = push_data (stream, trans, buffer, buffer_list, is_rtp);
2659 gst_clear_buffer (&buffer);
2660 gst_clear_buffer_list (&buffer_list);
2663 gst_rtsp_stream_transport_unlock_backlog (trans);
2666 /* remove transport on send error */
2667 g_mutex_lock (&priv->lock);
2668 update_transport (stream, trans, FALSE);
2669 g_mutex_unlock (&priv->lock);
2673 /* Must be called with priv->lock */
2675 send_tcp_message (GstRTSPStream * stream, gint idx)
2677 GstRTSPStreamPrivate *priv = stream->priv;
2681 GstBufferList *buffer_list;
2682 guint n_messages = 0;
2684 GPtrArray *transports;
2686 if (!priv->have_buffer[idx])
2689 ensure_cached_transports (stream);
2691 is_rtp = (idx == 0);
2693 if (!any_transport_ready (stream, is_rtp))
2696 priv->have_buffer[idx] = FALSE;
2698 if (priv->appsink[idx] == NULL) {
2699 /* session expired */
2703 sink = GST_APP_SINK (priv->appsink[idx]);
2704 sample = gst_app_sink_pull_sample (sink);
2709 buffer = gst_sample_get_buffer (sample);
2710 buffer_list = gst_sample_get_buffer_list (sample);
2712 /* We will get one message-sent notification per buffer or
2713 * complete buffer-list. We handle each buffer-list as a unit */
2719 transports = priv->tr_cache;
2721 g_ptr_array_ref (transports);
2726 for (index = 0; index < transports->len; index++) {
2727 GstRTSPStreamTransport *tr = g_ptr_array_index (transports, index);
2728 GstBuffer *buf_ref = NULL;
2729 GstBufferList *buflist_ref = NULL;
2731 gst_rtsp_stream_transport_lock_backlog (tr);
2734 buf_ref = gst_buffer_ref (buffer);
2736 buflist_ref = gst_buffer_list_ref (buffer_list);
2738 if (!gst_rtsp_stream_transport_backlog_push (tr,
2739 buf_ref, buflist_ref, is_rtp)) {
2740 GST_ERROR_OBJECT (stream,
2741 "Dropping slow transport %" GST_PTR_FORMAT, tr);
2742 update_transport (stream, tr, FALSE);
2745 gst_rtsp_stream_transport_unlock_backlog (tr);
2748 gst_sample_unref (sample);
2750 g_mutex_unlock (&priv->lock);
2755 for (index = 0; index < transports->len; index++) {
2756 GstRTSPStreamTransport *tr = g_ptr_array_index (transports, index);
2758 check_transport_backlog (stream, tr);
2760 g_ptr_array_unref (transports);
2763 g_mutex_lock (&priv->lock);
2767 send_func (GstRTSPStream * stream)
2769 GstRTSPStreamPrivate *priv = stream->priv;
2771 g_mutex_lock (&priv->send_lock);
2773 while (priv->continue_sending) {
2778 cookie = priv->send_cookie;
2779 g_mutex_unlock (&priv->send_lock);
2781 g_mutex_lock (&priv->lock);
2783 /* iterate from 1 and down, so we prioritize RTCP over RTP */
2784 for (i = 1; i >= 0; i--) {
2785 if (priv->have_buffer[i]) {
2793 send_tcp_message (stream, idx);
2796 g_mutex_unlock (&priv->lock);
2798 g_mutex_lock (&priv->send_lock);
2799 while (cookie == priv->send_cookie && priv->continue_sending) {
2800 g_cond_wait (&priv->send_cond, &priv->send_lock);
2804 g_mutex_unlock (&priv->send_lock);
2809 static GstFlowReturn
2810 handle_new_sample (GstAppSink * sink, gpointer user_data)
2812 GstRTSPStream *stream = user_data;
2813 GstRTSPStreamPrivate *priv = stream->priv;
2816 g_mutex_lock (&priv->lock);
2818 for (i = 0; i < 2; i++) {
2819 if (GST_ELEMENT_CAST (sink) == priv->appsink[i]) {
2820 priv->have_buffer[i] = TRUE;
2825 if (priv->send_thread == NULL) {
2826 priv->send_thread = g_thread_new (NULL, (GThreadFunc) send_func, user_data);
2829 g_mutex_unlock (&priv->lock);
2831 g_mutex_lock (&priv->send_lock);
2832 priv->send_cookie++;
2833 g_cond_signal (&priv->send_cond);
2834 g_mutex_unlock (&priv->send_lock);
2839 static GstAppSinkCallbacks sink_cb = {
2840 NULL, /* not interested in EOS */
2841 NULL, /* not interested in preroll samples */
2846 get_rtp_encoder (GstRTSPStream * stream, guint session)
2848 GstRTSPStreamPrivate *priv = stream->priv;
2850 if (priv->srtpenc == NULL) {
2853 name = g_strdup_printf ("srtpenc_%u", session);
2854 priv->srtpenc = gst_element_factory_make ("srtpenc", name);
2857 g_object_set (priv->srtpenc, "random-key", TRUE, NULL);
2859 return gst_object_ref (priv->srtpenc);
2863 request_rtp_encoder (GstElement * rtpbin, guint session, GstRTSPStream * stream)
2865 GstRTSPStreamPrivate *priv = stream->priv;
2866 GstElement *oldenc, *enc;
2870 if (priv->idx != session)
2873 GST_DEBUG_OBJECT (stream, "make RTP encoder for session %u", session);
2875 oldenc = priv->srtpenc;
2876 enc = get_rtp_encoder (stream, session);
2877 name = g_strdup_printf ("rtp_sink_%d", session);
2878 pad = gst_element_request_pad_simple (enc, name);
2880 gst_object_unref (pad);
2883 g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER], 0,
2890 request_rtcp_encoder (GstElement * rtpbin, guint session,
2891 GstRTSPStream * stream)
2893 GstRTSPStreamPrivate *priv = stream->priv;
2894 GstElement *oldenc, *enc;
2898 if (priv->idx != session)
2901 GST_DEBUG_OBJECT (stream, "make RTCP encoder for session %u", session);
2903 oldenc = priv->srtpenc;
2904 enc = get_rtp_encoder (stream, session);
2905 name = g_strdup_printf ("rtcp_sink_%d", session);
2906 pad = gst_element_request_pad_simple (enc, name);
2908 gst_object_unref (pad);
2911 g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER], 0,
2918 request_key (GstElement * srtpdec, guint ssrc, GstRTSPStream * stream)
2920 GstRTSPStreamPrivate *priv = stream->priv;
2923 GST_DEBUG ("request key %08x", ssrc);
2925 g_mutex_lock (&priv->lock);
2926 if ((caps = g_hash_table_lookup (priv->keys, GINT_TO_POINTER (ssrc))))
2927 gst_caps_ref (caps);
2928 g_mutex_unlock (&priv->lock);
2934 request_rtp_rtcp_decoder (GstElement * rtpbin, guint session,
2935 GstRTSPStream * stream)
2937 GstRTSPStreamPrivate *priv = stream->priv;
2939 if (priv->idx != session)
2942 if (priv->srtpdec == NULL) {
2945 name = g_strdup_printf ("srtpdec_%u", session);
2946 priv->srtpdec = gst_element_factory_make ("srtpdec", name);
2949 g_signal_connect (priv->srtpdec, "request-key",
2950 (GCallback) request_key, stream);
2952 g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTP_RTCP_DECODER],
2956 return gst_object_ref (priv->srtpdec);
2960 * gst_rtsp_stream_request_aux_sender:
2961 * @stream: a #GstRTSPStream
2962 * @sessid: the session id
2964 * Creating a rtxsend bin
2966 * Returns: (transfer full) (nullable): a #GstElement.
2971 gst_rtsp_stream_request_aux_sender (GstRTSPStream * stream, guint sessid)
2975 GstStructure *pt_map;
2980 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2982 pt = gst_rtsp_stream_get_pt (stream);
2983 pt_s = g_strdup_printf ("%u", pt);
2984 rtx_pt = stream->priv->rtx_pt;
2986 GST_INFO ("creating rtxsend with pt %u to %u", pt, rtx_pt);
2988 bin = gst_bin_new (NULL);
2989 stream->priv->rtxsend = gst_element_factory_make ("rtprtxsend", NULL);
2990 pt_map = gst_structure_new ("application/x-rtp-pt-map",
2991 pt_s, G_TYPE_UINT, rtx_pt, NULL);
2992 g_object_set (stream->priv->rtxsend, "payload-type-map", pt_map,
2993 "max-size-time", GST_TIME_AS_MSECONDS (stream->priv->rtx_time), NULL);
2995 gst_structure_free (pt_map);
2996 gst_bin_add (GST_BIN (bin), gst_object_ref (stream->priv->rtxsend));
2998 pad = gst_element_get_static_pad (stream->priv->rtxsend, "src");
2999 name = g_strdup_printf ("src_%u", sessid);
3000 gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
3002 gst_object_unref (pad);
3004 pad = gst_element_get_static_pad (stream->priv->rtxsend, "sink");
3005 name = g_strdup_printf ("sink_%u", sessid);
3006 gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
3008 gst_object_unref (pad);
3014 add_rtx_pt (gpointer key, GstCaps * caps, GstStructure * pt_map)
3016 guint pt = GPOINTER_TO_INT (key);
3017 const GstStructure *s = gst_caps_get_structure (caps, 0);
3020 if (!g_strcmp0 (gst_structure_get_string (s, "encoding-name"), "RTX") &&
3021 (apt = gst_structure_get_string (s, "apt"))) {
3022 gst_structure_set (pt_map, apt, G_TYPE_UINT, pt, NULL);
3026 /* Call with priv->lock taken */
3028 update_rtx_receive_pt_map (GstRTSPStream * stream)
3030 GstStructure *pt_map;
3032 if (!stream->priv->rtxreceive)
3035 pt_map = gst_structure_new_empty ("application/x-rtp-pt-map");
3036 g_hash_table_foreach (stream->priv->ptmap, (GHFunc) add_rtx_pt, pt_map);
3037 g_object_set (stream->priv->rtxreceive, "payload-type-map", pt_map, NULL);
3038 gst_structure_free (pt_map);
3045 retrieve_ulpfec_pt (gpointer key, GstCaps * caps, GstElement * ulpfec_decoder)
3047 guint pt = GPOINTER_TO_INT (key);
3048 const GstStructure *s = gst_caps_get_structure (caps, 0);
3050 if (!g_strcmp0 (gst_structure_get_string (s, "encoding-name"), "ULPFEC"))
3051 g_object_set (ulpfec_decoder, "pt", pt, NULL);
3055 update_ulpfec_decoder_pt (GstRTSPStream * stream)
3057 if (!stream->priv->ulpfec_decoder)
3060 g_hash_table_foreach (stream->priv->ptmap, (GHFunc) retrieve_ulpfec_pt,
3061 stream->priv->ulpfec_decoder);
3068 * gst_rtsp_stream_request_aux_receiver:
3069 * @stream: a #GstRTSPStream
3070 * @sessid: the session id
3072 * Creating a rtxreceive bin
3074 * Returns: (transfer full) (nullable): a #GstElement.
3079 gst_rtsp_stream_request_aux_receiver (GstRTSPStream * stream, guint sessid)
3085 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3087 bin = gst_bin_new (NULL);
3088 stream->priv->rtxreceive = gst_element_factory_make ("rtprtxreceive", NULL);
3089 update_rtx_receive_pt_map (stream);
3090 update_ulpfec_decoder_pt (stream);
3091 gst_bin_add (GST_BIN (bin), gst_object_ref (stream->priv->rtxreceive));
3093 pad = gst_element_get_static_pad (stream->priv->rtxreceive, "src");
3094 name = g_strdup_printf ("src_%u", sessid);
3095 gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
3097 gst_object_unref (pad);
3099 pad = gst_element_get_static_pad (stream->priv->rtxreceive, "sink");
3100 name = g_strdup_printf ("sink_%u", sessid);
3101 gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
3103 gst_object_unref (pad);
3109 * gst_rtsp_stream_set_pt_map:
3110 * @stream: a #GstRTSPStream
3114 * Configure a pt map between @pt and @caps.
3117 gst_rtsp_stream_set_pt_map (GstRTSPStream * stream, guint pt, GstCaps * caps)
3119 GstRTSPStreamPrivate *priv = stream->priv;
3121 if (!GST_IS_CAPS (caps))
3124 g_mutex_lock (&priv->lock);
3125 g_hash_table_insert (priv->ptmap, GINT_TO_POINTER (pt), gst_caps_ref (caps));
3126 update_rtx_receive_pt_map (stream);
3127 g_mutex_unlock (&priv->lock);
3131 * gst_rtsp_stream_set_publish_clock_mode:
3132 * @stream: a #GstRTSPStream
3133 * @mode: the clock publish mode
3135 * Sets if and how the stream clock should be published according to RFC7273.
3140 gst_rtsp_stream_set_publish_clock_mode (GstRTSPStream * stream,
3141 GstRTSPPublishClockMode mode)
3143 GstRTSPStreamPrivate *priv;
3145 priv = stream->priv;
3146 g_mutex_lock (&priv->lock);
3147 priv->publish_clock_mode = mode;
3148 g_mutex_unlock (&priv->lock);
3152 * gst_rtsp_stream_get_publish_clock_mode:
3153 * @stream: a #GstRTSPStream
3155 * Gets if and how the stream clock should be published according to RFC7273.
3157 * Returns: The GstRTSPPublishClockMode
3161 GstRTSPPublishClockMode
3162 gst_rtsp_stream_get_publish_clock_mode (GstRTSPStream * stream)
3164 GstRTSPStreamPrivate *priv;
3165 GstRTSPPublishClockMode ret;
3167 priv = stream->priv;
3168 g_mutex_lock (&priv->lock);
3169 ret = priv->publish_clock_mode;
3170 g_mutex_unlock (&priv->lock);
3176 request_pt_map (GstElement * rtpbin, guint session, guint pt,
3177 GstRTSPStream * stream)
3179 GstRTSPStreamPrivate *priv = stream->priv;
3180 GstCaps *caps = NULL;
3182 g_mutex_lock (&priv->lock);
3184 if (priv->idx == session) {
3185 caps = g_hash_table_lookup (priv->ptmap, GINT_TO_POINTER (pt));
3187 GST_DEBUG ("Stream %p, pt %u: caps %" GST_PTR_FORMAT, stream, pt, caps);
3188 gst_caps_ref (caps);
3190 GST_DEBUG ("Stream %p, pt %u: no caps", stream, pt);
3194 g_mutex_unlock (&priv->lock);
3200 pad_added (GstElement * rtpbin, GstPad * pad, GstRTSPStream * stream)
3202 GstRTSPStreamPrivate *priv = stream->priv;
3204 GstPadLinkReturn ret;
3207 GST_DEBUG ("Stream %p added pad %s:%s for pad %s:%s", stream,
3208 GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (priv->sinkpad));
3210 name = gst_pad_get_name (pad);
3211 if (sscanf (name, "recv_rtp_src_%u", &sessid) != 1) {
3217 if (priv->idx != sessid)
3220 if (gst_pad_is_linked (priv->sinkpad)) {
3221 GST_WARNING ("Stream %p: Pad %s:%s is linked already", stream,
3222 GST_DEBUG_PAD_NAME (priv->sinkpad));
3226 /* link the RTP pad to the session manager, it should not really fail unless
3227 * this is not really an RTP pad */
3228 ret = gst_pad_link (pad, priv->sinkpad);
3229 if (ret != GST_PAD_LINK_OK)
3231 priv->recv_rtp_src = gst_object_ref (pad);
3238 GST_ERROR ("Stream %p: Failed to link pads %s:%s and %s:%s", stream,
3239 GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (priv->sinkpad));
3244 on_npt_stop (GstElement * rtpbin, guint session, guint ssrc,
3245 GstRTSPStream * stream)
3247 /* TODO: What to do here other than this? */
3248 GST_DEBUG ("Stream %p: Got EOS", stream);
3249 gst_pad_send_event (stream->priv->sinkpad, gst_event_new_eos ());
3252 typedef struct _ProbeData ProbeData;
3256 GstRTSPStream *stream;
3257 /* existing sink, already linked to tee */
3259 /* new sink, about to be linked */
3261 /* new queue element, that will be linked to tee and sink1 */
3262 GstElement **queue1;
3263 /* new queue element, that will be linked to tee and sink2 */
3264 GstElement **queue2;
3271 free_cb_data (gpointer user_data)
3273 ProbeData *data = user_data;
3275 gst_object_unref (data->stream);
3276 gst_object_unref (data->sink1);
3277 gst_object_unref (data->sink2);
3278 gst_object_unref (data->sink_pad);
3279 gst_object_unref (data->tee_pad);
3285 create_and_plug_queue_to_unlinked_stream (GstRTSPStream * stream,
3286 GstElement * tee, GstElement * sink, GstElement ** queue)
3288 GstRTSPStreamPrivate *priv = stream->priv;
3293 /* create queue for the new stream */
3294 *queue = gst_element_factory_make ("queue", NULL);
3295 g_object_set (*queue, "max-size-buffers", 1, "max-size-bytes", 0,
3296 "max-size-time", G_GINT64_CONSTANT (0), NULL);
3297 gst_bin_add (priv->joined_bin, *queue);
3299 /* link tee to queue */
3300 tee_pad = gst_element_request_pad_simple (tee, "src_%u");
3301 queue_pad = gst_element_get_static_pad (*queue, "sink");
3302 gst_pad_link (tee_pad, queue_pad);
3303 gst_object_unref (queue_pad);
3304 gst_object_unref (tee_pad);
3306 /* link queue to sink */
3307 queue_pad = gst_element_get_static_pad (*queue, "src");
3308 sink_pad = gst_element_get_static_pad (sink, "sink");
3309 gst_pad_link (queue_pad, sink_pad);
3310 gst_object_unref (queue_pad);
3311 gst_object_unref (sink_pad);
3313 gst_element_sync_state_with_parent (sink);
3314 gst_element_sync_state_with_parent (*queue);
3317 static GstPadProbeReturn
3318 create_and_plug_queue_to_linked_stream_probe_cb (GstPad * inpad,
3319 GstPadProbeInfo * info, gpointer user_data)
3321 GstRTSPStreamPrivate *priv;
3322 ProbeData *data = user_data;
3323 GstRTSPStream *stream;
3324 GstElement **queue1;
3325 GstElement **queue2;
3331 stream = data->stream;
3332 priv = stream->priv;
3333 queue1 = data->queue1;
3334 queue2 = data->queue2;
3335 sink_pad = data->sink_pad;
3336 tee_pad = data->tee_pad;
3337 index = data->index;
3339 /* unlink tee and the existing sink:
3340 * .-----. .---------.
3343 * '-----' '---------'
3345 g_assert (gst_pad_unlink (tee_pad, sink_pad));
3347 /* add queue to the already existing stream */
3348 *queue1 = gst_element_factory_make ("queue", NULL);
3349 g_object_set (*queue1, "max-size-buffers", 1, "max-size-bytes", 0,
3350 "max-size-time", G_GINT64_CONSTANT (0), NULL);
3351 gst_bin_add (priv->joined_bin, *queue1);
3353 /* link tee, queue and sink:
3354 * .-----. .---------. .---------.
3355 * | tee | | queue1 | | sink1 |
3356 * sink src->sink src->sink |
3357 * '-----' '---------' '---------'
3359 queue_pad = gst_element_get_static_pad (*queue1, "sink");
3360 gst_pad_link (tee_pad, queue_pad);
3361 gst_object_unref (queue_pad);
3362 queue_pad = gst_element_get_static_pad (*queue1, "src");
3363 gst_pad_link (queue_pad, sink_pad);
3364 gst_object_unref (queue_pad);
3366 gst_element_sync_state_with_parent (*queue1);
3368 /* create queue and link it to tee and the new sink */
3369 create_and_plug_queue_to_unlinked_stream (stream,
3370 priv->tee[index], data->sink2, queue2);
3372 /* the final stream:
3374 * .-----. .---------. .---------.
3375 * | tee | | queue1 | | sink1 |
3376 * sink src->sink src->sink |
3377 * | | '---------' '---------'
3378 * | | .---------. .---------.
3379 * | | | queue2 | | sink2 |
3380 * | src->sink src->sink |
3381 * '-----' '---------' '---------'
3384 return GST_PAD_PROBE_REMOVE;
3388 create_and_plug_queue_to_linked_stream (GstRTSPStream * stream,
3389 GstElement * sink1, GstElement * sink2, guint index, GstElement ** queue1,
3390 GstElement ** queue2)
3394 data = g_new0 (ProbeData, 1);
3395 data->stream = gst_object_ref (stream);
3396 data->sink1 = gst_object_ref (sink1);
3397 data->sink2 = gst_object_ref (sink2);
3398 data->queue1 = queue1;
3399 data->queue2 = queue2;
3400 data->index = index;
3402 data->sink_pad = gst_element_get_static_pad (sink1, "sink");
3403 g_assert (data->sink_pad);
3404 data->tee_pad = gst_pad_get_peer (data->sink_pad);
3405 g_assert (data->tee_pad);
3407 gst_pad_add_probe (data->tee_pad, GST_PAD_PROBE_TYPE_IDLE,
3408 create_and_plug_queue_to_linked_stream_probe_cb, data, free_cb_data);
3412 plug_udp_sink (GstRTSPStream * stream, GstElement * sink_to_plug,
3413 GstElement ** queue_to_plug, guint index, gboolean is_mcast)
3415 GstRTSPStreamPrivate *priv = stream->priv;
3416 GstElement *existing_sink;
3419 existing_sink = priv->udpsink[index];
3421 existing_sink = priv->mcast_udpsink[index];
3423 GST_DEBUG_OBJECT (stream, "plug %s sink", is_mcast ? "mcast" : "udp");
3425 /* add sink to the bin */
3426 gst_bin_add (priv->joined_bin, sink_to_plug);
3428 if (priv->appsink[index] && existing_sink) {
3430 /* queues are already added for the existing stream, add one for
3431 the newly added udp stream */
3432 create_and_plug_queue_to_unlinked_stream (stream, priv->tee[index],
3433 sink_to_plug, queue_to_plug);
3435 } else if (priv->appsink[index] || existing_sink) {
3437 GstElement *element;
3439 /* add queue to the already existing stream plus the newly created udp
3441 if (priv->appsink[index]) {
3442 element = priv->appsink[index];
3443 queue = &priv->appqueue[index];
3445 element = existing_sink;
3447 queue = &priv->udpqueue[index];
3449 queue = &priv->mcast_udpqueue[index];
3452 create_and_plug_queue_to_linked_stream (stream, element, sink_to_plug,
3453 index, queue, queue_to_plug);
3459 GST_DEBUG_OBJECT (stream, "creating first stream");
3461 /* no need to add queues */
3462 tee_pad = gst_element_request_pad_simple (priv->tee[index], "src_%u");
3463 sink_pad = gst_element_get_static_pad (sink_to_plug, "sink");
3464 gst_pad_link (tee_pad, sink_pad);
3465 gst_object_unref (tee_pad);
3466 gst_object_unref (sink_pad);
3469 gst_element_sync_state_with_parent (sink_to_plug);
3473 plug_tcp_sink (GstRTSPStream * stream, guint index)
3475 GstRTSPStreamPrivate *priv = stream->priv;
3477 GST_DEBUG_OBJECT (stream, "plug tcp sink");
3479 /* add sink to the bin */
3480 gst_bin_add (priv->joined_bin, priv->appsink[index]);
3482 if (priv->mcast_udpsink[index] && priv->udpsink[index]) {
3484 /* queues are already added for the existing stream, add one for
3485 the newly added tcp stream */
3486 create_and_plug_queue_to_unlinked_stream (stream,
3487 priv->tee[index], priv->appsink[index], &priv->appqueue[index]);
3489 } else if (priv->mcast_udpsink[index] || priv->udpsink[index]) {
3491 GstElement *element;
3493 /* add queue to the already existing stream plus the newly created tcp
3495 if (priv->mcast_udpsink[index]) {
3496 element = priv->mcast_udpsink[index];
3497 queue = &priv->mcast_udpqueue[index];
3499 element = priv->udpsink[index];
3500 queue = &priv->udpqueue[index];
3503 create_and_plug_queue_to_linked_stream (stream, element,
3504 priv->appsink[index], index, queue, &priv->appqueue[index]);
3510 /* no need to add queues */
3511 tee_pad = gst_element_request_pad_simple (priv->tee[index], "src_%u");
3512 sink_pad = gst_element_get_static_pad (priv->appsink[index], "sink");
3513 gst_pad_link (tee_pad, sink_pad);
3514 gst_object_unref (tee_pad);
3515 gst_object_unref (sink_pad);
3518 gst_element_sync_state_with_parent (priv->appsink[index]);
3522 plug_sink (GstRTSPStream * stream, const GstRTSPTransport * transport,
3525 GstRTSPStreamPrivate *priv;
3526 gboolean is_tcp, is_udp, is_mcast;
3527 priv = stream->priv;
3529 is_tcp = transport->lower_transport == GST_RTSP_LOWER_TRANS_TCP;
3530 is_udp = transport->lower_transport == GST_RTSP_LOWER_TRANS_UDP;
3531 is_mcast = transport->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST;
3534 plug_udp_sink (stream, priv->udpsink[index],
3535 &priv->udpqueue[index], index, FALSE);
3538 plug_udp_sink (stream, priv->mcast_udpsink[index],
3539 &priv->mcast_udpqueue[index], index, TRUE);
3542 plug_tcp_sink (stream, index);
3545 /* must be called with lock */
3547 create_sender_part (GstRTSPStream * stream, const GstRTSPTransport * transport)
3549 GstRTSPStreamPrivate *priv;
3552 gboolean is_tcp, is_udp, is_mcast;
3556 GST_DEBUG_OBJECT (stream, "create sender part");
3557 priv = stream->priv;
3558 bin = priv->joined_bin;
3560 is_tcp = transport->lower_transport == GST_RTSP_LOWER_TRANS_TCP;
3561 is_udp = transport->lower_transport == GST_RTSP_LOWER_TRANS_UDP;
3562 is_mcast = transport->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST;
3565 mcast_ttl = transport->ttl;
3567 GST_DEBUG_OBJECT (stream, "tcp: %d, udp: %d, mcast: %d (ttl: %d)", is_tcp,
3568 is_udp, is_mcast, mcast_ttl);
3570 if (is_udp && !priv->server_addr_v4 && !priv->server_addr_v6) {
3571 GST_WARNING_OBJECT (stream, "no sockets assigned for UDP");
3575 if (is_mcast && !priv->mcast_addr_v4 && !priv->mcast_addr_v6) {
3576 GST_WARNING_OBJECT (stream, "no sockets assigned for UDP multicast");
3580 if (g_object_class_find_property (G_OBJECT_GET_CLASS (priv->payloader),
3581 "onvif-no-rate-control"))
3582 g_object_set (priv->payloader, "onvif-no-rate-control",
3583 !priv->do_rate_control, NULL);
3585 for (i = 0; i < (priv->enable_rtcp ? 2 : 1); i++) {
3586 gboolean link_tee = FALSE;
3587 /* For the sender we create this bit of pipeline for both
3588 * RTP and RTCP (when enabled).
3589 * Initially there will be only one active transport for
3590 * the stream, so the pipeline will look like this:
3592 * .--------. .-----. .---------.
3593 * | rtpbin | | tee | | sink |
3594 * | send->sink src->sink |
3595 * '--------' '-----' '---------'
3597 * For each new transport, the already existing branch will
3598 * be reconfigured by adding a queue element:
3600 * .--------. .-----. .---------. .---------.
3601 * | rtpbin | | tee | | queue | | udpsink |
3602 * | send->sink src->sink src->sink |
3603 * '--------' | | '---------' '---------'
3604 * | | .---------. .---------.
3605 * | | | queue | | udpsink |
3606 * | src->sink src->sink |
3607 * | | '---------' '---------'
3608 * | | .---------. .---------.
3609 * | | | queue | | appsink |
3610 * | src->sink src->sink |
3611 * '-----' '---------' '---------'
3614 /* Only link the RTP send src if we're going to send RTP, link
3615 * the RTCP send src always */
3616 if (!priv->srcpad && i == 0)
3619 if (!priv->tee[i]) {
3620 /* make tee for RTP/RTCP */
3621 priv->tee[i] = gst_element_factory_make ("tee", NULL);
3622 gst_bin_add (bin, priv->tee[i]);
3626 if (is_udp && !priv->udpsink[i]) {
3627 /* we create only one pair of udpsinks for IPv4 and IPv6 */
3628 create_and_configure_udpsink (stream, &priv->udpsink[i],
3629 priv->socket_v4[i], priv->socket_v6[i], FALSE, (i == 0), mcast_ttl);
3630 plug_sink (stream, transport, i);
3631 } else if (is_mcast && !priv->mcast_udpsink[i]) {
3632 /* we create only one pair of mcast-udpsinks for IPv4 and IPv6 */
3633 create_and_configure_udpsink (stream, &priv->mcast_udpsink[i],
3634 priv->mcast_socket_v4[i], priv->mcast_socket_v6[i], TRUE, (i == 0),
3636 plug_sink (stream, transport, i);
3637 } else if (is_tcp && !priv->appsink[i]) {
3639 priv->appsink[i] = gst_element_factory_make ("appsink", NULL);
3640 g_object_set (priv->appsink[i], "emit-signals", FALSE, "buffer-list",
3641 TRUE, "max-buffers", 1, NULL);
3644 g_object_set (priv->appsink[i], "sync", priv->do_rate_control, NULL);
3646 /* we need to set sync and preroll to FALSE for the sink to avoid
3647 * deadlock. This is only needed for sink sending RTCP data. */
3649 g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
3651 gst_app_sink_set_callbacks (GST_APP_SINK_CAST (priv->appsink[i]),
3652 &sink_cb, stream, NULL);
3653 plug_sink (stream, transport, i);
3657 /* and link to rtpbin send pad */
3658 gst_element_sync_state_with_parent (priv->tee[i]);
3659 pad = gst_element_get_static_pad (priv->tee[i], "sink");
3660 gst_pad_link (priv->send_src[i], pad);
3661 gst_object_unref (pad);
3668 /* must be called with lock */
3670 plug_src (GstRTSPStream * stream, GstBin * bin, GstElement * src,
3671 GstElement * funnel)
3673 GstRTSPStreamPrivate *priv;
3674 GstPad *pad, *selpad;
3677 priv = stream->priv;
3680 gst_bin_add (bin, src);
3682 pad = gst_element_get_static_pad (src, "src");
3684 /* block pad so src can't push data while it's not yet linked */
3685 id = gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_BLOCK |
3686 GST_PAD_PROBE_TYPE_BUFFER, NULL, NULL, NULL);
3687 /* we set and keep these to playing so that they don't cause NO_PREROLL return
3688 * values. This is only relevant for PLAY pipelines */
3689 gst_element_set_state (src, GST_STATE_PLAYING);
3690 gst_element_set_locked_state (src, TRUE);
3693 /* and link to the funnel */
3694 selpad = gst_element_request_pad_simple (funnel, "sink_%u");
3695 gst_pad_link (pad, selpad);
3697 gst_pad_remove_probe (pad, id);
3698 gst_object_unref (pad);
3699 gst_object_unref (selpad);
3702 /* must be called with lock */
3704 create_receiver_part (GstRTSPStream * stream, const GstRTSPTransport *
3707 gboolean ret = FALSE;
3708 GstRTSPStreamPrivate *priv;
3719 GST_DEBUG_OBJECT (stream, "create receiver part");
3720 priv = stream->priv;
3721 bin = priv->joined_bin;
3723 tcp = transport->lower_transport == GST_RTSP_LOWER_TRANS_TCP;
3724 udp = transport->lower_transport == GST_RTSP_LOWER_TRANS_UDP;
3725 mcast = transport->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST;
3726 secure = (priv->profiles & GST_RTSP_PROFILE_SAVP)
3727 || (priv->profiles & GST_RTSP_PROFILE_SAVPF);
3730 rtp_caps = gst_caps_new_empty_simple ("application/x-srtp");
3731 rtcp_caps = gst_caps_new_empty_simple ("application/x-srtcp");
3733 rtp_caps = gst_caps_new_empty_simple ("application/x-rtp");
3734 rtcp_caps = gst_caps_new_empty_simple ("application/x-rtcp");
3737 GST_DEBUG_OBJECT (stream,
3738 "RTP caps: %" GST_PTR_FORMAT " RTCP caps: %" GST_PTR_FORMAT, rtp_caps,
3741 for (i = 0; i < (priv->enable_rtcp ? 2 : 1); i++) {
3742 /* For the receiver we create this bit of pipeline for both
3743 * RTP and RTCP (when enabled). We receive RTP/RTCP on appsrc and udpsrc
3744 * and it is all funneled into the rtpbin receive pad.
3747 * .--------. .--------. .--------.
3748 * | udpsrc | | funnel | | rtpbin |
3749 * | RTP src->sink src->sink |
3750 * '--------' | | | |
3751 * .--------. | | | |
3752 * | appsrc | | | | |
3753 * | RTP src->sink | | |
3754 * '--------' '--------' | |
3756 * .--------. .--------. | |
3757 * | udpsrc | | funnel | | |
3758 * | RTCP src->sink src->sink |
3759 * '--------' | | '--------'
3762 * | RTCP src->sink |
3763 * '--------' '--------'
3766 if (!priv->sinkpad && i == 0) {
3767 /* Only connect recv RTP sink if we expect to receive RTP. Connect recv
3768 * RTCP sink always */
3772 /* make funnel for the RTP/RTCP receivers */
3773 if (!priv->funnel[i]) {
3774 priv->funnel[i] = gst_element_factory_make ("funnel", NULL);
3775 gst_bin_add (bin, priv->funnel[i]);
3777 pad = gst_element_get_static_pad (priv->funnel[i], "src");
3778 gst_pad_link (pad, priv->recv_sink[i]);
3779 gst_object_unref (pad);
3782 if (udp && !priv->udpsrc_v4[i] && priv->server_addr_v4) {
3783 GST_DEBUG_OBJECT (stream, "udp IPv4, create and configure udpsources");
3784 if (!create_and_configure_udpsource (&priv->udpsrc_v4[i],
3785 priv->socket_v4[i]))
3789 g_object_set (priv->udpsrc_v4[i], "caps", rtp_caps, NULL);
3791 g_object_set (priv->udpsrc_v4[i], "caps", rtcp_caps, NULL);
3793 /* block early rtcp packets, pipeline not ready */
3794 g_assert (priv->block_early_rtcp_pad == NULL);
3795 priv->block_early_rtcp_pad = gst_element_get_static_pad
3796 (priv->udpsrc_v4[i], "src");
3797 priv->block_early_rtcp_probe = gst_pad_add_probe
3798 (priv->block_early_rtcp_pad,
3799 GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER, NULL, NULL,
3803 plug_src (stream, bin, priv->udpsrc_v4[i], priv->funnel[i]);
3806 if (udp && !priv->udpsrc_v6[i] && priv->server_addr_v6) {
3807 GST_DEBUG_OBJECT (stream, "udp IPv6, create and configure udpsources");
3808 if (!create_and_configure_udpsource (&priv->udpsrc_v6[i],
3809 priv->socket_v6[i]))
3813 g_object_set (priv->udpsrc_v6[i], "caps", rtp_caps, NULL);
3815 g_object_set (priv->udpsrc_v6[i], "caps", rtcp_caps, NULL);
3817 /* block early rtcp packets, pipeline not ready */
3818 g_assert (priv->block_early_rtcp_pad_ipv6 == NULL);
3819 priv->block_early_rtcp_pad_ipv6 = gst_element_get_static_pad
3820 (priv->udpsrc_v6[i], "src");
3821 priv->block_early_rtcp_probe_ipv6 = gst_pad_add_probe
3822 (priv->block_early_rtcp_pad_ipv6,
3823 GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER, NULL, NULL,
3827 plug_src (stream, bin, priv->udpsrc_v6[i], priv->funnel[i]);
3830 if (mcast && !priv->mcast_udpsrc_v4[i] && priv->mcast_addr_v4) {
3831 GST_DEBUG_OBJECT (stream, "mcast IPv4, create and configure udpsources");
3832 if (!create_and_configure_udpsource (&priv->mcast_udpsrc_v4[i],
3833 priv->mcast_socket_v4[i]))
3837 g_object_set (priv->mcast_udpsrc_v4[i], "caps", rtp_caps, NULL);
3839 g_object_set (priv->mcast_udpsrc_v4[i], "caps", rtcp_caps, NULL);
3842 plug_src (stream, bin, priv->mcast_udpsrc_v4[i], priv->funnel[i]);
3845 if (mcast && !priv->mcast_udpsrc_v6[i] && priv->mcast_addr_v6) {
3846 GST_DEBUG_OBJECT (stream, "mcast IPv6, create and configure udpsources");
3847 if (!create_and_configure_udpsource (&priv->mcast_udpsrc_v6[i],
3848 priv->mcast_socket_v6[i]))
3852 g_object_set (priv->mcast_udpsrc_v6[i], "caps", rtp_caps, NULL);
3854 g_object_set (priv->mcast_udpsrc_v6[i], "caps", rtcp_caps, NULL);
3857 plug_src (stream, bin, priv->mcast_udpsrc_v6[i], priv->funnel[i]);
3860 if (tcp && !priv->appsrc[i]) {
3861 /* make and add appsrc */
3862 priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
3863 priv->appsrc_base_time[i] = -1;
3864 g_object_set (priv->appsrc[i], "format", GST_FORMAT_TIME, "is-live",
3866 plug_src (stream, bin, priv->appsrc[i], priv->funnel[i]);
3869 gst_element_sync_state_with_parent (priv->funnel[i]);
3875 gst_caps_unref (rtp_caps);
3876 gst_caps_unref (rtcp_caps);
3881 gst_rtsp_stream_is_tcp_receiver (GstRTSPStream * stream)
3883 GstRTSPStreamPrivate *priv;
3884 gboolean ret = FALSE;
3886 priv = stream->priv;
3887 g_mutex_lock (&priv->lock);
3888 ret = (priv->sinkpad != NULL && priv->appsrc[0] != NULL);
3889 g_mutex_unlock (&priv->lock);
3895 check_mcast_client_addr (GstRTSPStream * stream, const GstRTSPTransport * tr)
3897 GstRTSPStreamPrivate *priv = stream->priv;
3900 if (priv->mcast_clients == NULL)
3906 if (tr->destination == NULL)
3907 goto no_destination;
3909 for (walk = priv->mcast_clients; walk; walk = g_list_next (walk)) {
3910 UdpClientAddrInfo *cli = walk->data;
3912 if ((g_strcmp0 (cli->address, tr->destination) == 0) &&
3913 (cli->rtp_port == tr->port.min))
3921 GST_WARNING_OBJECT (stream, "Adding mcast transport, but no mcast address "
3922 "has been reserved");
3927 GST_WARNING_OBJECT (stream, "Adding mcast transport, but no transport "
3928 "has been provided");
3933 GST_WARNING_OBJECT (stream, "Adding mcast transport, but it doesn't match "
3934 "the reserved address");
3940 * gst_rtsp_stream_join_bin:
3941 * @stream: a #GstRTSPStream
3942 * @bin: (transfer none): a #GstBin to join
3943 * @rtpbin: (transfer none): a rtpbin element in @bin
3944 * @state: the target state of the new elements
3946 * Join the #GstBin @bin that contains the element @rtpbin.
3948 * @stream will link to @rtpbin, which must be inside @bin. The elements
3949 * added to @bin will be set to the state given in @state.
3951 * Returns: %TRUE on success.
3954 gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
3955 GstElement * rtpbin, GstState state)
3957 GstRTSPStreamPrivate *priv;
3960 GstPadLinkReturn ret;
3962 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3963 g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
3964 g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
3966 priv = stream->priv;
3968 g_mutex_lock (&priv->lock);
3969 if (priv->joined_bin != NULL)
3972 /* create a session with the same index as the stream */
3975 GST_INFO ("stream %p joining bin as session %u", stream, idx);
3977 if (priv->profiles & GST_RTSP_PROFILE_SAVP
3978 || priv->profiles & GST_RTSP_PROFILE_SAVPF) {
3980 g_signal_connect (rtpbin, "request-rtp-encoder",
3981 (GCallback) request_rtp_encoder, stream);
3982 g_signal_connect (rtpbin, "request-rtcp-encoder",
3983 (GCallback) request_rtcp_encoder, stream);
3984 g_signal_connect (rtpbin, "request-rtp-decoder",
3985 (GCallback) request_rtp_rtcp_decoder, stream);
3986 g_signal_connect (rtpbin, "request-rtcp-decoder",
3987 (GCallback) request_rtp_rtcp_decoder, stream);
3990 if (priv->sinkpad) {
3991 g_signal_connect (rtpbin, "request-pt-map",
3992 (GCallback) request_pt_map, stream);
3995 /* get pads from the RTP session element for sending and receiving
3998 /* get a pad for sending RTP */
3999 name = g_strdup_printf ("send_rtp_sink_%u", idx);
4000 priv->send_rtp_sink = gst_element_request_pad_simple (rtpbin, name);
4003 /* link the RTP pad to the session manager, it should not really fail unless
4004 * this is not really an RTP pad */
4005 ret = gst_pad_link (priv->srcpad, priv->send_rtp_sink);
4006 if (ret != GST_PAD_LINK_OK)
4009 name = g_strdup_printf ("send_rtp_src_%u", idx);
4010 priv->send_src[0] = gst_element_get_static_pad (rtpbin, name);
4013 /* RECORD case: need to connect our sinkpad from here */
4014 g_signal_connect (rtpbin, "pad-added", (GCallback) pad_added, stream);
4016 g_signal_connect (rtpbin, "on-npt-stop", (GCallback) on_npt_stop, stream);
4018 name = g_strdup_printf ("recv_rtp_sink_%u", idx);
4019 priv->recv_sink[0] = gst_element_request_pad_simple (rtpbin, name);
4023 if (priv->enable_rtcp) {
4024 name = g_strdup_printf ("send_rtcp_src_%u", idx);
4025 priv->send_src[1] = gst_element_request_pad_simple (rtpbin, name);
4028 name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
4029 priv->recv_sink[1] = gst_element_request_pad_simple (rtpbin, name);
4033 /* get the session */
4034 g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &priv->session);
4036 g_signal_connect (priv->session, "on-new-ssrc", (GCallback) on_new_ssrc,
4038 g_signal_connect (priv->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
4040 g_signal_connect (priv->session, "on-ssrc-active",
4041 (GCallback) on_ssrc_active, stream);
4042 g_signal_connect (priv->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
4044 g_signal_connect (priv->session, "on-bye-timeout",
4045 (GCallback) on_bye_timeout, stream);
4046 g_signal_connect (priv->session, "on-timeout", (GCallback) on_timeout,
4049 /* signal for sender ssrc */
4050 g_signal_connect (priv->session, "on-new-sender-ssrc",
4051 (GCallback) on_new_sender_ssrc, stream);
4052 g_signal_connect (priv->session, "on-sender-ssrc-active",
4053 (GCallback) on_sender_ssrc_active, stream);
4055 g_object_set (priv->session, "disable-sr-timestamp", !priv->do_rate_control,
4059 /* be notified of caps changes */
4060 priv->caps_sig = g_signal_connect (priv->send_src[0], "notify::caps",
4061 (GCallback) caps_notify, stream);
4062 priv->caps = gst_pad_get_current_caps (priv->send_src[0]);
4065 priv->joined_bin = bin;
4066 GST_DEBUG_OBJECT (stream, "successfully joined bin");
4067 g_mutex_unlock (&priv->lock);
4074 g_mutex_unlock (&priv->lock);
4079 GST_WARNING ("failed to link stream %u", idx);
4080 gst_object_unref (priv->send_rtp_sink);
4081 priv->send_rtp_sink = NULL;
4082 g_mutex_unlock (&priv->lock);
4088 clear_element (GstBin * bin, GstElement ** elementptr)
4091 gst_element_set_locked_state (*elementptr, FALSE);
4092 gst_element_set_state (*elementptr, GST_STATE_NULL);
4093 if (GST_ELEMENT_PARENT (*elementptr))
4094 gst_bin_remove (bin, *elementptr);
4096 gst_object_unref (*elementptr);
4102 * gst_rtsp_stream_leave_bin:
4103 * @stream: a #GstRTSPStream
4104 * @bin: (transfer none): a #GstBin
4105 * @rtpbin: (transfer none): a rtpbin #GstElement
4107 * Remove the elements of @stream from @bin.
4109 * Return: %TRUE on success.
4112 gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
4113 GstElement * rtpbin)
4115 GstRTSPStreamPrivate *priv;
4118 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
4119 g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
4120 g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
4122 priv = stream->priv;
4124 g_mutex_lock (&priv->send_lock);
4125 priv->continue_sending = FALSE;
4126 priv->send_cookie++;
4127 g_cond_signal (&priv->send_cond);
4128 g_mutex_unlock (&priv->send_lock);
4130 if (priv->send_thread) {
4131 g_thread_join (priv->send_thread);
4134 g_mutex_lock (&priv->lock);
4135 if (priv->joined_bin == NULL)
4136 goto was_not_joined;
4137 if (priv->joined_bin != bin)
4140 priv->joined_bin = NULL;
4142 /* all transports must be removed by now */
4143 if (priv->transports != NULL)
4144 goto transports_not_removed;
4146 if (priv->send_pool) {
4149 slask = priv->send_pool;
4150 priv->send_pool = NULL;
4151 g_mutex_unlock (&priv->lock);
4152 g_thread_pool_free (slask, TRUE, TRUE);
4153 g_mutex_lock (&priv->lock);
4156 clear_tr_cache (priv);
4158 GST_INFO ("stream %p leaving bin", stream);
4161 gst_pad_unlink (priv->srcpad, priv->send_rtp_sink);
4163 g_signal_handler_disconnect (priv->send_src[0], priv->caps_sig);
4164 gst_element_release_request_pad (rtpbin, priv->send_rtp_sink);
4165 gst_object_unref (priv->send_rtp_sink);
4166 priv->send_rtp_sink = NULL;
4167 } else if (priv->recv_rtp_src) {
4168 gst_pad_unlink (priv->recv_rtp_src, priv->sinkpad);
4169 gst_object_unref (priv->recv_rtp_src);
4170 priv->recv_rtp_src = NULL;
4173 for (i = 0; i < (priv->enable_rtcp ? 2 : 1); i++) {
4174 clear_element (bin, &priv->udpsrc_v4[i]);
4175 clear_element (bin, &priv->udpsrc_v6[i]);
4176 clear_element (bin, &priv->udpqueue[i]);
4177 clear_element (bin, &priv->udpsink[i]);
4179 clear_element (bin, &priv->mcast_udpsrc_v4[i]);
4180 clear_element (bin, &priv->mcast_udpsrc_v6[i]);
4181 clear_element (bin, &priv->mcast_udpqueue[i]);
4182 clear_element (bin, &priv->mcast_udpsink[i]);
4184 clear_element (bin, &priv->appsrc[i]);
4185 clear_element (bin, &priv->appqueue[i]);
4186 clear_element (bin, &priv->appsink[i]);
4188 clear_element (bin, &priv->tee[i]);
4189 clear_element (bin, &priv->funnel[i]);
4191 if (priv->sinkpad || i == 1) {
4192 gst_element_release_request_pad (rtpbin, priv->recv_sink[i]);
4193 gst_object_unref (priv->recv_sink[i]);
4194 priv->recv_sink[i] = NULL;
4199 gst_object_unref (priv->send_src[0]);
4200 priv->send_src[0] = NULL;
4203 if (priv->enable_rtcp) {
4204 gst_element_release_request_pad (rtpbin, priv->send_src[1]);
4205 gst_object_unref (priv->send_src[1]);
4206 priv->send_src[1] = NULL;
4209 g_object_unref (priv->session);
4210 priv->session = NULL;
4212 gst_caps_unref (priv->caps);
4216 gst_object_unref (priv->srtpenc);
4218 gst_object_unref (priv->srtpdec);
4220 if (priv->mcast_addr_v4)
4221 gst_rtsp_address_free (priv->mcast_addr_v4);
4222 priv->mcast_addr_v4 = NULL;
4223 if (priv->mcast_addr_v6)
4224 gst_rtsp_address_free (priv->mcast_addr_v6);
4225 priv->mcast_addr_v6 = NULL;
4226 if (priv->server_addr_v4)
4227 gst_rtsp_address_free (priv->server_addr_v4);
4228 priv->server_addr_v4 = NULL;
4229 if (priv->server_addr_v6)
4230 gst_rtsp_address_free (priv->server_addr_v6);
4231 priv->server_addr_v6 = NULL;
4233 g_mutex_unlock (&priv->lock);
4239 g_mutex_unlock (&priv->lock);
4242 transports_not_removed:
4244 GST_ERROR_OBJECT (stream, "can't leave bin (transports not removed)");
4245 g_mutex_unlock (&priv->lock);
4250 GST_ERROR_OBJECT (stream, "leaving the wrong bin");
4251 g_mutex_unlock (&priv->lock);
4257 * gst_rtsp_stream_get_joined_bin:
4258 * @stream: a #GstRTSPStream
4260 * Get the previous joined bin with gst_rtsp_stream_join_bin() or NULL.
4262 * Return: (transfer full) (nullable): the joined bin or NULL.
4265 gst_rtsp_stream_get_joined_bin (GstRTSPStream * stream)
4267 GstRTSPStreamPrivate *priv;
4270 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
4272 priv = stream->priv;
4274 g_mutex_lock (&priv->lock);
4275 bin = priv->joined_bin ? gst_object_ref (priv->joined_bin) : NULL;
4276 g_mutex_unlock (&priv->lock);
4282 * gst_rtsp_stream_get_rtpinfo:
4283 * @stream: a #GstRTSPStream
4284 * @rtptime: (allow-none) (out caller-allocates): result RTP timestamp
4285 * @seq: (allow-none) (out caller-allocates): result RTP seqnum
4286 * @clock_rate: (allow-none) (out caller-allocates): the clock rate
4287 * @running_time: (out caller-allocates): result running-time
4289 * Retrieve the current rtptime, seq and running-time. This is used to
4290 * construct a RTPInfo reply header.
4292 * Returns: %TRUE when rtptime, seq and running-time could be determined.
4295 gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream,
4296 guint * rtptime, guint * seq, guint * clock_rate,
4297 GstClockTime * running_time)
4299 GstRTSPStreamPrivate *priv;
4300 GstStructure *stats;
4301 GObjectClass *payobjclass;
4303 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
4305 priv = stream->priv;
4307 payobjclass = G_OBJECT_GET_CLASS (priv->payloader);
4309 g_mutex_lock (&priv->lock);
4311 /* First try to extract the information from the last buffer on the sinks.
4312 * This will have a more accurate sequence number and timestamp, as between
4313 * the payloader and the sink there can be some queues
4315 if (priv->udpsink[0] || priv->mcast_udpsink[0] || priv->appsink[0]) {
4316 GstSample *last_sample;
4318 if (priv->udpsink[0])
4319 g_object_get (priv->udpsink[0], "last-sample", &last_sample, NULL);
4320 else if (priv->mcast_udpsink[0])
4321 g_object_get (priv->mcast_udpsink[0], "last-sample", &last_sample, NULL);
4323 g_object_get (priv->appsink[0], "last-sample", &last_sample, NULL);
4328 GstSegment *segment;
4330 GstRTPBuffer rtp_buffer = GST_RTP_BUFFER_INIT;
4332 caps = gst_sample_get_caps (last_sample);
4333 buffer = gst_sample_get_buffer (last_sample);
4334 segment = gst_sample_get_segment (last_sample);
4335 s = gst_caps_get_structure (caps, 0);
4337 if (gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp_buffer)) {
4338 guint ssrc_buf = gst_rtp_buffer_get_ssrc (&rtp_buffer);
4339 guint ssrc_stream = 0;
4340 if (gst_structure_has_field_typed (s, "ssrc", G_TYPE_UINT) &&
4341 gst_structure_get_uint (s, "ssrc", &ssrc_stream) &&
4342 ssrc_buf != ssrc_stream) {
4343 /* Skip buffers from auxiliary streams. */
4344 GST_DEBUG_OBJECT (stream,
4345 "not a buffer from the payloader, SSRC: %08x", ssrc_buf);
4347 gst_rtp_buffer_unmap (&rtp_buffer);
4348 gst_sample_unref (last_sample);
4353 *seq = gst_rtp_buffer_get_seq (&rtp_buffer);
4357 *rtptime = gst_rtp_buffer_get_timestamp (&rtp_buffer);
4360 gst_rtp_buffer_unmap (&rtp_buffer);
4364 gst_segment_to_running_time (segment, GST_FORMAT_TIME,
4365 GST_BUFFER_TIMESTAMP (buffer));
4369 gst_structure_get_int (s, "clock-rate", (gint *) clock_rate);
4371 if (*clock_rate == 0 && running_time)
4372 *running_time = GST_CLOCK_TIME_NONE;
4374 gst_sample_unref (last_sample);
4378 gst_sample_unref (last_sample);
4380 } else if (priv->blocking) {
4382 if (!priv->blocked_buffer)
4384 *seq = priv->blocked_seqnum;
4388 if (!priv->blocked_buffer)
4390 *rtptime = priv->blocked_rtptime;
4394 if (!GST_CLOCK_TIME_IS_VALID (priv->blocked_running_time))
4396 *running_time = priv->blocked_running_time;
4400 *clock_rate = priv->blocked_clock_rate;
4402 if (*clock_rate == 0 && running_time)
4403 *running_time = GST_CLOCK_TIME_NONE;
4411 if (g_object_class_find_property (payobjclass, "stats")) {
4412 g_object_get (priv->payloader, "stats", &stats, NULL);
4417 gst_structure_get_uint (stats, "seqnum-offset", seq);
4420 gst_structure_get_uint (stats, "timestamp", rtptime);
4423 gst_structure_get_clock_time (stats, "running-time", running_time);
4426 gst_structure_get_uint (stats, "clock-rate", clock_rate);
4427 if (*clock_rate == 0 && running_time)
4428 *running_time = GST_CLOCK_TIME_NONE;
4430 gst_structure_free (stats);
4432 if (!g_object_class_find_property (payobjclass, "seqnum") ||
4433 !g_object_class_find_property (payobjclass, "timestamp"))
4437 g_object_get (priv->payloader, "seqnum", seq, NULL);
4440 g_object_get (priv->payloader, "timestamp", rtptime, NULL);
4443 *running_time = GST_CLOCK_TIME_NONE;
4447 g_mutex_unlock (&priv->lock);
4454 GST_WARNING ("Could not get payloader stats");
4455 g_mutex_unlock (&priv->lock);
4461 * gst_rtsp_stream_get_rates:
4462 * @stream: a #GstRTSPStream
4463 * @rate: (optional) (out caller-allocates): the configured rate
4464 * @applied_rate: (optional) (out caller-allocates): the configured applied_rate
4466 * Retrieve the current rate and/or applied_rate.
4468 * Returns: %TRUE if rate and/or applied_rate could be determined.
4472 gst_rtsp_stream_get_rates (GstRTSPStream * stream, gdouble * rate,
4473 gdouble * applied_rate)
4475 GstRTSPStreamPrivate *priv;
4477 const GstSegment *segment;
4479 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
4481 if (!rate && !applied_rate) {
4482 GST_WARNING_OBJECT (stream, "rate and applied_rate are both NULL");
4486 priv = stream->priv;
4488 g_mutex_lock (&priv->lock);
4490 if (!priv->send_rtp_sink)
4491 goto no_rtp_sink_pad;
4493 event = gst_pad_get_sticky_event (priv->send_rtp_sink, GST_EVENT_SEGMENT, 0);
4495 goto no_sticky_event;
4497 gst_event_parse_segment (event, &segment);
4499 *rate = segment->rate;
4501 *applied_rate = segment->applied_rate;
4503 gst_event_unref (event);
4504 g_mutex_unlock (&priv->lock);
4511 GST_WARNING_OBJECT (stream, "no send_rtp_sink pad yet");
4512 g_mutex_unlock (&priv->lock);
4517 GST_WARNING_OBJECT (stream, "no segment event on send_rtp_sink pad");
4518 g_mutex_unlock (&priv->lock);
4525 * gst_rtsp_stream_get_caps:
4526 * @stream: a #GstRTSPStream
4528 * Retrieve the current caps of @stream.
4530 * Returns: (transfer full) (nullable): the #GstCaps of @stream.
4531 * use gst_caps_unref() after usage.
4534 gst_rtsp_stream_get_caps (GstRTSPStream * stream)
4536 GstRTSPStreamPrivate *priv;
4539 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
4541 priv = stream->priv;
4543 g_mutex_lock (&priv->lock);
4544 if ((result = priv->caps))
4545 gst_caps_ref (result);
4546 g_mutex_unlock (&priv->lock);
4552 * gst_rtsp_stream_recv_rtp:
4553 * @stream: a #GstRTSPStream
4554 * @buffer: (transfer full): a #GstBuffer
4556 * Handle an RTP buffer for the stream. This method is usually called when a
4557 * message has been received from a client using the TCP transport.
4559 * This function takes ownership of @buffer.
4561 * Returns: a GstFlowReturn.
4564 gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer)
4566 GstRTSPStreamPrivate *priv;
4568 GstElement *element;
4570 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
4571 priv = stream->priv;
4572 g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
4573 g_return_val_if_fail (priv->joined_bin != NULL, FALSE);
4575 g_mutex_lock (&priv->lock);
4576 if (priv->appsrc[0])
4577 element = gst_object_ref (priv->appsrc[0]);
4580 g_mutex_unlock (&priv->lock);
4583 if (priv->appsrc_base_time[0] == -1) {
4584 /* Take current running_time. This timestamp will be put on
4585 * the first buffer of each stream because we are a live source and so we
4586 * timestamp with the running_time. When we are dealing with TCP, we also
4587 * only timestamp the first buffer (using the DISCONT flag) because a server
4588 * typically bursts data, for which we don't want to compensate by speeding
4589 * up the media. The other timestamps will be interpollated from this one
4590 * using the RTP timestamps. */
4591 GST_OBJECT_LOCK (element);
4592 if (GST_ELEMENT_CLOCK (element)) {
4594 GstClockTime base_time;
4596 now = gst_clock_get_time (GST_ELEMENT_CLOCK (element));
4597 base_time = GST_ELEMENT_CAST (element)->base_time;
4599 priv->appsrc_base_time[0] = now - base_time;
4600 GST_BUFFER_TIMESTAMP (buffer) = priv->appsrc_base_time[0];
4601 GST_DEBUG ("stream %p: first buffer at time %" GST_TIME_FORMAT
4602 ", base %" GST_TIME_FORMAT, stream, GST_TIME_ARGS (now),
4603 GST_TIME_ARGS (base_time));
4605 GST_OBJECT_UNLOCK (element);
4608 ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
4609 gst_object_unref (element);
4617 * gst_rtsp_stream_recv_rtcp:
4618 * @stream: a #GstRTSPStream
4619 * @buffer: (transfer full): a #GstBuffer
4621 * Handle an RTCP buffer for the stream. This method is usually called when a
4622 * message has been received from a client using the TCP transport.
4624 * This function takes ownership of @buffer.
4626 * Returns: a GstFlowReturn.
4629 gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
4631 GstRTSPStreamPrivate *priv;
4633 GstElement *element;
4635 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
4636 priv = stream->priv;
4637 g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
4639 if (priv->joined_bin == NULL) {
4640 gst_buffer_unref (buffer);
4641 return GST_FLOW_NOT_LINKED;
4643 g_mutex_lock (&priv->lock);
4644 if (priv->appsrc[1])
4645 element = gst_object_ref (priv->appsrc[1]);
4648 g_mutex_unlock (&priv->lock);
4651 if (priv->appsrc_base_time[1] == -1) {
4652 /* Take current running_time. This timestamp will be put on
4653 * the first buffer of each stream because we are a live source and so we
4654 * timestamp with the running_time. When we are dealing with TCP, we also
4655 * only timestamp the first buffer (using the DISCONT flag) because a server
4656 * typically bursts data, for which we don't want to compensate by speeding
4657 * up the media. The other timestamps will be interpollated from this one
4658 * using the RTP timestamps. */
4659 GST_OBJECT_LOCK (element);
4660 if (GST_ELEMENT_CLOCK (element)) {
4662 GstClockTime base_time;
4664 now = gst_clock_get_time (GST_ELEMENT_CLOCK (element));
4665 base_time = GST_ELEMENT_CAST (element)->base_time;
4667 priv->appsrc_base_time[1] = now - base_time;
4668 GST_BUFFER_TIMESTAMP (buffer) = priv->appsrc_base_time[1];
4669 GST_DEBUG ("stream %p: first buffer at time %" GST_TIME_FORMAT
4670 ", base %" GST_TIME_FORMAT, stream, GST_TIME_ARGS (now),
4671 GST_TIME_ARGS (base_time));
4673 GST_OBJECT_UNLOCK (element);
4676 ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
4677 gst_object_unref (element);
4680 gst_buffer_unref (buffer);
4685 /* must be called with lock */
4687 add_client (GstElement * rtp_sink, GstElement * rtcp_sink, const gchar * host,
4688 gint rtp_port, gint rtcp_port)
4690 if (rtp_sink != NULL)
4691 g_signal_emit_by_name (rtp_sink, "add", host, rtp_port, NULL);
4692 if (rtcp_sink != NULL)
4693 g_signal_emit_by_name (rtcp_sink, "add", host, rtcp_port, NULL);
4696 /* must be called with lock */
4698 remove_client (GstElement * rtp_sink, GstElement * rtcp_sink,
4699 const gchar * host, gint rtp_port, gint rtcp_port)
4701 if (rtp_sink != NULL)
4702 g_signal_emit_by_name (rtp_sink, "remove", host, rtp_port, NULL);
4703 if (rtcp_sink != NULL)
4704 g_signal_emit_by_name (rtcp_sink, "remove", host, rtcp_port, NULL);
4707 /* must be called with lock */
4709 update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
4712 GstRTSPStreamPrivate *priv = stream->priv;
4713 const GstRTSPTransport *tr;
4718 tr = gst_rtsp_stream_transport_get_transport (trans);
4719 dest = tr->destination;
4721 tr_element = g_list_find (priv->transports, trans);
4723 if (add && tr_element)
4725 else if (!add && !tr_element)
4728 switch (tr->lower_transport) {
4729 case GST_RTSP_LOWER_TRANS_UDP_MCAST:
4735 GST_INFO ("adding %s:%d-%d", dest, min, max);
4736 if (!check_mcast_client_addr (stream, tr))
4738 add_client (priv->mcast_udpsink[0], priv->mcast_udpsink[1], dest, min,
4742 GST_INFO ("setting ttl-mc %d", tr->ttl);
4743 if (priv->mcast_udpsink[0])
4744 g_object_set (G_OBJECT (priv->mcast_udpsink[0]), "ttl-mc", tr->ttl,
4746 if (priv->mcast_udpsink[1])
4747 g_object_set (G_OBJECT (priv->mcast_udpsink[1]), "ttl-mc", tr->ttl,
4750 priv->transports = g_list_prepend (priv->transports, trans);
4752 GST_INFO ("removing %s:%d-%d", dest, min, max);
4753 if (!remove_mcast_client_addr (stream, dest, min, max))
4754 GST_WARNING_OBJECT (stream,
4755 "Failed to remove multicast address: %s:%d-%d", dest, min, max);
4756 priv->transports = g_list_delete_link (priv->transports, tr_element);
4757 remove_client (priv->mcast_udpsink[0], priv->mcast_udpsink[1], dest,
4762 case GST_RTSP_LOWER_TRANS_UDP:
4764 if (priv->client_side) {
4765 /* In client side mode the 'destination' is the RTSP server, so send
4767 min = tr->server_port.min;
4768 max = tr->server_port.max;
4770 min = tr->client_port.min;
4771 max = tr->client_port.max;
4775 GST_INFO ("adding %s:%d-%d", dest, min, max);
4776 add_client (priv->udpsink[0], priv->udpsink[1], dest, min, max);
4777 priv->transports = g_list_prepend (priv->transports, trans);
4779 GST_INFO ("removing %s:%d-%d", dest, min, max);
4780 priv->transports = g_list_delete_link (priv->transports, tr_element);
4781 remove_client (priv->udpsink[0], priv->udpsink[1], dest, min, max);
4783 priv->transports_cookie++;
4786 case GST_RTSP_LOWER_TRANS_TCP:
4788 GST_INFO ("adding TCP %s", tr->destination);
4789 priv->transports = g_list_prepend (priv->transports, trans);
4790 priv->n_tcp_transports++;
4792 GST_INFO ("removing TCP %s", tr->destination);
4793 priv->transports = g_list_delete_link (priv->transports, tr_element);
4795 gst_rtsp_stream_transport_lock_backlog (trans);
4796 gst_rtsp_stream_transport_clear_backlog (trans);
4797 gst_rtsp_stream_transport_unlock_backlog (trans);
4799 priv->n_tcp_transports--;
4801 priv->transports_cookie++;
4804 goto unknown_transport;
4811 GST_INFO ("Unknown transport %d", tr->lower_transport);
4821 on_message_sent (GstRTSPStreamTransport * trans, gpointer user_data)
4823 GstRTSPStream *stream = GST_RTSP_STREAM (user_data);
4824 GstRTSPStreamPrivate *priv = stream->priv;
4826 GST_DEBUG_OBJECT (stream, "message send complete");
4828 check_transport_backlog (stream, trans);
4830 g_mutex_lock (&priv->send_lock);
4831 priv->send_cookie++;
4832 g_cond_signal (&priv->send_cond);
4833 g_mutex_unlock (&priv->send_lock);
4837 * gst_rtsp_stream_add_transport:
4838 * @stream: a #GstRTSPStream
4839 * @trans: (transfer none): a #GstRTSPStreamTransport
4841 * Add the transport in @trans to @stream. The media of @stream will
4842 * then also be send to the values configured in @trans. Adding the
4843 * same transport twice will not add it a second time.
4845 * @stream must be joined to a bin.
4847 * @trans must contain a valid #GstRTSPTransport.
4849 * Returns: %TRUE if @trans was added
4852 gst_rtsp_stream_add_transport (GstRTSPStream * stream,
4853 GstRTSPStreamTransport * trans)
4855 GstRTSPStreamPrivate *priv;
4858 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
4859 priv = stream->priv;
4860 g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
4861 g_return_val_if_fail (priv->joined_bin != NULL, FALSE);
4863 g_mutex_lock (&priv->lock);
4864 res = update_transport (stream, trans, TRUE);
4866 gst_rtsp_stream_transport_set_message_sent_full (trans, on_message_sent,
4868 g_mutex_unlock (&priv->lock);
4874 * gst_rtsp_stream_remove_transport:
4875 * @stream: a #GstRTSPStream
4876 * @trans: (transfer none): a #GstRTSPStreamTransport
4878 * Remove the transport in @trans from @stream. The media of @stream will
4879 * not be sent to the values configured in @trans.
4881 * @stream must be joined to a bin.
4883 * @trans must contain a valid #GstRTSPTransport.
4885 * Returns: %TRUE if @trans was removed
4888 gst_rtsp_stream_remove_transport (GstRTSPStream * stream,
4889 GstRTSPStreamTransport * trans)
4891 GstRTSPStreamPrivate *priv;
4894 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
4895 priv = stream->priv;
4896 g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
4897 g_return_val_if_fail (priv->joined_bin != NULL, FALSE);
4899 g_mutex_lock (&priv->lock);
4900 res = update_transport (stream, trans, FALSE);
4901 g_mutex_unlock (&priv->lock);
4907 * gst_rtsp_stream_update_crypto:
4908 * @stream: a #GstRTSPStream
4910 * @crypto: (transfer none) (allow-none): a #GstCaps with crypto info
4912 * Update the new crypto information for @ssrc in @stream. If information
4913 * for @ssrc did not exist, it will be added. If information
4914 * for @ssrc existed, it will be replaced. If @crypto is %NULL, it will
4915 * be removed from @stream.
4917 * Returns: %TRUE if @crypto could be updated
4920 gst_rtsp_stream_update_crypto (GstRTSPStream * stream,
4921 guint ssrc, GstCaps * crypto)
4923 GstRTSPStreamPrivate *priv;
4925 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
4926 g_return_val_if_fail (crypto == NULL || GST_IS_CAPS (crypto), FALSE);
4928 priv = stream->priv;
4930 GST_DEBUG_OBJECT (stream, "update key for %08x", ssrc);
4932 g_mutex_lock (&priv->lock);
4934 g_hash_table_insert (priv->keys, GINT_TO_POINTER (ssrc),
4935 gst_caps_ref (crypto));
4937 g_hash_table_remove (priv->keys, GINT_TO_POINTER (ssrc));
4938 g_mutex_unlock (&priv->lock);
4944 * gst_rtsp_stream_get_rtp_socket:
4945 * @stream: a #GstRTSPStream
4946 * @family: the socket family
4948 * Get the RTP socket from @stream for a @family.
4950 * @stream must be joined to a bin.
4952 * Returns: (transfer full) (nullable): the RTP socket or %NULL if no
4953 * socket could be allocated for @family. Unref after usage
4956 gst_rtsp_stream_get_rtp_socket (GstRTSPStream * stream, GSocketFamily family)
4958 GstRTSPStreamPrivate *priv = stream->priv;
4961 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
4962 g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
4963 family == G_SOCKET_FAMILY_IPV6, NULL);
4965 g_mutex_lock (&priv->lock);
4966 if (family == G_SOCKET_FAMILY_IPV6)
4967 socket = priv->socket_v6[0];
4969 socket = priv->socket_v4[0];
4972 socket = g_object_ref (socket);
4973 g_mutex_unlock (&priv->lock);
4979 * gst_rtsp_stream_get_rtcp_socket:
4980 * @stream: a #GstRTSPStream
4981 * @family: the socket family
4983 * Get the RTCP socket from @stream for a @family.
4985 * @stream must be joined to a bin.
4987 * Returns: (transfer full) (nullable): the RTCP socket or %NULL if no
4988 * socket could be allocated for @family. Unref after usage
4991 gst_rtsp_stream_get_rtcp_socket (GstRTSPStream * stream, GSocketFamily family)
4993 GstRTSPStreamPrivate *priv = stream->priv;
4996 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
4997 g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
4998 family == G_SOCKET_FAMILY_IPV6, NULL);
5000 g_mutex_lock (&priv->lock);
5001 if (family == G_SOCKET_FAMILY_IPV6)
5002 socket = priv->socket_v6[1];
5004 socket = priv->socket_v4[1];
5007 socket = g_object_ref (socket);
5008 g_mutex_unlock (&priv->lock);
5014 * gst_rtsp_stream_get_rtp_multicast_socket:
5015 * @stream: a #GstRTSPStream
5016 * @family: the socket family
5018 * Get the multicast RTP socket from @stream for a @family.
5020 * Returns: (transfer full) (nullable): the multicast RTP socket or %NULL if no
5022 * socket could be allocated for @family. Unref after usage
5025 gst_rtsp_stream_get_rtp_multicast_socket (GstRTSPStream * stream,
5026 GSocketFamily family)
5028 GstRTSPStreamPrivate *priv = stream->priv;
5031 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
5032 g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
5033 family == G_SOCKET_FAMILY_IPV6, NULL);
5035 g_mutex_lock (&priv->lock);
5036 if (family == G_SOCKET_FAMILY_IPV6)
5037 socket = priv->mcast_socket_v6[0];
5039 socket = priv->mcast_socket_v4[0];
5042 socket = g_object_ref (socket);
5043 g_mutex_unlock (&priv->lock);
5049 * gst_rtsp_stream_get_rtcp_multicast_socket:
5050 * @stream: a #GstRTSPStream
5051 * @family: the socket family
5053 * Get the multicast RTCP socket from @stream for a @family.
5055 * Returns: (transfer full) (nullable): the multicast RTCP socket or %NULL if no
5056 * socket could be allocated for @family. Unref after usage
5061 gst_rtsp_stream_get_rtcp_multicast_socket (GstRTSPStream * stream,
5062 GSocketFamily family)
5064 GstRTSPStreamPrivate *priv = stream->priv;
5067 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
5068 g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
5069 family == G_SOCKET_FAMILY_IPV6, NULL);
5071 g_mutex_lock (&priv->lock);
5072 if (family == G_SOCKET_FAMILY_IPV6)
5073 socket = priv->mcast_socket_v6[1];
5075 socket = priv->mcast_socket_v4[1];
5078 socket = g_object_ref (socket);
5079 g_mutex_unlock (&priv->lock);
5085 * gst_rtsp_stream_add_multicast_client_address:
5086 * @stream: a #GstRTSPStream
5087 * @destination: (transfer none): a multicast address to add
5088 * @rtp_port: RTP port
5089 * @rtcp_port: RTCP port
5090 * @family: socket family
5092 * Add multicast client address to stream. At this point, the sockets that
5093 * will stream RTP and RTCP data to @destination are supposed to be
5096 * Returns: %TRUE if @destination can be addedd and handled by @stream.
5101 gst_rtsp_stream_add_multicast_client_address (GstRTSPStream * stream,
5102 const gchar * destination, guint rtp_port, guint rtcp_port,
5103 GSocketFamily family)
5105 GstRTSPStreamPrivate *priv;
5107 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
5108 g_return_val_if_fail (destination != NULL, FALSE);
5110 priv = stream->priv;
5111 g_mutex_lock (&priv->lock);
5112 if ((family == G_SOCKET_FAMILY_IPV4) && (priv->mcast_socket_v4[0] == NULL))
5114 else if ((family == G_SOCKET_FAMILY_IPV6) &&
5115 (priv->mcast_socket_v6[0] == NULL))
5118 if (!add_mcast_client_addr (stream, destination, rtp_port, rtcp_port))
5119 goto add_addr_error;
5120 g_mutex_unlock (&priv->lock);
5126 GST_WARNING_OBJECT (stream,
5127 "Failed to add multicast address: no udp socket");
5128 g_mutex_unlock (&priv->lock);
5133 GST_WARNING_OBJECT (stream,
5134 "Failed to add multicast address: invalid address");
5135 g_mutex_unlock (&priv->lock);
5141 * gst_rtsp_stream_get_multicast_client_addresses
5142 * @stream: a #GstRTSPStream
5144 * Get all multicast client addresses that RTP data will be sent to
5146 * Returns: A comma separated list of host:port pairs with destinations
5151 gst_rtsp_stream_get_multicast_client_addresses (GstRTSPStream * stream)
5153 GstRTSPStreamPrivate *priv;
5157 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
5159 priv = stream->priv;
5160 str = g_string_new ("");
5162 g_mutex_lock (&priv->lock);
5163 clients = priv->mcast_clients;
5164 while (clients != NULL) {
5165 UdpClientAddrInfo *client;
5167 client = (UdpClientAddrInfo *) clients->data;
5168 clients = g_list_next (clients);
5169 g_string_append_printf (str, "%s:%d%s", client->address, client->rtp_port,
5170 (clients != NULL ? "," : ""));
5172 g_mutex_unlock (&priv->lock);
5174 return g_string_free (str, FALSE);
5178 * gst_rtsp_stream_set_seqnum:
5179 * @stream: a #GstRTSPStream
5180 * @seqnum: a new sequence number
5182 * Configure the sequence number in the payloader of @stream to @seqnum.
5185 gst_rtsp_stream_set_seqnum_offset (GstRTSPStream * stream, guint16 seqnum)
5187 GstRTSPStreamPrivate *priv;
5189 g_return_if_fail (GST_IS_RTSP_STREAM (stream));
5191 priv = stream->priv;
5193 g_object_set (G_OBJECT (priv->payloader), "seqnum-offset", seqnum, NULL);
5197 * gst_rtsp_stream_get_seqnum:
5198 * @stream: a #GstRTSPStream
5200 * Get the configured sequence number in the payloader of @stream.
5202 * Returns: the sequence number of the payloader.
5205 gst_rtsp_stream_get_current_seqnum (GstRTSPStream * stream)
5207 GstRTSPStreamPrivate *priv;
5210 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
5212 priv = stream->priv;
5214 g_object_get (G_OBJECT (priv->payloader), "seqnum", &seqnum, NULL);
5220 gst_rtsp_stream_get_udp_sent_bytes (GstRTSPStream * stream)
5222 GstRTSPStreamPrivate *priv;
5225 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
5227 priv = stream->priv;
5229 g_object_get (G_OBJECT (priv->udpsink[0]), "bytes-to-serve", &bytes, NULL);
5235 * gst_rtsp_stream_transport_filter:
5236 * @stream: a #GstRTSPStream
5237 * @func: (scope call) (allow-none): a callback
5238 * @user_data: (closure): user data passed to @func
5240 * Call @func for each transport managed by @stream. The result value of @func
5241 * determines what happens to the transport. @func will be called with @stream
5242 * locked so no further actions on @stream can be performed from @func.
5244 * If @func returns #GST_RTSP_FILTER_REMOVE, the transport will be removed from
5247 * If @func returns #GST_RTSP_FILTER_KEEP, the transport will remain in @stream.
5249 * If @func returns #GST_RTSP_FILTER_REF, the transport will remain in @stream but
5250 * will also be added with an additional ref to the result #GList of this
5253 * When @func is %NULL, #GST_RTSP_FILTER_REF will be assumed for each transport.
5255 * Returns: (element-type GstRTSPStreamTransport) (transfer full): a #GList with all
5256 * transports for which @func returned #GST_RTSP_FILTER_REF. After usage, each
5257 * element in the #GList should be unreffed before the list is freed.
5260 gst_rtsp_stream_transport_filter (GstRTSPStream * stream,
5261 GstRTSPStreamTransportFilterFunc func, gpointer user_data)
5263 GstRTSPStreamPrivate *priv;
5264 GList *result, *walk, *next;
5265 GHashTable *visited = NULL;
5268 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
5270 priv = stream->priv;
5274 visited = g_hash_table_new_full (NULL, NULL, g_object_unref, NULL);
5276 g_mutex_lock (&priv->lock);
5278 cookie = priv->transports_cookie;
5279 for (walk = priv->transports; walk; walk = next) {
5280 GstRTSPStreamTransport *trans = walk->data;
5281 GstRTSPFilterResult res;
5284 next = g_list_next (walk);
5287 /* only visit each transport once */
5288 if (g_hash_table_contains (visited, trans))
5291 g_hash_table_add (visited, g_object_ref (trans));
5292 g_mutex_unlock (&priv->lock);
5294 res = func (stream, trans, user_data);
5296 g_mutex_lock (&priv->lock);
5298 res = GST_RTSP_FILTER_REF;
5300 changed = (cookie != priv->transports_cookie);
5303 case GST_RTSP_FILTER_REMOVE:
5304 update_transport (stream, trans, FALSE);
5306 case GST_RTSP_FILTER_REF:
5307 result = g_list_prepend (result, g_object_ref (trans));
5309 case GST_RTSP_FILTER_KEEP:
5316 g_mutex_unlock (&priv->lock);
5319 g_hash_table_unref (visited);
5324 static GstPadProbeReturn
5325 pad_blocking (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
5327 GstRTSPStreamPrivate *priv;
5328 GstRTSPStream *stream;
5329 GstBuffer *buffer = NULL;
5330 GstPadProbeReturn ret = GST_PAD_PROBE_OK;
5334 priv = stream->priv;
5336 g_mutex_lock (&priv->lock);
5338 if ((info->type & GST_PAD_PROBE_TYPE_BUFFER)) {
5339 GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
5341 buffer = gst_pad_probe_info_get_buffer (info);
5342 if (gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp)) {
5343 priv->blocked_buffer = TRUE;
5344 priv->blocked_seqnum = gst_rtp_buffer_get_seq (&rtp);
5345 priv->blocked_rtptime = gst_rtp_buffer_get_timestamp (&rtp);
5346 gst_rtp_buffer_unmap (&rtp);
5348 priv->position = GST_BUFFER_TIMESTAMP (buffer);
5349 } else if ((info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST)) {
5350 GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
5352 GstBufferList *list = gst_pad_probe_info_get_buffer_list (info);
5353 buffer = gst_buffer_list_get (list, 0);
5354 if (gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp)) {
5355 priv->blocked_buffer = TRUE;
5356 priv->blocked_seqnum = gst_rtp_buffer_get_seq (&rtp);
5357 priv->blocked_rtptime = gst_rtp_buffer_get_timestamp (&rtp);
5358 gst_rtp_buffer_unmap (&rtp);
5360 priv->position = GST_BUFFER_TIMESTAMP (buffer);
5361 } else if ((info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM)) {
5362 if (GST_EVENT_TYPE (info->data) == GST_EVENT_GAP) {
5363 gst_event_parse_gap (info->data, &priv->position, NULL);
5365 ret = GST_PAD_PROBE_PASS;
5366 g_mutex_unlock (&priv->lock);
5370 g_assert_not_reached ();
5373 event = gst_pad_get_sticky_event (pad, GST_EVENT_SEGMENT, 0);
5375 const GstSegment *segment;
5377 gst_event_parse_segment (event, &segment);
5378 priv->blocked_running_time =
5379 gst_segment_to_stream_time (segment, GST_FORMAT_TIME, priv->position);
5380 gst_event_unref (event);
5383 event = gst_pad_get_sticky_event (pad, GST_EVENT_CAPS, 0);
5388 gst_event_parse_caps (event, &caps);
5389 s = gst_caps_get_structure (caps, 0);
5390 gst_structure_get_int (s, "clock-rate", &priv->blocked_clock_rate);
5391 gst_event_unref (event);
5394 priv->blocking = TRUE;
5396 GST_DEBUG_OBJECT (pad, "Now blocking");
5398 GST_DEBUG_OBJECT (stream, "position: %" GST_TIME_FORMAT,
5399 GST_TIME_ARGS (priv->position));
5401 g_mutex_unlock (&priv->lock);
5403 gst_element_post_message (priv->payloader,
5404 gst_message_new_element (GST_OBJECT_CAST (priv->payloader),
5405 gst_structure_new ("GstRTSPStreamBlocking", "is_complete",
5406 G_TYPE_BOOLEAN, priv->is_complete, NULL)));
5413 set_blocked (GstRTSPStream * stream, gboolean blocked)
5415 GstRTSPStreamPrivate *priv;
5418 GST_DEBUG_OBJECT (stream, "blocked: %d", blocked);
5420 priv = stream->priv;
5424 if (priv->sinkpad) {
5425 priv->blocking = TRUE;
5428 for (i = 0; i < 2; i++) {
5429 if (priv->blocked_id[i] != 0)
5431 if (priv->send_src[i]) {
5432 priv->blocking = FALSE;
5433 priv->blocked_buffer = FALSE;
5434 priv->blocked_running_time = GST_CLOCK_TIME_NONE;
5435 priv->blocked_clock_rate = 0;
5436 priv->blocked_id[i] = gst_pad_add_probe (priv->send_src[i],
5437 GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER |
5438 GST_PAD_PROBE_TYPE_BUFFER_LIST |
5439 GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM, pad_blocking,
5440 g_object_ref (stream), g_object_unref);
5444 for (i = 0; i < 2; i++) {
5445 if (priv->blocked_id[i] != 0) {
5446 gst_pad_remove_probe (priv->send_src[i], priv->blocked_id[i]);
5447 priv->blocked_id[i] = 0;
5450 priv->blocking = FALSE;
5455 * gst_rtsp_stream_set_blocked:
5456 * @stream: a #GstRTSPStream
5457 * @blocked: boolean indicating we should block or unblock
5459 * Blocks or unblocks the dataflow on @stream.
5461 * Returns: %TRUE on success
5464 gst_rtsp_stream_set_blocked (GstRTSPStream * stream, gboolean blocked)
5466 GstRTSPStreamPrivate *priv;
5468 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
5470 priv = stream->priv;
5471 g_mutex_lock (&priv->lock);
5472 set_blocked (stream, blocked);
5473 g_mutex_unlock (&priv->lock);
5479 * gst_rtsp_stream_ublock_linked:
5480 * @stream: a #GstRTSPStream
5482 * Unblocks the dataflow on @stream if it is linked.
5484 * Returns: %TRUE on success
5489 gst_rtsp_stream_unblock_linked (GstRTSPStream * stream)
5491 GstRTSPStreamPrivate *priv;
5493 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
5495 priv = stream->priv;
5496 g_mutex_lock (&priv->lock);
5497 if (priv->send_src[0] && gst_pad_is_linked (priv->send_src[0]))
5498 set_blocked (stream, FALSE);
5499 g_mutex_unlock (&priv->lock);
5505 * gst_rtsp_stream_is_blocking:
5506 * @stream: a #GstRTSPStream
5508 * Check if @stream is blocking on a #GstBuffer.
5510 * Returns: %TRUE if @stream is blocking
5513 gst_rtsp_stream_is_blocking (GstRTSPStream * stream)
5515 GstRTSPStreamPrivate *priv;
5518 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
5520 priv = stream->priv;
5522 g_mutex_lock (&priv->lock);
5523 result = priv->blocking;
5524 g_mutex_unlock (&priv->lock);
5530 * gst_rtsp_stream_query_position:
5531 * @stream: a #GstRTSPStream
5532 * @position: (out): current position of a #GstRTSPStream
5534 * Query the position of the stream in %GST_FORMAT_TIME. This only considers
5535 * the RTP parts of the pipeline and not the RTCP parts.
5537 * Returns: %TRUE if the position could be queried
5540 gst_rtsp_stream_query_position (GstRTSPStream * stream, gint64 * position)
5542 GstRTSPStreamPrivate *priv;
5546 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
5548 /* query position: if no sinks have been added yet,
5549 * we obtain the position from the pad otherwise we query the sinks */
5551 priv = stream->priv;
5553 g_mutex_lock (&priv->lock);
5555 if (priv->blocking && GST_CLOCK_TIME_IS_VALID (priv->blocked_running_time)) {
5556 *position = priv->blocked_running_time;
5557 g_mutex_unlock (&priv->lock);
5561 /* depending on the transport type, it should query corresponding sink */
5562 if (priv->configured_protocols & GST_RTSP_LOWER_TRANS_UDP)
5563 sink = priv->udpsink[0];
5564 else if (priv->configured_protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST)
5565 sink = priv->mcast_udpsink[0];
5567 sink = priv->appsink[0];
5570 gst_object_ref (sink);
5571 } else if (priv->send_src[0]) {
5572 pad = gst_object_ref (priv->send_src[0]);
5574 g_mutex_unlock (&priv->lock);
5575 GST_WARNING_OBJECT (stream, "Couldn't obtain postion: erroneous pipeline");
5578 g_mutex_unlock (&priv->lock);
5581 if (!gst_element_query_position (sink, GST_FORMAT_TIME, position)) {
5582 GST_WARNING_OBJECT (stream,
5583 "Couldn't obtain postion: position query failed");
5584 gst_object_unref (sink);
5587 gst_object_unref (sink);
5590 const GstSegment *segment;
5592 event = gst_pad_get_sticky_event (pad, GST_EVENT_SEGMENT, 0);
5594 GST_WARNING_OBJECT (stream, "Couldn't obtain postion: no segment event");
5595 gst_object_unref (pad);
5599 gst_event_parse_segment (event, &segment);
5600 if (segment->format != GST_FORMAT_TIME) {
5603 g_mutex_lock (&priv->lock);
5604 *position = priv->position;
5605 g_mutex_unlock (&priv->lock);
5607 gst_segment_to_stream_time (segment, GST_FORMAT_TIME, *position);
5609 gst_event_unref (event);
5610 gst_object_unref (pad);
5617 * gst_rtsp_stream_query_stop:
5618 * @stream: a #GstRTSPStream
5619 * @stop: (out): current stop of a #GstRTSPStream
5621 * Query the stop of the stream in %GST_FORMAT_TIME. This only considers
5622 * the RTP parts of the pipeline and not the RTCP parts.
5624 * Returns: %TRUE if the stop could be queried
5627 gst_rtsp_stream_query_stop (GstRTSPStream * stream, gint64 * stop)
5629 GstRTSPStreamPrivate *priv;
5633 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
5635 /* query stop position: if no sinks have been added yet,
5636 * we obtain the stop position from the pad otherwise we query the sinks */
5638 priv = stream->priv;
5640 g_mutex_lock (&priv->lock);
5641 /* depending on the transport type, it should query corresponding sink */
5642 if (priv->configured_protocols & GST_RTSP_LOWER_TRANS_UDP)
5643 sink = priv->udpsink[0];
5644 else if (priv->configured_protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST)
5645 sink = priv->mcast_udpsink[0];
5647 sink = priv->appsink[0];
5650 gst_object_ref (sink);
5651 } else if (priv->send_src[0]) {
5652 pad = gst_object_ref (priv->send_src[0]);
5654 g_mutex_unlock (&priv->lock);
5655 GST_WARNING_OBJECT (stream, "Couldn't obtain stop: erroneous pipeline");
5658 g_mutex_unlock (&priv->lock);
5667 query = gst_query_new_segment (GST_FORMAT_TIME);
5668 if (!gst_element_query (sink, query)) {
5669 GST_WARNING_OBJECT (stream, "Couldn't obtain stop: element query failed");
5670 gst_query_unref (query);
5671 gst_object_unref (sink);
5674 gst_query_parse_segment (query, &rate, &format, &start_value, &stop_value);
5675 if (format != GST_FORMAT_TIME)
5678 *stop = rate > 0.0 ? stop_value : start_value;
5679 gst_query_unref (query);
5680 gst_object_unref (sink);
5683 const GstSegment *segment;
5685 event = gst_pad_get_sticky_event (pad, GST_EVENT_SEGMENT, 0);
5687 GST_WARNING_OBJECT (stream, "Couldn't obtain stop: no segment event");
5688 gst_object_unref (pad);
5691 gst_event_parse_segment (event, &segment);
5692 if (segment->format != GST_FORMAT_TIME) {
5695 *stop = segment->stop;
5697 *stop = segment->duration;
5699 *stop = gst_segment_to_stream_time (segment, GST_FORMAT_TIME, *stop);
5701 gst_event_unref (event);
5702 gst_object_unref (pad);
5709 * gst_rtsp_stream_seekable:
5710 * @stream: a #GstRTSPStream
5712 * Checks whether the individual @stream is seekable.
5714 * Returns: %TRUE if @stream is seekable, else %FALSE.
5719 gst_rtsp_stream_seekable (GstRTSPStream * stream)
5721 GstRTSPStreamPrivate *priv;
5723 GstQuery *query = NULL;
5724 gboolean seekable = FALSE;
5726 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
5728 /* query stop position: if no sinks have been added yet,
5729 * we obtain the stop position from the pad otherwise we query the sinks */
5731 priv = stream->priv;
5733 g_mutex_lock (&priv->lock);
5734 /* depending on the transport type, it should query corresponding sink */
5736 pad = gst_object_ref (priv->srcpad);
5738 g_mutex_unlock (&priv->lock);
5739 GST_WARNING_OBJECT (stream, "Pad not available, can't query seekability");
5742 g_mutex_unlock (&priv->lock);
5744 query = gst_query_new_seeking (GST_FORMAT_TIME);
5745 if (!gst_pad_query (pad, query)) {
5746 GST_WARNING_OBJECT (stream, "seeking query failed");
5749 gst_query_parse_seeking (query, NULL, &seekable, NULL, NULL);
5753 gst_object_unref (pad);
5755 gst_query_unref (query);
5757 GST_DEBUG_OBJECT (stream, "Returning %d", seekable);
5763 * gst_rtsp_stream_complete_stream:
5764 * @stream: a #GstRTSPStream
5765 * @transport: a #GstRTSPTransport
5767 * Add a receiver and sender part to the pipeline based on the transport from
5770 * Returns: %TRUE if the stream has been sucessfully updated.
5775 gst_rtsp_stream_complete_stream (GstRTSPStream * stream,
5776 const GstRTSPTransport * transport)
5778 GstRTSPStreamPrivate *priv;
5780 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
5782 priv = stream->priv;
5783 GST_DEBUG_OBJECT (stream, "complete stream");
5785 g_mutex_lock (&priv->lock);
5787 if (!(priv->allowed_protocols & transport->lower_transport))
5788 goto unallowed_transport;
5790 if (!create_receiver_part (stream, transport))
5791 goto create_receiver_error;
5793 /* in the RECORD case, we only add RTCP sender part */
5794 if (!create_sender_part (stream, transport))
5795 goto create_sender_error;
5797 priv->configured_protocols |= transport->lower_transport;
5799 priv->is_complete = TRUE;
5800 g_mutex_unlock (&priv->lock);
5802 GST_DEBUG_OBJECT (stream, "pipeline sucsessfully updated");
5805 create_receiver_error:
5806 create_sender_error:
5807 unallowed_transport:
5809 g_mutex_unlock (&priv->lock);
5815 * gst_rtsp_stream_is_complete:
5816 * @stream: a #GstRTSPStream
5818 * Checks whether the stream is complete, contains the receiver and the sender
5819 * parts. As the stream contains sink(s) element(s), it's possible to perform
5820 * seek operations on it.
5822 * Returns: %TRUE if the stream contains at least one sink element.
5827 gst_rtsp_stream_is_complete (GstRTSPStream * stream)
5829 GstRTSPStreamPrivate *priv;
5830 gboolean ret = FALSE;
5832 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
5834 priv = stream->priv;
5835 g_mutex_lock (&priv->lock);
5836 ret = priv->is_complete;
5837 g_mutex_unlock (&priv->lock);
5843 * gst_rtsp_stream_is_sender:
5844 * @stream: a #GstRTSPStream
5846 * Checks whether the stream is a sender.
5848 * Returns: %TRUE if the stream is a sender and %FALSE otherwise.
5853 gst_rtsp_stream_is_sender (GstRTSPStream * stream)
5855 GstRTSPStreamPrivate *priv;
5856 gboolean ret = FALSE;
5858 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
5860 priv = stream->priv;
5861 g_mutex_lock (&priv->lock);
5862 ret = (priv->srcpad != NULL);
5863 g_mutex_unlock (&priv->lock);
5869 * gst_rtsp_stream_is_receiver:
5870 * @stream: a #GstRTSPStream
5872 * Checks whether the stream is a receiver.
5874 * Returns: %TRUE if the stream is a receiver and %FALSE otherwise.
5879 gst_rtsp_stream_is_receiver (GstRTSPStream * stream)
5881 GstRTSPStreamPrivate *priv;
5882 gboolean ret = FALSE;
5884 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
5886 priv = stream->priv;
5887 g_mutex_lock (&priv->lock);
5888 ret = (priv->sinkpad != NULL);
5889 g_mutex_unlock (&priv->lock);
5894 #define AES_128_KEY_LEN 16
5895 #define AES_256_KEY_LEN 32
5897 #define HMAC_32_KEY_LEN 4
5898 #define HMAC_80_KEY_LEN 10
5901 mikey_apply_policy (GstCaps * caps, GstMIKEYMessage * msg, guint8 policy)
5903 const gchar *srtp_cipher;
5904 const gchar *srtp_auth;
5905 const GstMIKEYPayload *sp;
5908 /* loop over Security policy until we find one containing policy */
5910 if ((sp = gst_mikey_message_find_payload (msg, GST_MIKEY_PT_SP, i)) == NULL)
5913 if (((GstMIKEYPayloadSP *) sp)->policy == policy)
5917 /* the default ciphers */
5918 srtp_cipher = "aes-128-icm";
5919 srtp_auth = "hmac-sha1-80";
5921 /* now override the defaults with what is in the Security Policy */
5924 guint enc_alg = GST_MIKEY_ENC_AES_CM_128;
5926 /* collect all the params and go over them */
5927 len = gst_mikey_payload_sp_get_n_params (sp);
5928 for (i = 0; i < len; i++) {
5929 const GstMIKEYPayloadSPParam *param =
5930 gst_mikey_payload_sp_get_param (sp, i);
5932 switch (param->type) {
5933 case GST_MIKEY_SP_SRTP_ENC_ALG:
5934 enc_alg = param->val[0];
5935 switch (param->val[0]) {
5936 case GST_MIKEY_ENC_NULL:
5937 srtp_cipher = "null";
5939 case GST_MIKEY_ENC_AES_CM_128:
5940 case GST_MIKEY_ENC_AES_KW_128:
5941 srtp_cipher = "aes-128-icm";
5943 case GST_MIKEY_ENC_AES_GCM_128:
5944 srtp_cipher = "aes-128-gcm";
5950 case GST_MIKEY_SP_SRTP_ENC_KEY_LEN:
5951 switch (param->val[0]) {
5952 case AES_128_KEY_LEN:
5953 if (enc_alg == GST_MIKEY_ENC_AES_CM_128 ||
5954 enc_alg == GST_MIKEY_ENC_AES_KW_128) {
5955 srtp_cipher = "aes-128-icm";
5956 } else if (enc_alg == GST_MIKEY_ENC_AES_GCM_128) {
5957 srtp_cipher = "aes-128-gcm";
5960 case AES_256_KEY_LEN:
5961 if (enc_alg == GST_MIKEY_ENC_AES_CM_128 ||
5962 enc_alg == GST_MIKEY_ENC_AES_KW_128) {
5963 srtp_cipher = "aes-256-icm";
5964 } else if (enc_alg == GST_MIKEY_ENC_AES_GCM_128) {
5965 srtp_cipher = "aes-256-gcm";
5972 case GST_MIKEY_SP_SRTP_AUTH_ALG:
5973 switch (param->val[0]) {
5974 case GST_MIKEY_MAC_NULL:
5977 case GST_MIKEY_MAC_HMAC_SHA_1_160:
5978 srtp_auth = "hmac-sha1-80";
5984 case GST_MIKEY_SP_SRTP_AUTH_KEY_LEN:
5985 switch (param->val[0]) {
5986 case HMAC_32_KEY_LEN:
5987 srtp_auth = "hmac-sha1-32";
5989 case HMAC_80_KEY_LEN:
5990 srtp_auth = "hmac-sha1-80";
5996 case GST_MIKEY_SP_SRTP_SRTP_ENC:
5998 case GST_MIKEY_SP_SRTP_SRTCP_ENC:
6005 /* now configure the SRTP parameters */
6006 gst_caps_set_simple (caps,
6007 "srtp-cipher", G_TYPE_STRING, srtp_cipher,
6008 "srtp-auth", G_TYPE_STRING, srtp_auth,
6009 "srtcp-cipher", G_TYPE_STRING, srtp_cipher,
6010 "srtcp-auth", G_TYPE_STRING, srtp_auth, NULL);
6016 handle_mikey_data (GstRTSPStream * stream, guint8 * data, gsize size)
6018 GstMIKEYMessage *msg;
6020 GstCaps *caps = NULL;
6021 GstMIKEYPayloadKEMAC *kemac;
6022 const GstMIKEYPayloadKeyData *pkd;
6025 /* the MIKEY message contains a CSB or crypto session bundle. It is a
6026 * set of Crypto Sessions protected with the same master key.
6027 * In the context of SRTP, an RTP and its RTCP stream is part of a
6029 if ((msg = gst_mikey_message_new_from_data (data, size, NULL, NULL)) == NULL)
6032 /* we can only handle SRTP crypto sessions for now */
6033 if (msg->map_type != GST_MIKEY_MAP_TYPE_SRTP)
6034 goto invalid_map_type;
6036 /* get the number of crypto sessions. This maps SSRC to its
6037 * security parameters */
6038 n_cs = gst_mikey_message_get_n_cs (msg);
6040 goto no_crypto_sessions;
6042 /* we also need keys */
6043 if (!(kemac = (GstMIKEYPayloadKEMAC *) gst_mikey_message_find_payload
6044 (msg, GST_MIKEY_PT_KEMAC, 0)))
6047 /* we don't support encrypted keys */
6048 if (kemac->enc_alg != GST_MIKEY_ENC_NULL
6049 || kemac->mac_alg != GST_MIKEY_MAC_NULL)
6050 goto unsupported_encryption;
6052 /* get Key data sub-payload */
6053 pkd = (const GstMIKEYPayloadKeyData *)
6054 gst_mikey_payload_kemac_get_sub (&kemac->pt, 0);
6056 key = gst_buffer_new_memdup (pkd->key_data, pkd->key_len);
6058 /* go over all crypto sessions and create the security policy for each
6060 for (i = 0; i < n_cs; i++) {
6061 const GstMIKEYMapSRTP *map = gst_mikey_message_get_cs_srtp (msg, i);
6063 caps = gst_caps_new_simple ("application/x-srtp",
6064 "ssrc", G_TYPE_UINT, map->ssrc,
6065 "roc", G_TYPE_UINT, map->roc, "srtp-key", GST_TYPE_BUFFER, key, NULL);
6066 mikey_apply_policy (caps, msg, map->policy);
6068 gst_rtsp_stream_update_crypto (stream, map->ssrc, caps);
6069 gst_caps_unref (caps);
6071 gst_mikey_message_unref (msg);
6072 gst_buffer_unref (key);
6079 GST_DEBUG_OBJECT (stream, "failed to parse MIKEY message");
6084 GST_DEBUG_OBJECT (stream, "invalid map type %d", msg->map_type);
6085 goto cleanup_message;
6089 GST_DEBUG_OBJECT (stream, "no crypto sessions");
6090 goto cleanup_message;
6094 GST_DEBUG_OBJECT (stream, "no keys found");
6095 goto cleanup_message;
6097 unsupported_encryption:
6099 GST_DEBUG_OBJECT (stream, "unsupported key encryption");
6100 goto cleanup_message;
6104 gst_mikey_message_unref (msg);
6109 #define IS_STRIP_CHAR(c) (g_ascii_isspace ((guchar)(c)) || ((c) == '\"'))
6112 strip_chars (gchar * str)
6119 if (!IS_STRIP_CHAR (str[len]))
6123 for (s = str; *s && IS_STRIP_CHAR (*s); s++);
6124 memmove (str, s, len + 1);
6128 * gst_rtsp_stream_handle_keymgmt:
6129 * @stream: a #GstRTSPStream
6130 * @keymgmt: a keymgmt header
6132 * Parse and handle a KeyMgmt header.
6136 /* KeyMgmt = "KeyMgmt" ":" key-mgmt-spec 0*("," key-mgmt-spec)
6137 * key-mgmt-spec = "prot" "=" KMPID ";" ["uri" "=" %x22 URI %x22 ";"]
6140 gst_rtsp_stream_handle_keymgmt (GstRTSPStream * stream, const gchar * keymgmt)
6145 specs = g_strsplit (keymgmt, ",", 0);
6146 for (i = 0; specs[i]; i++) {
6149 split = g_strsplit (specs[i], ";", 0);
6150 for (j = 0; split[j]; j++) {
6151 g_strstrip (split[j]);
6152 if (g_str_has_prefix (split[j], "prot=")) {
6153 g_strstrip (split[j] + 5);
6154 if (!g_str_equal (split[j] + 5, "mikey"))
6156 GST_DEBUG ("found mikey");
6157 } else if (g_str_has_prefix (split[j], "uri=")) {
6158 strip_chars (split[j] + 4);
6159 GST_DEBUG ("found uri '%s'", split[j] + 4);
6160 } else if (g_str_has_prefix (split[j], "data=")) {
6163 strip_chars (split[j] + 5);
6164 GST_DEBUG ("found data '%s'", split[j] + 5);
6165 data = g_base64_decode_inplace (split[j] + 5, &size);
6166 handle_mikey_data (stream, data, size);
6177 * gst_rtsp_stream_get_ulpfec_pt:
6179 * Returns: the payload type used for ULPFEC protection packets
6184 gst_rtsp_stream_get_ulpfec_pt (GstRTSPStream * stream)
6188 g_mutex_lock (&stream->priv->lock);
6189 res = stream->priv->ulpfec_pt;
6190 g_mutex_unlock (&stream->priv->lock);
6196 * gst_rtsp_stream_set_ulpfec_pt:
6198 * Set the payload type to be used for ULPFEC protection packets
6203 gst_rtsp_stream_set_ulpfec_pt (GstRTSPStream * stream, guint pt)
6205 g_return_if_fail (GST_IS_RTSP_STREAM (stream));
6207 g_mutex_lock (&stream->priv->lock);
6208 stream->priv->ulpfec_pt = pt;
6209 if (stream->priv->ulpfec_encoder) {
6210 g_object_set (stream->priv->ulpfec_encoder, "pt", pt, NULL);
6212 g_mutex_unlock (&stream->priv->lock);
6216 * gst_rtsp_stream_request_ulpfec_decoder:
6218 * Creating a rtpulpfecdec element
6220 * Returns: (transfer full) (nullable): a #GstElement.
6225 gst_rtsp_stream_request_ulpfec_decoder (GstRTSPStream * stream,
6226 GstElement * rtpbin, guint sessid)
6228 GObject *internal_storage = NULL;
6230 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
6231 stream->priv->ulpfec_decoder =
6232 gst_object_ref (gst_element_factory_make ("rtpulpfecdec", NULL));
6234 g_signal_emit_by_name (G_OBJECT (rtpbin), "get-internal-storage", sessid,
6236 g_object_set (stream->priv->ulpfec_decoder, "storage", internal_storage,
6238 g_object_unref (internal_storage);
6239 update_ulpfec_decoder_pt (stream);
6241 return stream->priv->ulpfec_decoder;
6245 * gst_rtsp_stream_request_ulpfec_encoder:
6247 * Creating a rtpulpfecenc element
6249 * Returns: (transfer full) (nullable): a #GstElement.
6254 gst_rtsp_stream_request_ulpfec_encoder (GstRTSPStream * stream, guint sessid)
6256 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
6258 if (!stream->priv->ulpfec_percentage)
6261 stream->priv->ulpfec_encoder =
6262 gst_object_ref (gst_element_factory_make ("rtpulpfecenc", NULL));
6264 g_object_set (stream->priv->ulpfec_encoder, "pt", stream->priv->ulpfec_pt,
6265 "percentage", stream->priv->ulpfec_percentage, NULL);
6267 return stream->priv->ulpfec_encoder;
6271 * gst_rtsp_stream_set_ulpfec_percentage:
6273 * Sets the amount of redundancy to apply when creating ULPFEC
6274 * protection packets.
6279 gst_rtsp_stream_set_ulpfec_percentage (GstRTSPStream * stream, guint percentage)
6281 g_return_if_fail (GST_IS_RTSP_STREAM (stream));
6283 g_mutex_lock (&stream->priv->lock);
6284 stream->priv->ulpfec_percentage = percentage;
6285 if (stream->priv->ulpfec_encoder) {
6286 g_object_set (stream->priv->ulpfec_encoder, "percentage", percentage, NULL);
6288 g_mutex_unlock (&stream->priv->lock);
6292 * gst_rtsp_stream_get_ulpfec_percentage:
6294 * Returns: the amount of redundancy applied when creating ULPFEC
6295 * protection packets.
6300 gst_rtsp_stream_get_ulpfec_percentage (GstRTSPStream * stream)
6304 g_mutex_lock (&stream->priv->lock);
6305 res = stream->priv->ulpfec_percentage;
6306 g_mutex_unlock (&stream->priv->lock);
6312 * gst_rtsp_stream_set_rate_control:
6314 * Define whether @stream will follow the Rate-Control=no behaviour as specified
6315 * in the ONVIF replay spec.
6320 gst_rtsp_stream_set_rate_control (GstRTSPStream * stream, gboolean enabled)
6322 GST_DEBUG_OBJECT (stream, "%s rate control",
6323 enabled ? "Enabling" : "Disabling");
6325 g_mutex_lock (&stream->priv->lock);
6326 stream->priv->do_rate_control = enabled;
6327 if (stream->priv->appsink[0])
6328 g_object_set (stream->priv->appsink[0], "sync", enabled, NULL);
6329 if (stream->priv->payloader
6330 && g_object_class_find_property (G_OBJECT_GET_CLASS (stream->
6331 priv->payloader), "onvif-no-rate-control"))
6332 g_object_set (stream->priv->payloader, "onvif-no-rate-control", !enabled,
6334 if (stream->priv->session) {
6335 g_object_set (stream->priv->session, "disable-sr-timestamp", !enabled,
6338 g_mutex_unlock (&stream->priv->lock);
6342 * gst_rtsp_stream_get_rate_control:
6344 * Returns: whether @stream will follow the Rate-Control=no behaviour as specified
6345 * in the ONVIF replay spec.
6350 gst_rtsp_stream_get_rate_control (GstRTSPStream * stream)
6354 g_mutex_lock (&stream->priv->lock);
6355 ret = stream->priv->do_rate_control;
6356 g_mutex_unlock (&stream->priv->lock);
6362 * gst_rtsp_stream_unblock_rtcp:
6364 * Remove blocking probe from the RTCP source. When creating an UDP source for
6365 * RTCP it is initially blocked until this function is called.
6366 * This functions should be called once the pipeline is ready for handling RTCP
6372 gst_rtsp_stream_unblock_rtcp (GstRTSPStream * stream)
6374 GstRTSPStreamPrivate *priv;
6376 priv = stream->priv;
6377 g_mutex_lock (&priv->lock);
6378 if (priv->block_early_rtcp_probe != 0) {
6379 gst_pad_remove_probe
6380 (priv->block_early_rtcp_pad, priv->block_early_rtcp_probe);
6381 priv->block_early_rtcp_probe = 0;
6382 gst_object_unref (priv->block_early_rtcp_pad);
6383 priv->block_early_rtcp_pad = NULL;
6385 if (priv->block_early_rtcp_probe_ipv6 != 0) {
6386 gst_pad_remove_probe
6387 (priv->block_early_rtcp_pad_ipv6, priv->block_early_rtcp_probe_ipv6);
6388 priv->block_early_rtcp_probe_ipv6 = 0;
6389 gst_object_unref (priv->block_early_rtcp_pad_ipv6);
6390 priv->block_early_rtcp_pad_ipv6 = NULL;
6392 g_mutex_unlock (&priv->lock);