2 * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Library General Public
6 * License as published by the Free Software Foundation; either
7 * version 2 of the License, or (at your option) any later version.
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Library General Public License for more details.
14 * You should have received a copy of the GNU Library General Public
15 * License along with this library; if not, write to the
16 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17 * Boston, MA 02110-1301, USA.
21 * @short_description: A media stream
22 * @see_also: #GstRTSPMedia
24 * The #GstRTSPStream object manages the data transport for one stream. It
25 * is created from a payloader element and a source pad that produce the RTP
26 * packets for the stream.
28 * With gst_rtsp_stream_join_bin() the streaming elements are added to the bin
29 * and rtpbin. gst_rtsp_stream_leave_bin() removes the elements again.
31 * The #GstRTSPStream will use the configured addresspool, as set with
32 * gst_rtsp_stream_set_address_pool(), to allocate multicast addresses for the
33 * stream. With gst_rtsp_stream_get_multicast_address() you can get the
36 * With gst_rtsp_stream_get_server_port () you can get the port that the server
37 * will use to receive RTCP. This is the part that the clients will use to send
40 * With gst_rtsp_stream_add_transport() destinations can be added where the
41 * stream should be sent to. Use gst_rtsp_stream_remove_transport() to remove
42 * the destination again.
44 * Last reviewed on 2013-07-16 (1.0.0)
53 #include <gst/app/gstappsrc.h>
54 #include <gst/app/gstappsink.h>
56 #include "rtsp-stream.h"
58 #define GST_RTSP_STREAM_GET_PRIVATE(obj) \
59 (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTSP_STREAM, GstRTSPStreamPrivate))
61 struct _GstRTSPStreamPrivate
66 GstElement *payloader;
71 GstRTSPProfile profiles;
72 GstRTSPLowerTrans protocols;
74 /* pads on the rtpbin */
75 GstPad *send_rtp_sink;
79 /* the RTPSession object */
82 /* sinks used for sending and receiving RTP and RTCP over ipv4, they share
84 GstElement *udpsrc_v4[2];
86 /* sinks used for sending and receiving RTP and RTCP over ipv6, they share
88 GstElement *udpsrc_v6[2];
90 GstElement *udpsink[2];
92 /* for TCP transport */
93 GstElement *appsrc[2];
94 GstElement *appqueue[2];
95 GstElement *appsink[2];
98 GstElement *funnel[2];
100 /* server ports for sending/receiving over ipv4 */
101 GstRTSPRange server_port_v4;
102 GstRTSPAddress *server_addr_v4;
105 /* server ports for sending/receiving over ipv6 */
106 GstRTSPRange server_port_v6;
107 GstRTSPAddress *server_addr_v6;
110 /* multicast addresses */
111 GstRTSPAddressPool *pool;
112 GstRTSPAddress *addr_v4;
113 GstRTSPAddress *addr_v6;
115 /* the caps of the stream */
119 /* transports we stream to */
127 /* stream blocking */
132 #define DEFAULT_CONTROL NULL
133 #define DEFAULT_PROFILES GST_RTSP_PROFILE_AVP
134 #define DEFAULT_PROTOCOLS GST_RTSP_LOWER_TRANS_UDP | GST_RTSP_LOWER_TRANS_UDP_MCAST | \
135 GST_RTSP_LOWER_TRANS_TCP
146 GST_DEBUG_CATEGORY_STATIC (rtsp_stream_debug);
147 #define GST_CAT_DEFAULT rtsp_stream_debug
149 static GQuark ssrc_stream_map_key;
151 static void gst_rtsp_stream_get_property (GObject * object, guint propid,
152 GValue * value, GParamSpec * pspec);
153 static void gst_rtsp_stream_set_property (GObject * object, guint propid,
154 const GValue * value, GParamSpec * pspec);
156 static void gst_rtsp_stream_finalize (GObject * obj);
158 G_DEFINE_TYPE (GstRTSPStream, gst_rtsp_stream, G_TYPE_OBJECT);
161 gst_rtsp_stream_class_init (GstRTSPStreamClass * klass)
163 GObjectClass *gobject_class;
165 g_type_class_add_private (klass, sizeof (GstRTSPStreamPrivate));
167 gobject_class = G_OBJECT_CLASS (klass);
169 gobject_class->get_property = gst_rtsp_stream_get_property;
170 gobject_class->set_property = gst_rtsp_stream_set_property;
171 gobject_class->finalize = gst_rtsp_stream_finalize;
173 g_object_class_install_property (gobject_class, PROP_CONTROL,
174 g_param_spec_string ("control", "Control",
175 "The control string for this stream", DEFAULT_CONTROL,
176 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
178 g_object_class_install_property (gobject_class, PROP_PROFILES,
179 g_param_spec_flags ("profiles", "Profiles",
180 "Allowed transfer profiles", GST_TYPE_RTSP_PROFILE,
181 DEFAULT_PROFILES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
183 g_object_class_install_property (gobject_class, PROP_PROTOCOLS,
184 g_param_spec_flags ("protocols", "Protocols",
185 "Allowed lower transport protocols", GST_TYPE_RTSP_LOWER_TRANS,
186 DEFAULT_PROTOCOLS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
188 GST_DEBUG_CATEGORY_INIT (rtsp_stream_debug, "rtspstream", 0, "GstRTSPStream");
190 ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
194 gst_rtsp_stream_init (GstRTSPStream * stream)
196 GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
198 GST_DEBUG ("new stream %p", stream);
203 priv->control = g_strdup (DEFAULT_CONTROL);
204 priv->profiles = DEFAULT_PROFILES;
205 priv->protocols = DEFAULT_PROTOCOLS;
207 g_mutex_init (&priv->lock);
211 gst_rtsp_stream_finalize (GObject * obj)
213 GstRTSPStream *stream;
214 GstRTSPStreamPrivate *priv;
216 stream = GST_RTSP_STREAM (obj);
219 GST_DEBUG ("finalize stream %p", stream);
221 /* we really need to be unjoined now */
222 g_return_if_fail (!priv->is_joined);
225 gst_rtsp_address_free (priv->addr_v4);
227 gst_rtsp_address_free (priv->addr_v6);
228 if (priv->server_addr_v4)
229 gst_rtsp_address_free (priv->server_addr_v4);
230 if (priv->server_addr_v6)
231 gst_rtsp_address_free (priv->server_addr_v6);
233 g_object_unref (priv->pool);
234 gst_object_unref (priv->payloader);
235 gst_object_unref (priv->srcpad);
236 g_free (priv->control);
237 g_mutex_clear (&priv->lock);
239 G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
243 gst_rtsp_stream_get_property (GObject * object, guint propid,
244 GValue * value, GParamSpec * pspec)
246 GstRTSPStream *stream = GST_RTSP_STREAM (object);
250 g_value_take_string (value, gst_rtsp_stream_get_control (stream));
253 g_value_set_flags (value, gst_rtsp_stream_get_profiles (stream));
256 g_value_set_flags (value, gst_rtsp_stream_get_protocols (stream));
259 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
264 gst_rtsp_stream_set_property (GObject * object, guint propid,
265 const GValue * value, GParamSpec * pspec)
267 GstRTSPStream *stream = GST_RTSP_STREAM (object);
271 gst_rtsp_stream_set_control (stream, g_value_get_string (value));
274 gst_rtsp_stream_set_profiles (stream, g_value_get_flags (value));
277 gst_rtsp_stream_set_protocols (stream, g_value_get_flags (value));
280 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
285 * gst_rtsp_stream_new:
288 * @payloader: a #GstElement
290 * Create a new media stream with index @idx that handles RTP data on
291 * @srcpad and has a payloader element @payloader.
293 * Returns: (transfer full): a new #GstRTSPStream
296 gst_rtsp_stream_new (guint idx, GstElement * payloader, GstPad * srcpad)
298 GstRTSPStreamPrivate *priv;
299 GstRTSPStream *stream;
301 g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL);
302 g_return_val_if_fail (GST_IS_PAD (srcpad), NULL);
303 g_return_val_if_fail (GST_PAD_IS_SRC (srcpad), NULL);
305 stream = g_object_new (GST_TYPE_RTSP_STREAM, NULL);
308 priv->payloader = gst_object_ref (payloader);
309 priv->srcpad = gst_object_ref (srcpad);
315 * gst_rtsp_stream_get_index:
316 * @stream: a #GstRTSPStream
318 * Get the stream index.
320 * Return: the stream index.
323 gst_rtsp_stream_get_index (GstRTSPStream * stream)
325 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
327 return stream->priv->idx;
331 * gst_rtsp_stream_get_pt:
332 * @stream: a #GstRTSPStream
334 * Get the stream payload type.
336 * Return: the stream payload type.
339 gst_rtsp_stream_get_pt (GstRTSPStream * stream)
341 GstRTSPStreamPrivate *priv;
344 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
348 g_object_get (G_OBJECT (priv->payloader), "pt", &pt, NULL);
354 * gst_rtsp_stream_get_srcpad:
355 * @stream: a #GstRTSPStream
357 * Get the srcpad associated with @stream.
359 * Returns: (transfer full): the srcpad. Unref after usage.
362 gst_rtsp_stream_get_srcpad (GstRTSPStream * stream)
364 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
366 return gst_object_ref (stream->priv->srcpad);
370 * gst_rtsp_stream_get_control:
371 * @stream: a #GstRTSPStream
373 * Get the control string to identify this stream.
375 * Returns: (transfer full): the control string. g_free() after usage.
378 gst_rtsp_stream_get_control (GstRTSPStream * stream)
380 GstRTSPStreamPrivate *priv;
383 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
387 g_mutex_lock (&priv->lock);
388 if ((result = g_strdup (priv->control)) == NULL)
389 result = g_strdup_printf ("stream=%u", priv->idx);
390 g_mutex_unlock (&priv->lock);
396 * gst_rtsp_stream_set_control:
397 * @stream: a #GstRTSPStream
398 * @control: a control string
400 * Set the control string in @stream.
403 gst_rtsp_stream_set_control (GstRTSPStream * stream, const gchar * control)
405 GstRTSPStreamPrivate *priv;
407 g_return_if_fail (GST_IS_RTSP_STREAM (stream));
411 g_mutex_lock (&priv->lock);
412 g_free (priv->control);
413 priv->control = g_strdup (control);
414 g_mutex_unlock (&priv->lock);
418 * gst_rtsp_stream_has_control:
419 * @stream: a #GstRTSPStream
420 * @control: a control string
422 * Check if @stream has the control string @control.
424 * Returns: %TRUE is @stream has @control as the control string
427 gst_rtsp_stream_has_control (GstRTSPStream * stream, const gchar * control)
429 GstRTSPStreamPrivate *priv;
432 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
436 g_mutex_lock (&priv->lock);
438 res = (g_strcmp0 (priv->control, control) == 0);
442 if (sscanf (control, "stream=%u", &streamid) > 0)
443 res = (streamid == priv->idx);
447 g_mutex_unlock (&priv->lock);
453 * gst_rtsp_stream_set_mtu:
454 * @stream: a #GstRTSPStream
457 * Configure the mtu in the payloader of @stream to @mtu.
460 gst_rtsp_stream_set_mtu (GstRTSPStream * stream, guint mtu)
462 GstRTSPStreamPrivate *priv;
464 g_return_if_fail (GST_IS_RTSP_STREAM (stream));
468 GST_LOG_OBJECT (stream, "set MTU %u", mtu);
470 g_object_set (G_OBJECT (priv->payloader), "mtu", mtu, NULL);
474 * gst_rtsp_stream_get_mtu:
475 * @stream: a #GstRTSPStream
477 * Get the configured MTU in the payloader of @stream.
479 * Returns: the MTU of the payloader.
482 gst_rtsp_stream_get_mtu (GstRTSPStream * stream)
484 GstRTSPStreamPrivate *priv;
487 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
491 g_object_get (G_OBJECT (priv->payloader), "mtu", &mtu, NULL);
496 /* Update the dscp qos property on the udp sinks */
498 update_dscp_qos (GstRTSPStream * stream)
500 GstRTSPStreamPrivate *priv;
502 g_return_if_fail (GST_IS_RTSP_STREAM (stream));
506 if (priv->udpsink[0]) {
507 g_object_set (G_OBJECT (priv->udpsink[0]), "qos-dscp", priv->dscp_qos,
511 if (priv->udpsink[1]) {
512 g_object_set (G_OBJECT (priv->udpsink[1]), "qos-dscp", priv->dscp_qos,
518 * gst_rtsp_stream_set_dscp_qos:
519 * @stream: a #GstRTSPStream
520 * @dscp_qos: a new dscp qos value (0-63, or -1 to disable)
522 * Configure the dscp qos of the outgoing sockets to @dscp_qos.
525 gst_rtsp_stream_set_dscp_qos (GstRTSPStream * stream, gint dscp_qos)
527 GstRTSPStreamPrivate *priv;
529 g_return_if_fail (GST_IS_RTSP_STREAM (stream));
533 GST_LOG_OBJECT (stream, "set DSCP QoS %d", dscp_qos);
535 if (dscp_qos < -1 || dscp_qos > 63) {
536 GST_WARNING_OBJECT (stream, "trying to set illegal dscp qos %d", dscp_qos);
540 priv->dscp_qos = dscp_qos;
542 update_dscp_qos (stream);
546 * gst_rtsp_stream_get_dscp_qos:
547 * @stream: a #GstRTSPStream
549 * Get the configured DSCP QoS in of the outgoing sockets.
551 * Returns: the DSCP QoS value of the outgoing sockets, or -1 if disbled.
554 gst_rtsp_stream_get_dscp_qos (GstRTSPStream * stream)
556 GstRTSPStreamPrivate *priv;
558 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
562 return priv->dscp_qos;
566 * gst_rtsp_stream_is_transport_supported:
567 * @stream: a #GstRTSPStream
568 * @transport: (transfer none): a #GstRTSPTransport
570 * Check if @transport can be handled by stream
572 * Returns: %TRUE if @transport can be handled by @stream.
575 gst_rtsp_stream_is_transport_supported (GstRTSPStream * stream,
576 GstRTSPTransport * transport)
578 GstRTSPStreamPrivate *priv;
580 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
584 g_mutex_lock (&priv->lock);
585 if (transport->trans != GST_RTSP_TRANS_RTP)
586 goto unsupported_transmode;
588 if (!(transport->profile & priv->profiles))
589 goto unsupported_profile;
591 if (!(transport->lower_transport & priv->protocols))
592 goto unsupported_ltrans;
594 g_mutex_unlock (&priv->lock);
599 unsupported_transmode:
601 GST_DEBUG ("unsupported transport mode %d", transport->trans);
602 g_mutex_unlock (&priv->lock);
607 GST_DEBUG ("unsupported profile %d", transport->profile);
608 g_mutex_unlock (&priv->lock);
613 GST_DEBUG ("unsupported lower transport %d", transport->lower_transport);
614 g_mutex_unlock (&priv->lock);
620 * gst_rtsp_stream_set_profiles:
621 * @stream: a #GstRTSPStream
622 * @profiles: the new profiles
624 * Configure the allowed profiles for @stream.
627 gst_rtsp_stream_set_profiles (GstRTSPStream * stream, GstRTSPProfile profiles)
629 GstRTSPStreamPrivate *priv;
631 g_return_if_fail (GST_IS_RTSP_STREAM (stream));
635 g_mutex_lock (&priv->lock);
636 priv->profiles = profiles;
637 g_mutex_unlock (&priv->lock);
641 * gst_rtsp_stream_get_profiles:
642 * @stream: a #GstRTSPStream
644 * Get the allowed profiles of @stream.
646 * Returns: a #GstRTSPProfile
649 gst_rtsp_stream_get_profiles (GstRTSPStream * stream)
651 GstRTSPStreamPrivate *priv;
654 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_RTSP_PROFILE_UNKNOWN);
658 g_mutex_lock (&priv->lock);
659 res = priv->profiles;
660 g_mutex_unlock (&priv->lock);
666 * gst_rtsp_stream_set_protocols:
667 * @stream: a #GstRTSPStream
668 * @protocols: the new flags
670 * Configure the allowed lower transport for @stream.
673 gst_rtsp_stream_set_protocols (GstRTSPStream * stream,
674 GstRTSPLowerTrans protocols)
676 GstRTSPStreamPrivate *priv;
678 g_return_if_fail (GST_IS_RTSP_STREAM (stream));
682 g_mutex_lock (&priv->lock);
683 priv->protocols = protocols;
684 g_mutex_unlock (&priv->lock);
688 * gst_rtsp_stream_get_protocols:
689 * @stream: a #GstRTSPStream
691 * Get the allowed protocols of @stream.
693 * Returns: a #GstRTSPLowerTrans
696 gst_rtsp_stream_get_protocols (GstRTSPStream * stream)
698 GstRTSPStreamPrivate *priv;
699 GstRTSPLowerTrans res;
701 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream),
702 GST_RTSP_LOWER_TRANS_UNKNOWN);
706 g_mutex_lock (&priv->lock);
707 res = priv->protocols;
708 g_mutex_unlock (&priv->lock);
714 * gst_rtsp_stream_set_address_pool:
715 * @stream: a #GstRTSPStream
716 * @pool: (transfer none): a #GstRTSPAddressPool
718 * configure @pool to be used as the address pool of @stream.
721 gst_rtsp_stream_set_address_pool (GstRTSPStream * stream,
722 GstRTSPAddressPool * pool)
724 GstRTSPStreamPrivate *priv;
725 GstRTSPAddressPool *old;
727 g_return_if_fail (GST_IS_RTSP_STREAM (stream));
731 GST_LOG_OBJECT (stream, "set address pool %p", pool);
733 g_mutex_lock (&priv->lock);
734 if ((old = priv->pool) != pool)
735 priv->pool = pool ? g_object_ref (pool) : NULL;
738 g_mutex_unlock (&priv->lock);
741 g_object_unref (old);
745 * gst_rtsp_stream_get_address_pool:
746 * @stream: a #GstRTSPStream
748 * Get the #GstRTSPAddressPool used as the address pool of @stream.
750 * Returns: (transfer full): the #GstRTSPAddressPool of @stream. g_object_unref() after
754 gst_rtsp_stream_get_address_pool (GstRTSPStream * stream)
756 GstRTSPStreamPrivate *priv;
757 GstRTSPAddressPool *result;
759 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
763 g_mutex_lock (&priv->lock);
764 if ((result = priv->pool))
765 g_object_ref (result);
766 g_mutex_unlock (&priv->lock);
772 * gst_rtsp_stream_get_multicast_address:
773 * @stream: a #GstRTSPStream
774 * @family: the #GSocketFamily
776 * Get the multicast address of @stream for @family.
778 * Returns: (transfer full): the #GstRTSPAddress of @stream or %NULL when no
779 * address could be allocated. gst_rtsp_address_free() after usage.
782 gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream,
783 GSocketFamily family)
785 GstRTSPStreamPrivate *priv;
786 GstRTSPAddress *result;
787 GstRTSPAddress **addrp;
788 GstRTSPAddressFlags flags;
790 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
794 if (family == G_SOCKET_FAMILY_IPV6) {
795 flags = GST_RTSP_ADDRESS_FLAG_IPV6;
796 addrp = &priv->addr_v6;
798 flags = GST_RTSP_ADDRESS_FLAG_IPV4;
799 addrp = &priv->addr_v4;
802 g_mutex_lock (&priv->lock);
803 if (*addrp == NULL) {
804 if (priv->pool == NULL)
807 flags |= GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_MULTICAST;
809 *addrp = gst_rtsp_address_pool_acquire_address (priv->pool, flags, 2);
813 result = gst_rtsp_address_copy (*addrp);
814 g_mutex_unlock (&priv->lock);
821 GST_ERROR_OBJECT (stream, "no address pool specified");
822 g_mutex_unlock (&priv->lock);
827 GST_ERROR_OBJECT (stream, "failed to acquire address from pool");
828 g_mutex_unlock (&priv->lock);
834 * gst_rtsp_stream_reserve_address:
835 * @stream: a #GstRTSPStream
836 * @address: an address
841 * Reserve @address and @port as the address and port of @stream.
843 * Returns: the #GstRTSPAddress of @stream or %NULL when the address could be
844 * reserved. gst_rtsp_address_free() after usage.
847 gst_rtsp_stream_reserve_address (GstRTSPStream * stream,
848 const gchar * address, guint port, guint n_ports, guint ttl)
850 GstRTSPStreamPrivate *priv;
851 GstRTSPAddress *result;
853 GSocketFamily family;
854 GstRTSPAddress **addrp;
856 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
857 g_return_val_if_fail (address != NULL, NULL);
858 g_return_val_if_fail (port > 0, NULL);
859 g_return_val_if_fail (n_ports > 0, NULL);
860 g_return_val_if_fail (ttl > 0, NULL);
864 addr = g_inet_address_new_from_string (address);
866 GST_ERROR ("failed to get inet addr from %s", address);
867 family = G_SOCKET_FAMILY_IPV4;
869 family = g_inet_address_get_family (addr);
870 g_object_unref (addr);
873 if (family == G_SOCKET_FAMILY_IPV6)
874 addrp = &priv->addr_v6;
876 addrp = &priv->addr_v4;
878 g_mutex_lock (&priv->lock);
879 if (*addrp == NULL) {
880 GstRTSPAddressPoolResult res;
882 if (priv->pool == NULL)
885 res = gst_rtsp_address_pool_reserve_address (priv->pool, address,
886 port, n_ports, ttl, addrp);
887 if (res != GST_RTSP_ADDRESS_POOL_OK)
890 if (strcmp ((*addrp)->address, address) ||
891 (*addrp)->port != port || (*addrp)->n_ports != n_ports ||
892 (*addrp)->ttl != ttl)
893 goto different_address;
895 result = gst_rtsp_address_copy (*addrp);
896 g_mutex_unlock (&priv->lock);
903 GST_ERROR_OBJECT (stream, "no address pool specified");
904 g_mutex_unlock (&priv->lock);
909 GST_ERROR_OBJECT (stream, "failed to acquire address %s from pool",
911 g_mutex_unlock (&priv->lock);
916 GST_ERROR_OBJECT (stream, "address %s is not the same that was already"
917 " reserved", address);
918 g_mutex_unlock (&priv->lock);
924 alloc_ports_one_family (GstRTSPAddressPool * pool, gint buffer_size,
925 GSocketFamily family, GstElement * udpsrc_out[2],
926 GstElement * udpsink_out[2], GstRTSPRange * server_port_out,
927 GstRTSPAddress ** server_addr_out)
929 GstStateChangeReturn ret;
930 GstElement *udpsrc0, *udpsrc1;
931 GstElement *udpsink0, *udpsink1;
932 GSocket *rtp_socket = NULL;
933 GSocket *rtcp_socket;
934 gint tmp_rtp, tmp_rtcp;
936 gint rtpport, rtcpport;
937 GList *rejected_addresses = NULL;
938 GstRTSPAddress *addr = NULL;
939 GInetAddress *inetaddr = NULL;
940 GSocketAddress *rtp_sockaddr = NULL;
941 GSocketAddress *rtcp_sockaddr = NULL;
942 const gchar *multisink_socket;
944 if (family == G_SOCKET_FAMILY_IPV6)
945 multisink_socket = "socket-v6";
947 multisink_socket = "socket";
955 /* Start with random port */
958 rtcp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
959 G_SOCKET_PROTOCOL_UDP, NULL);
961 goto no_udp_protocol;
963 if (*server_addr_out)
964 gst_rtsp_address_free (*server_addr_out);
966 /* try to allocate 2 UDP ports, the RTP port should be an even
967 * number and the RTCP port should be the next (uneven) port */
970 if (rtp_socket == NULL) {
971 rtp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
972 G_SOCKET_PROTOCOL_UDP, NULL);
974 goto no_udp_protocol;
977 if (pool && gst_rtsp_address_pool_has_unicast_addresses (pool)) {
978 GstRTSPAddressFlags flags;
981 rejected_addresses = g_list_prepend (rejected_addresses, addr);
983 flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_UNICAST;
984 if (family == G_SOCKET_FAMILY_IPV6)
985 flags |= GST_RTSP_ADDRESS_FLAG_IPV6;
987 flags |= GST_RTSP_ADDRESS_FLAG_IPV4;
989 addr = gst_rtsp_address_pool_acquire_address (pool, flags, 2);
994 tmp_rtp = addr->port;
996 g_clear_object (&inetaddr);
997 inetaddr = g_inet_address_new_from_string (addr->address);
1005 if (inetaddr == NULL)
1006 inetaddr = g_inet_address_new_any (family);
1009 rtp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtp);
1010 if (!g_socket_bind (rtp_socket, rtp_sockaddr, FALSE, NULL)) {
1011 g_object_unref (rtp_sockaddr);
1014 g_object_unref (rtp_sockaddr);
1016 rtp_sockaddr = g_socket_get_local_address (rtp_socket, NULL);
1017 if (rtp_sockaddr == NULL || !G_IS_INET_SOCKET_ADDRESS (rtp_sockaddr)) {
1018 g_clear_object (&rtp_sockaddr);
1023 g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (rtp_sockaddr));
1024 g_object_unref (rtp_sockaddr);
1026 /* check if port is even */
1027 if ((tmp_rtp & 1) != 0) {
1028 /* port not even, close and allocate another */
1030 g_clear_object (&rtp_socket);
1035 tmp_rtcp = tmp_rtp + 1;
1037 rtcp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtcp);
1038 if (!g_socket_bind (rtcp_socket, rtcp_sockaddr, FALSE, NULL)) {
1039 g_object_unref (rtcp_sockaddr);
1040 g_clear_object (&rtp_socket);
1043 g_object_unref (rtcp_sockaddr);
1045 g_clear_object (&inetaddr);
1047 udpsrc0 = gst_element_factory_make ("udpsrc", NULL);
1048 udpsrc1 = gst_element_factory_make ("udpsrc", NULL);
1050 if (udpsrc0 == NULL || udpsrc1 == NULL)
1051 goto no_udp_protocol;
1053 g_object_set (G_OBJECT (udpsrc0), "socket", rtp_socket, NULL);
1054 g_object_set (G_OBJECT (udpsrc1), "socket", rtcp_socket, NULL);
1056 ret = gst_element_set_state (udpsrc0, GST_STATE_PAUSED);
1057 if (ret == GST_STATE_CHANGE_FAILURE)
1059 ret = gst_element_set_state (udpsrc1, GST_STATE_PAUSED);
1060 if (ret == GST_STATE_CHANGE_FAILURE)
1063 /* all fine, do port check */
1064 g_object_get (G_OBJECT (udpsrc0), "port", &rtpport, NULL);
1065 g_object_get (G_OBJECT (udpsrc1), "port", &rtcpport, NULL);
1067 /* this should not happen... */
1068 if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
1072 udpsink0 = udpsink_out[0];
1074 udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
1077 goto no_udp_protocol;
1079 g_object_set (G_OBJECT (udpsink0), "close-socket", FALSE, NULL);
1080 g_object_set (G_OBJECT (udpsink0), multisink_socket, rtp_socket, NULL);
1083 udpsink1 = udpsink_out[1];
1085 udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
1088 goto no_udp_protocol;
1090 g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
1091 g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
1092 g_object_set (G_OBJECT (udpsink0), "buffer-size", buffer_size, NULL);
1094 g_object_set (G_OBJECT (udpsink1), "close-socket", FALSE, NULL);
1095 g_object_set (G_OBJECT (udpsink1), multisink_socket, rtcp_socket, NULL);
1096 g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
1097 g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
1098 g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL);
1099 g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
1100 g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL);
1101 g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
1103 /* we keep these elements, we will further configure them when the
1104 * client told us to really use the UDP ports. */
1105 udpsrc_out[0] = udpsrc0;
1106 udpsrc_out[1] = udpsrc1;
1107 udpsink_out[0] = udpsink0;
1108 udpsink_out[1] = udpsink1;
1109 server_port_out->min = rtpport;
1110 server_port_out->max = rtcpport;
1112 *server_addr_out = addr;
1113 g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free);
1115 g_object_unref (rtp_socket);
1116 g_object_unref (rtcp_socket);
1144 gst_element_set_state (udpsrc0, GST_STATE_NULL);
1145 gst_object_unref (udpsrc0);
1148 gst_element_set_state (udpsrc1, GST_STATE_NULL);
1149 gst_object_unref (udpsrc1);
1152 gst_element_set_state (udpsink0, GST_STATE_NULL);
1153 gst_object_unref (udpsink0);
1156 g_object_unref (inetaddr);
1157 g_list_free_full (rejected_addresses,
1158 (GDestroyNotify) gst_rtsp_address_free);
1160 gst_rtsp_address_free (addr);
1162 g_object_unref (rtp_socket);
1164 g_object_unref (rtcp_socket);
1169 /* must be called with lock */
1171 alloc_ports (GstRTSPStream * stream)
1173 GstRTSPStreamPrivate *priv = stream->priv;
1175 priv->have_ipv4 = alloc_ports_one_family (priv->pool, priv->buffer_size,
1176 G_SOCKET_FAMILY_IPV4, priv->udpsrc_v4, priv->udpsink,
1177 &priv->server_port_v4, &priv->server_addr_v4);
1179 priv->have_ipv6 = alloc_ports_one_family (priv->pool, priv->buffer_size,
1180 G_SOCKET_FAMILY_IPV6, priv->udpsrc_v6, priv->udpsink,
1181 &priv->server_port_v6, &priv->server_addr_v6);
1183 return priv->have_ipv4 || priv->have_ipv6;
1187 * gst_rtsp_stream_get_server_port:
1188 * @stream: a #GstRTSPStream
1189 * @server_port: (out): result server port
1190 * @family: the port family to get
1192 * Fill @server_port with the port pair used by the server. This function can
1193 * only be called when @stream has been joined.
1196 gst_rtsp_stream_get_server_port (GstRTSPStream * stream,
1197 GstRTSPRange * server_port, GSocketFamily family)
1199 GstRTSPStreamPrivate *priv;
1201 g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1202 priv = stream->priv;
1203 g_return_if_fail (priv->is_joined);
1205 g_mutex_lock (&priv->lock);
1206 if (family == G_SOCKET_FAMILY_IPV4) {
1208 *server_port = priv->server_port_v4;
1211 *server_port = priv->server_port_v6;
1213 g_mutex_unlock (&priv->lock);
1217 * gst_rtsp_stream_get_rtpsession:
1218 * @stream: a #GstRTSPStream
1220 * Get the RTP session of this stream.
1222 * Returns: (transfer full): The RTP session of this stream. Unref after usage.
1225 gst_rtsp_stream_get_rtpsession (GstRTSPStream * stream)
1227 GstRTSPStreamPrivate *priv;
1230 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1232 priv = stream->priv;
1234 g_mutex_lock (&priv->lock);
1235 if ((session = priv->session))
1236 g_object_ref (session);
1237 g_mutex_unlock (&priv->lock);
1243 * gst_rtsp_stream_get_ssrc:
1244 * @stream: a #GstRTSPStream
1245 * @ssrc: (out): result ssrc
1247 * Get the SSRC used by the RTP session of this stream. This function can only
1248 * be called when @stream has been joined.
1251 gst_rtsp_stream_get_ssrc (GstRTSPStream * stream, guint * ssrc)
1253 GstRTSPStreamPrivate *priv;
1255 g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1256 priv = stream->priv;
1257 g_return_if_fail (priv->is_joined);
1259 g_mutex_lock (&priv->lock);
1260 if (ssrc && priv->session)
1261 g_object_get (priv->session, "internal-ssrc", ssrc, NULL);
1262 g_mutex_unlock (&priv->lock);
1265 /* executed from streaming thread */
1267 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream)
1269 GstRTSPStreamPrivate *priv = stream->priv;
1270 GstCaps *newcaps, *oldcaps;
1272 newcaps = gst_pad_get_current_caps (pad);
1274 GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps,
1277 g_mutex_lock (&priv->lock);
1278 oldcaps = priv->caps;
1279 priv->caps = newcaps;
1280 g_mutex_unlock (&priv->lock);
1283 gst_caps_unref (oldcaps);
1287 dump_structure (const GstStructure * s)
1291 sstr = gst_structure_to_string (s);
1292 GST_INFO ("structure: %s", sstr);
1296 static GstRTSPStreamTransport *
1297 find_transport (GstRTSPStream * stream, const gchar * rtcp_from)
1299 GstRTSPStreamPrivate *priv = stream->priv;
1301 GstRTSPStreamTransport *result = NULL;
1306 if (rtcp_from == NULL)
1309 tmp = g_strrstr (rtcp_from, ":");
1313 port = atoi (tmp + 1);
1314 dest = g_strndup (rtcp_from, tmp - rtcp_from);
1316 g_mutex_lock (&priv->lock);
1317 GST_INFO ("finding %s:%d in %d transports", dest, port,
1318 g_list_length (priv->transports));
1320 for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1321 GstRTSPStreamTransport *trans = walk->data;
1322 const GstRTSPTransport *tr;
1325 tr = gst_rtsp_stream_transport_get_transport (trans);
1327 min = tr->client_port.min;
1328 max = tr->client_port.max;
1330 if ((strcmp (tr->destination, dest) == 0) && (min == port || max == port)) {
1336 g_object_ref (result);
1337 g_mutex_unlock (&priv->lock);
1344 static GstRTSPStreamTransport *
1345 check_transport (GObject * source, GstRTSPStream * stream)
1347 GstStructure *stats;
1348 GstRTSPStreamTransport *trans;
1350 /* see if we have a stream to match with the origin of the RTCP packet */
1351 trans = g_object_get_qdata (source, ssrc_stream_map_key);
1352 if (trans == NULL) {
1353 g_object_get (source, "stats", &stats, NULL);
1355 const gchar *rtcp_from;
1357 dump_structure (stats);
1359 rtcp_from = gst_structure_get_string (stats, "rtcp-from");
1360 if ((trans = find_transport (stream, rtcp_from))) {
1361 GST_INFO ("%p: found transport %p for source %p", stream, trans,
1363 g_object_set_qdata_full (source, ssrc_stream_map_key, trans,
1366 gst_structure_free (stats);
1374 on_new_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1376 GstRTSPStreamTransport *trans;
1378 GST_INFO ("%p: new source %p", stream, source);
1380 trans = check_transport (source, stream);
1383 GST_INFO ("%p: source %p for transport %p", stream, source, trans);
1387 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPStream * stream)
1389 GST_INFO ("%p: new SDES %p", stream, source);
1393 on_ssrc_active (GObject * session, GObject * source, GstRTSPStream * stream)
1395 GstRTSPStreamTransport *trans;
1397 trans = check_transport (source, stream);
1400 GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
1401 gst_rtsp_stream_transport_keep_alive (trans);
1405 GstStructure *stats;
1406 g_object_get (source, "stats", &stats, NULL);
1408 dump_structure (stats);
1409 gst_structure_free (stats);
1416 on_bye_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1418 GST_INFO ("%p: source %p bye", stream, source);
1422 on_bye_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1424 GstRTSPStreamTransport *trans;
1426 GST_INFO ("%p: source %p bye timeout", stream, source);
1428 if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1429 gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1430 g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1435 on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1437 GstRTSPStreamTransport *trans;
1439 GST_INFO ("%p: source %p timeout", stream, source);
1441 if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1442 gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1443 g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1448 clear_tr_cache (GstRTSPStreamPrivate * priv)
1450 g_list_foreach (priv->tr_cache, (GFunc) g_object_unref, NULL);
1451 g_list_free (priv->tr_cache);
1452 priv->tr_cache = NULL;
1455 static GstFlowReturn
1456 handle_new_sample (GstAppSink * sink, gpointer user_data)
1458 GstRTSPStreamPrivate *priv;
1462 GstRTSPStream *stream;
1465 sample = gst_app_sink_pull_sample (sink);
1469 stream = (GstRTSPStream *) user_data;
1470 priv = stream->priv;
1471 buffer = gst_sample_get_buffer (sample);
1473 is_rtp = GST_ELEMENT_CAST (sink) == priv->appsink[0];
1475 g_mutex_lock (&priv->lock);
1476 if (priv->tr_changed) {
1477 clear_tr_cache (priv);
1478 for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1479 GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1480 priv->tr_cache = g_list_prepend (priv->tr_cache, g_object_ref (tr));
1482 priv->tr_changed = FALSE;
1484 g_mutex_unlock (&priv->lock);
1486 for (walk = priv->tr_cache; walk; walk = g_list_next (walk)) {
1487 GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1490 gst_rtsp_stream_transport_send_rtp (tr, buffer);
1492 gst_rtsp_stream_transport_send_rtcp (tr, buffer);
1495 gst_sample_unref (sample);
1500 static GstAppSinkCallbacks sink_cb = {
1501 NULL, /* not interested in EOS */
1502 NULL, /* not interested in preroll samples */
1507 * gst_rtsp_stream_join_bin:
1508 * @stream: a #GstRTSPStream
1509 * @bin: (transfer none): a #GstBin to join
1510 * @rtpbin: (transfer none): a rtpbin element in @bin
1511 * @state: the target state of the new elements
1513 * Join the #GstBin @bin that contains the element @rtpbin.
1515 * @stream will link to @rtpbin, which must be inside @bin. The elements
1516 * added to @bin will be set to the state given in @state.
1518 * Returns: %TRUE on success.
1521 gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
1522 GstElement * rtpbin, GstState state)
1524 GstRTSPStreamPrivate *priv;
1528 GstPad *pad, *sinkpad, *selpad;
1529 GstPadLinkReturn ret;
1531 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1532 g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
1533 g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
1535 priv = stream->priv;
1537 g_mutex_lock (&priv->lock);
1538 if (priv->is_joined)
1541 /* create a session with the same index as the stream */
1544 GST_INFO ("stream %p joining bin as session %u", stream, idx);
1546 if (!alloc_ports (stream))
1549 /* update the dscp qos field in the sinks */
1550 update_dscp_qos (stream);
1552 /* get a pad for sending RTP */
1553 name = g_strdup_printf ("send_rtp_sink_%u", idx);
1554 priv->send_rtp_sink = gst_element_get_request_pad (rtpbin, name);
1556 /* link the RTP pad to the session manager, it should not really fail unless
1557 * this is not really an RTP pad */
1558 ret = gst_pad_link (priv->srcpad, priv->send_rtp_sink);
1559 if (ret != GST_PAD_LINK_OK)
1562 /* get pads from the RTP session element for sending and receiving
1564 name = g_strdup_printf ("send_rtp_src_%u", idx);
1565 priv->send_src[0] = gst_element_get_static_pad (rtpbin, name);
1567 name = g_strdup_printf ("send_rtcp_src_%u", idx);
1568 priv->send_src[1] = gst_element_get_request_pad (rtpbin, name);
1570 name = g_strdup_printf ("recv_rtp_sink_%u", idx);
1571 priv->recv_sink[0] = gst_element_get_request_pad (rtpbin, name);
1573 name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
1574 priv->recv_sink[1] = gst_element_get_request_pad (rtpbin, name);
1577 /* get the session */
1578 g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &priv->session);
1580 g_signal_connect (priv->session, "on-new-ssrc", (GCallback) on_new_ssrc,
1582 g_signal_connect (priv->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
1584 g_signal_connect (priv->session, "on-ssrc-active",
1585 (GCallback) on_ssrc_active, stream);
1586 g_signal_connect (priv->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
1588 g_signal_connect (priv->session, "on-bye-timeout",
1589 (GCallback) on_bye_timeout, stream);
1590 g_signal_connect (priv->session, "on-timeout", (GCallback) on_timeout,
1593 for (i = 0; i < 2; i++) {
1594 GstPad *teepad, *queuepad;
1595 /* For the sender we create this bit of pipeline for both
1596 * RTP and RTCP. Sync and preroll are enabled on udpsink so
1597 * we need to add a queue before appsink to make the pipeline
1598 * not block. For the TCP case, we want to pump data to the
1599 * client as fast as possible anyway.
1601 * .--------. .-----. .---------.
1602 * | rtpbin | | tee | | udpsink |
1603 * | send->sink src->sink |
1604 * '--------' | | '---------'
1605 * | | .---------. .---------.
1606 * | | | queue | | appsink |
1607 * | src->sink src->sink |
1608 * '-----' '---------' '---------'
1610 * When only UDP is allowed, we skip the tee, queue and appsink and link the
1611 * udpsink directly to the session.
1614 gst_bin_add (bin, priv->udpsink[i]);
1615 sinkpad = gst_element_get_static_pad (priv->udpsink[i], "sink");
1617 if (priv->protocols & GST_RTSP_LOWER_TRANS_TCP) {
1618 /* make tee for RTP/RTCP */
1619 priv->tee[i] = gst_element_factory_make ("tee", NULL);
1620 gst_bin_add (bin, priv->tee[i]);
1622 /* and link to rtpbin send pad */
1623 pad = gst_element_get_static_pad (priv->tee[i], "sink");
1624 gst_pad_link (priv->send_src[i], pad);
1625 gst_object_unref (pad);
1627 /* link tee to udpsink */
1628 teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
1629 gst_pad_link (teepad, sinkpad);
1630 gst_object_unref (teepad);
1633 priv->appqueue[i] = gst_element_factory_make ("queue", NULL);
1634 gst_bin_add (bin, priv->appqueue[i]);
1635 /* and link to tee */
1636 teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
1637 pad = gst_element_get_static_pad (priv->appqueue[i], "sink");
1638 gst_pad_link (teepad, pad);
1639 gst_object_unref (pad);
1640 gst_object_unref (teepad);
1643 priv->appsink[i] = gst_element_factory_make ("appsink", NULL);
1644 g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
1645 g_object_set (priv->appsink[i], "emit-signals", FALSE, NULL);
1646 gst_bin_add (bin, priv->appsink[i]);
1647 gst_app_sink_set_callbacks (GST_APP_SINK_CAST (priv->appsink[i]),
1648 &sink_cb, stream, NULL);
1649 /* and link to queue */
1650 queuepad = gst_element_get_static_pad (priv->appqueue[i], "src");
1651 pad = gst_element_get_static_pad (priv->appsink[i], "sink");
1652 gst_pad_link (queuepad, pad);
1653 gst_object_unref (pad);
1654 gst_object_unref (queuepad);
1656 /* else only udpsink needed, link it to the session */
1657 gst_pad_link (priv->send_src[i], sinkpad);
1659 gst_object_unref (sinkpad);
1661 /* For the receiver we create this bit of pipeline for both
1662 * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc
1663 * and it is all funneled into the rtpbin receive pad.
1665 * .--------. .--------. .--------.
1666 * | udpsrc | | funnel | | rtpbin |
1667 * | src->sink src->sink |
1668 * '--------' | | '--------'
1672 * '--------' '--------'
1674 /* make funnel for the RTP/RTCP receivers */
1675 priv->funnel[i] = gst_element_factory_make ("funnel", NULL);
1676 gst_bin_add (bin, priv->funnel[i]);
1678 pad = gst_element_get_static_pad (priv->funnel[i], "src");
1679 gst_pad_link (pad, priv->recv_sink[i]);
1680 gst_object_unref (pad);
1682 if (priv->udpsrc_v4[i]) {
1683 /* we set and keep these to playing so that they don't cause NO_PREROLL return
1685 gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_PLAYING);
1686 gst_element_set_locked_state (priv->udpsrc_v4[i], TRUE);
1688 gst_bin_add (bin, priv->udpsrc_v4[i]);
1690 /* and link to the funnel v4 */
1691 selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1692 pad = gst_element_get_static_pad (priv->udpsrc_v4[i], "src");
1693 gst_pad_link (pad, selpad);
1694 gst_object_unref (pad);
1695 gst_object_unref (selpad);
1698 if (priv->udpsrc_v6[i]) {
1699 gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_PLAYING);
1700 gst_element_set_locked_state (priv->udpsrc_v6[i], TRUE);
1701 gst_bin_add (bin, priv->udpsrc_v6[i]);
1703 /* and link to the funnel v6 */
1704 selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1705 pad = gst_element_get_static_pad (priv->udpsrc_v6[i], "src");
1706 gst_pad_link (pad, selpad);
1707 gst_object_unref (pad);
1708 gst_object_unref (selpad);
1711 if (priv->protocols & GST_RTSP_LOWER_TRANS_TCP) {
1712 /* make and add appsrc */
1713 priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
1714 gst_bin_add (bin, priv->appsrc[i]);
1715 /* and link to the funnel */
1716 selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1717 pad = gst_element_get_static_pad (priv->appsrc[i], "src");
1718 gst_pad_link (pad, selpad);
1719 gst_object_unref (pad);
1720 gst_object_unref (selpad);
1723 /* check if we need to set to a special state */
1724 if (state != GST_STATE_NULL) {
1725 if (priv->udpsink[i])
1726 gst_element_set_state (priv->udpsink[i], state);
1727 if (priv->appsink[i])
1728 gst_element_set_state (priv->appsink[i], state);
1729 if (priv->appqueue[i])
1730 gst_element_set_state (priv->appqueue[i], state);
1732 gst_element_set_state (priv->tee[i], state);
1733 if (priv->funnel[i])
1734 gst_element_set_state (priv->funnel[i], state);
1735 if (priv->appsrc[i])
1736 gst_element_set_state (priv->appsrc[i], state);
1740 /* be notified of caps changes */
1741 priv->caps_sig = g_signal_connect (priv->send_src[0], "notify::caps",
1742 (GCallback) caps_notify, stream);
1744 priv->is_joined = TRUE;
1745 g_mutex_unlock (&priv->lock);
1752 g_mutex_unlock (&priv->lock);
1757 g_mutex_unlock (&priv->lock);
1758 GST_WARNING ("failed to allocate ports %u", idx);
1763 GST_WARNING ("failed to link stream %u", idx);
1764 gst_object_unref (priv->send_rtp_sink);
1765 priv->send_rtp_sink = NULL;
1766 g_mutex_unlock (&priv->lock);
1772 * gst_rtsp_stream_leave_bin:
1773 * @stream: a #GstRTSPStream
1774 * @bin: (transfer none): a #GstBin
1775 * @rtpbin: (transfer none): a rtpbin #GstElement
1777 * Remove the elements of @stream from @bin.
1779 * Return: %TRUE on success.
1782 gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
1783 GstElement * rtpbin)
1785 GstRTSPStreamPrivate *priv;
1788 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1789 g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
1790 g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
1792 priv = stream->priv;
1794 g_mutex_lock (&priv->lock);
1795 if (!priv->is_joined)
1796 goto was_not_joined;
1798 /* all transports must be removed by now */
1799 g_return_val_if_fail (priv->transports == NULL, FALSE);
1801 clear_tr_cache (priv);
1803 GST_INFO ("stream %p leaving bin", stream);
1805 gst_pad_unlink (priv->srcpad, priv->send_rtp_sink);
1806 g_signal_handler_disconnect (priv->send_src[0], priv->caps_sig);
1807 gst_element_release_request_pad (rtpbin, priv->send_rtp_sink);
1808 gst_object_unref (priv->send_rtp_sink);
1809 priv->send_rtp_sink = NULL;
1811 for (i = 0; i < 2; i++) {
1812 if (priv->udpsink[i])
1813 gst_element_set_state (priv->udpsink[i], GST_STATE_NULL);
1814 if (priv->appsink[i])
1815 gst_element_set_state (priv->appsink[i], GST_STATE_NULL);
1816 if (priv->appqueue[i])
1817 gst_element_set_state (priv->appqueue[i], GST_STATE_NULL);
1819 gst_element_set_state (priv->tee[i], GST_STATE_NULL);
1820 if (priv->funnel[i])
1821 gst_element_set_state (priv->funnel[i], GST_STATE_NULL);
1822 if (priv->appsrc[i])
1823 gst_element_set_state (priv->appsrc[i], GST_STATE_NULL);
1824 if (priv->udpsrc_v4[i]) {
1825 /* and set udpsrc to NULL now before removing */
1826 gst_element_set_locked_state (priv->udpsrc_v4[i], FALSE);
1827 gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_NULL);
1828 /* removing them should also nicely release the request
1829 * pads when they finalize */
1830 gst_bin_remove (bin, priv->udpsrc_v4[i]);
1832 if (priv->udpsrc_v6[i]) {
1833 gst_element_set_locked_state (priv->udpsrc_v6[i], FALSE);
1834 gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_NULL);
1835 gst_bin_remove (bin, priv->udpsrc_v6[i]);
1837 if (priv->udpsink[i])
1838 gst_bin_remove (bin, priv->udpsink[i]);
1839 if (priv->appsrc[i])
1840 gst_bin_remove (bin, priv->appsrc[i]);
1841 if (priv->appsink[i])
1842 gst_bin_remove (bin, priv->appsink[i]);
1843 if (priv->appqueue[i])
1844 gst_bin_remove (bin, priv->appqueue[i]);
1846 gst_bin_remove (bin, priv->tee[i]);
1847 if (priv->funnel[i])
1848 gst_bin_remove (bin, priv->funnel[i]);
1850 gst_element_release_request_pad (rtpbin, priv->recv_sink[i]);
1851 gst_object_unref (priv->recv_sink[i]);
1852 priv->recv_sink[i] = NULL;
1854 priv->udpsrc_v4[i] = NULL;
1855 priv->udpsrc_v6[i] = NULL;
1856 priv->udpsink[i] = NULL;
1857 priv->appsrc[i] = NULL;
1858 priv->appsink[i] = NULL;
1859 priv->appqueue[i] = NULL;
1860 priv->tee[i] = NULL;
1861 priv->funnel[i] = NULL;
1863 gst_object_unref (priv->send_src[0]);
1864 priv->send_src[0] = NULL;
1866 gst_element_release_request_pad (rtpbin, priv->send_src[1]);
1867 gst_object_unref (priv->send_src[1]);
1868 priv->send_src[1] = NULL;
1870 g_object_unref (priv->session);
1871 priv->session = NULL;
1873 gst_caps_unref (priv->caps);
1876 priv->is_joined = FALSE;
1877 g_mutex_unlock (&priv->lock);
1883 g_mutex_unlock (&priv->lock);
1889 * gst_rtsp_stream_get_rtpinfo:
1890 * @stream: a #GstRTSPStream
1891 * @rtptime: (allow-none): result RTP timestamp
1892 * @seq: (allow-none): result RTP seqnum
1893 * @clock_rate: (allow-none): the clock rate
1894 * @running_time: (allow-none): result running-time
1896 * Retrieve the current rtptime, seq and running-time. This is used to
1897 * construct a RTPInfo reply header.
1899 * Returns: %TRUE when rtptime, seq and running-time could be determined.
1902 gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream,
1903 guint * rtptime, guint * seq, guint * clock_rate,
1904 GstClockTime * running_time)
1906 GstRTSPStreamPrivate *priv;
1907 GstStructure *stats;
1908 GObjectClass *payobjclass;
1910 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1912 priv = stream->priv;
1914 payobjclass = G_OBJECT_GET_CLASS (priv->payloader);
1916 g_mutex_lock (&priv->lock);
1918 if (g_object_class_find_property (payobjclass, "stats")) {
1919 g_object_get (priv->payloader, "stats", &stats, NULL);
1924 gst_structure_get_uint (stats, "seqnum", seq);
1927 gst_structure_get_uint (stats, "timestamp", rtptime);
1930 gst_structure_get_clock_time (stats, "running-time", running_time);
1933 gst_structure_get_uint (stats, "clock-rate", clock_rate);
1934 if (*clock_rate == 0 && running_time)
1935 *running_time = GST_CLOCK_TIME_NONE;
1937 gst_structure_free (stats);
1939 if (!g_object_class_find_property (payobjclass, "seqnum") ||
1940 !g_object_class_find_property (payobjclass, "timestamp"))
1944 g_object_get (priv->payloader, "seqnum", seq, NULL);
1947 g_object_get (priv->payloader, "timestamp", rtptime, NULL);
1950 *running_time = GST_CLOCK_TIME_NONE;
1952 g_mutex_unlock (&priv->lock);
1959 GST_WARNING ("Could not get payloader stats");
1960 g_mutex_unlock (&priv->lock);
1966 * gst_rtsp_stream_get_caps:
1967 * @stream: a #GstRTSPStream
1969 * Retrieve the current caps of @stream.
1971 * Returns: (transfer full): the #GstCaps of @stream. use gst_caps_unref()
1975 gst_rtsp_stream_get_caps (GstRTSPStream * stream)
1977 GstRTSPStreamPrivate *priv;
1980 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1982 priv = stream->priv;
1984 g_mutex_lock (&priv->lock);
1985 if ((result = priv->caps))
1986 gst_caps_ref (result);
1987 g_mutex_unlock (&priv->lock);
1993 * gst_rtsp_stream_recv_rtp:
1994 * @stream: a #GstRTSPStream
1995 * @buffer: (transfer full): a #GstBuffer
1997 * Handle an RTP buffer for the stream. This method is usually called when a
1998 * message has been received from a client using the TCP transport.
2000 * This function takes ownership of @buffer.
2002 * Returns: a GstFlowReturn.
2005 gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer)
2007 GstRTSPStreamPrivate *priv;
2009 GstElement *element;
2011 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
2012 priv = stream->priv;
2013 g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
2014 g_return_val_if_fail (priv->is_joined, FALSE);
2016 g_mutex_lock (&priv->lock);
2017 if (priv->appsrc[0])
2018 element = gst_object_ref (priv->appsrc[0]);
2021 g_mutex_unlock (&priv->lock);
2024 ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
2025 gst_object_unref (element);
2033 * gst_rtsp_stream_recv_rtcp:
2034 * @stream: a #GstRTSPStream
2035 * @buffer: (transfer full): a #GstBuffer
2037 * Handle an RTCP buffer for the stream. This method is usually called when a
2038 * message has been received from a client using the TCP transport.
2040 * This function takes ownership of @buffer.
2042 * Returns: a GstFlowReturn.
2045 gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
2047 GstRTSPStreamPrivate *priv;
2049 GstElement *element;
2051 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
2052 priv = stream->priv;
2053 g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
2054 g_return_val_if_fail (priv->is_joined, FALSE);
2056 g_mutex_lock (&priv->lock);
2057 if (priv->appsrc[1])
2058 element = gst_object_ref (priv->appsrc[1]);
2061 g_mutex_unlock (&priv->lock);
2064 ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
2065 gst_object_unref (element);
2068 gst_buffer_unref (buffer);
2073 /* must be called with lock */
2075 update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
2078 GstRTSPStreamPrivate *priv = stream->priv;
2079 const GstRTSPTransport *tr;
2081 tr = gst_rtsp_stream_transport_get_transport (trans);
2083 switch (tr->lower_transport) {
2084 case GST_RTSP_LOWER_TRANS_UDP:
2085 case GST_RTSP_LOWER_TRANS_UDP_MCAST:
2091 dest = tr->destination;
2092 if (tr->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
2097 min = tr->client_port.min;
2098 max = tr->client_port.max;
2103 GST_INFO ("setting ttl-mc %d", ttl);
2104 g_object_set (G_OBJECT (priv->udpsink[0]), "ttl-mc", ttl, NULL);
2105 g_object_set (G_OBJECT (priv->udpsink[1]), "ttl-mc", ttl, NULL);
2107 GST_INFO ("adding %s:%d-%d", dest, min, max);
2108 g_signal_emit_by_name (priv->udpsink[0], "add", dest, min, NULL);
2109 g_signal_emit_by_name (priv->udpsink[1], "add", dest, max, NULL);
2110 priv->transports = g_list_prepend (priv->transports, trans);
2112 GST_INFO ("removing %s:%d-%d", dest, min, max);
2113 g_signal_emit_by_name (priv->udpsink[0], "remove", dest, min, NULL);
2114 g_signal_emit_by_name (priv->udpsink[1], "remove", dest, max, NULL);
2115 priv->transports = g_list_remove (priv->transports, trans);
2117 priv->tr_changed = TRUE;
2120 case GST_RTSP_LOWER_TRANS_TCP:
2122 GST_INFO ("adding TCP %s", tr->destination);
2123 priv->transports = g_list_prepend (priv->transports, trans);
2125 GST_INFO ("removing TCP %s", tr->destination);
2126 priv->transports = g_list_remove (priv->transports, trans);
2128 priv->tr_changed = TRUE;
2131 goto unknown_transport;
2138 GST_INFO ("Unknown transport %d", tr->lower_transport);
2145 * gst_rtsp_stream_add_transport:
2146 * @stream: a #GstRTSPStream
2147 * @trans: (transfer none): a #GstRTSPStreamTransport
2149 * Add the transport in @trans to @stream. The media of @stream will
2150 * then also be send to the values configured in @trans.
2152 * @stream must be joined to a bin.
2154 * @trans must contain a valid #GstRTSPTransport.
2156 * Returns: %TRUE if @trans was added
2159 gst_rtsp_stream_add_transport (GstRTSPStream * stream,
2160 GstRTSPStreamTransport * trans)
2162 GstRTSPStreamPrivate *priv;
2165 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2166 priv = stream->priv;
2167 g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
2168 g_return_val_if_fail (priv->is_joined, FALSE);
2170 g_mutex_lock (&priv->lock);
2171 res = update_transport (stream, trans, TRUE);
2172 g_mutex_unlock (&priv->lock);
2178 * gst_rtsp_stream_remove_transport:
2179 * @stream: a #GstRTSPStream
2180 * @trans: (transfer none): a #GstRTSPStreamTransport
2182 * Remove the transport in @trans from @stream. The media of @stream will
2183 * not be sent to the values configured in @trans.
2185 * @stream must be joined to a bin.
2187 * @trans must contain a valid #GstRTSPTransport.
2189 * Returns: %TRUE if @trans was removed
2192 gst_rtsp_stream_remove_transport (GstRTSPStream * stream,
2193 GstRTSPStreamTransport * trans)
2195 GstRTSPStreamPrivate *priv;
2198 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2199 priv = stream->priv;
2200 g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
2201 g_return_val_if_fail (priv->is_joined, FALSE);
2203 g_mutex_lock (&priv->lock);
2204 res = update_transport (stream, trans, FALSE);
2205 g_mutex_unlock (&priv->lock);
2211 * gst_rtsp_stream_get_rtp_socket:
2212 * @stream: a #GstRTSPStream
2213 * @family: the socket family
2215 * Get the RTP socket from @stream for a @family.
2217 * @stream must be joined to a bin.
2219 * Returns: (transfer full): the RTP socket or %NULL if no socket could be
2220 * allocated for @family. Unref after usage
2223 gst_rtsp_stream_get_rtp_socket (GstRTSPStream * stream, GSocketFamily family)
2225 GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
2229 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2230 g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
2231 family == G_SOCKET_FAMILY_IPV6, NULL);
2232 g_return_val_if_fail (priv->udpsink[0], NULL);
2234 if (family == G_SOCKET_FAMILY_IPV6)
2239 g_object_get (priv->udpsink[0], name, &socket, NULL);
2245 * gst_rtsp_stream_get_rtcp_socket:
2246 * @stream: a #GstRTSPStream
2247 * @family: the socket family
2249 * Get the RTCP socket from @stream for a @family.
2251 * @stream must be joined to a bin.
2253 * Returns: (transfer full): the RTCP socket or %NULL if no socket could be
2254 * allocated for @family. Unref after usage
2257 gst_rtsp_stream_get_rtcp_socket (GstRTSPStream * stream, GSocketFamily family)
2259 GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
2263 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2264 g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
2265 family == G_SOCKET_FAMILY_IPV6, NULL);
2266 g_return_val_if_fail (priv->udpsink[1], NULL);
2268 if (family == G_SOCKET_FAMILY_IPV6)
2273 g_object_get (priv->udpsink[1], name, &socket, NULL);
2279 * gst_rtsp_stream_transport_filter:
2280 * @stream: a #GstRTSPStream
2281 * @func: (scope call) (allow-none): a callback
2282 * @user_data: (closure): user data passed to @func
2284 * Call @func for each transport managed by @stream. The result value of @func
2285 * determines what happens to the transport. @func will be called with @stream
2286 * locked so no further actions on @stream can be performed from @func.
2288 * If @func returns #GST_RTSP_FILTER_REMOVE, the transport will be removed from
2291 * If @func returns #GST_RTSP_FILTER_KEEP, the transport will remain in @stream.
2293 * If @func returns #GST_RTSP_FILTER_REF, the transport will remain in @stream but
2294 * will also be added with an additional ref to the result #GList of this
2297 * When @func is %NULL, #GST_RTSP_FILTER_REF will be assumed for each transport.
2299 * Returns: (element-type GstRTSPStreamTransport) (transfer full): a #GList with all
2300 * transports for which @func returned #GST_RTSP_FILTER_REF. After usage, each
2301 * element in the #GList should be unreffed before the list is freed.
2304 gst_rtsp_stream_transport_filter (GstRTSPStream * stream,
2305 GstRTSPStreamTransportFilterFunc func, gpointer user_data)
2307 GstRTSPStreamPrivate *priv;
2308 GList *result, *walk, *next;
2310 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2312 priv = stream->priv;
2316 g_mutex_lock (&priv->lock);
2317 for (walk = priv->transports; walk; walk = next) {
2318 GstRTSPStreamTransport *trans = walk->data;
2319 GstRTSPFilterResult res;
2321 next = g_list_next (walk);
2324 res = func (stream, trans, user_data);
2326 res = GST_RTSP_FILTER_REF;
2329 case GST_RTSP_FILTER_REMOVE:
2330 update_transport (stream, trans, FALSE);
2332 case GST_RTSP_FILTER_REF:
2333 result = g_list_prepend (result, g_object_ref (trans));
2335 case GST_RTSP_FILTER_KEEP:
2340 g_mutex_unlock (&priv->lock);
2345 static GstPadProbeReturn
2346 pad_blocking (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
2348 GstRTSPStreamPrivate *priv;
2349 GstRTSPStream *stream;
2352 priv = stream->priv;
2354 GST_DEBUG_OBJECT (pad, "now blocking");
2356 g_mutex_lock (&priv->lock);
2357 priv->blocking = TRUE;
2358 g_mutex_unlock (&priv->lock);
2360 gst_element_post_message (priv->payloader,
2361 gst_message_new_element (GST_OBJECT_CAST (priv->payloader),
2362 gst_structure_new_empty ("GstRTSPStreamBlocking")));
2364 return GST_PAD_PROBE_OK;
2368 * gst_rtsp_stream_set_blocked:
2369 * @stream: a #GstRTSPStream
2370 * @blocked: boolean indicating we should block or unblock
2372 * Blocks or unblocks the dataflow on @stream.
2374 * Returns: %TRUE on success
2377 gst_rtsp_stream_set_blocked (GstRTSPStream * stream, gboolean blocked)
2379 GstRTSPStreamPrivate *priv;
2381 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2383 priv = stream->priv;
2385 g_mutex_lock (&priv->lock);
2387 priv->blocking = FALSE;
2388 if (priv->blocked_id == 0) {
2389 priv->blocked_id = gst_pad_add_probe (priv->srcpad,
2390 GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER |
2391 GST_PAD_PROBE_TYPE_BUFFER_LIST, pad_blocking,
2392 g_object_ref (stream), g_object_unref);
2395 if (priv->blocked_id != 0) {
2396 gst_pad_remove_probe (priv->srcpad, priv->blocked_id);
2397 priv->blocked_id = 0;
2398 priv->blocking = FALSE;
2401 g_mutex_unlock (&priv->lock);
2407 * gst_rtsp_stream_is_blocking:
2408 * @stream: a #GstRTSPStream
2410 * Check if @stream is blocking on a #GstBuffer.
2412 * Returns: %TRUE if @stream is blocking
2415 gst_rtsp_stream_is_blocking (GstRTSPStream * stream)
2417 GstRTSPStreamPrivate *priv;
2420 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2422 priv = stream->priv;
2424 g_mutex_lock (&priv->lock);
2425 result = priv->blocking;
2426 g_mutex_unlock (&priv->lock);