2 * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Library General Public
6 * License as published by the Free Software Foundation; either
7 * version 2 of the License, or (at your option) any later version.
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Library General Public License for more details.
14 * You should have received a copy of the GNU Library General Public
15 * License along with this library; if not, write to the
16 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17 * Boston, MA 02110-1301, USA.
25 #include <gst/app/gstappsrc.h>
26 #include <gst/app/gstappsink.h>
28 #include "rtsp-stream.h"
30 #define GST_RTSP_STREAM_GET_PRIVATE(obj) \
31 (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTSP_STREAM, GstRTSPStreamPrivate))
33 struct _GstRTSPStreamPrivate
38 GstElement *payloader;
42 /* pads on the rtpbin */
43 GstPad *send_rtp_sink;
47 /* the RTPSession object */
50 /* sinks used for sending and receiving RTP and RTCP over ipv4, they share
52 GstElement *udpsrc_v4[2];
54 /* sinks used for sending and receiving RTP and RTCP over ipv6, they share
56 GstElement *udpsrc_v6[2];
58 GstElement *udpsink[2];
60 /* for TCP transport */
61 GstElement *appsrc[2];
62 GstElement *appqueue[2];
63 GstElement *appsink[2];
66 GstElement *funnel[2];
68 /* server ports for sending/receiving over ipv4 */
69 GstRTSPRange server_port_v4;
70 GstRTSPAddress *server_addr_v4;
72 /* server ports for sending/receiving over ipv6 */
73 GstRTSPRange server_port_v6;
74 GstRTSPAddress *server_addr_v6;
76 /* multicast addresses */
77 GstRTSPAddressPool *pool;
80 /* the caps of the stream */
84 /* transports we stream to */
96 GST_DEBUG_CATEGORY_STATIC (rtsp_stream_debug);
97 #define GST_CAT_DEFAULT rtsp_stream_debug
99 static GQuark ssrc_stream_map_key;
101 static void gst_rtsp_stream_finalize (GObject * obj);
103 G_DEFINE_TYPE (GstRTSPStream, gst_rtsp_stream, G_TYPE_OBJECT);
106 gst_rtsp_stream_class_init (GstRTSPStreamClass * klass)
108 GObjectClass *gobject_class;
110 g_type_class_add_private (klass, sizeof (GstRTSPStreamPrivate));
112 gobject_class = G_OBJECT_CLASS (klass);
114 gobject_class->finalize = gst_rtsp_stream_finalize;
116 GST_DEBUG_CATEGORY_INIT (rtsp_stream_debug, "rtspstream", 0, "GstRTSPStream");
118 ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
122 gst_rtsp_stream_init (GstRTSPStream * stream)
124 GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
126 GST_DEBUG ("new stream %p", stream);
130 g_mutex_init (&priv->lock);
134 gst_rtsp_stream_finalize (GObject * obj)
136 GstRTSPStream *stream;
137 GstRTSPStreamPrivate *priv;
139 stream = GST_RTSP_STREAM (obj);
142 GST_DEBUG ("finalize stream %p", stream);
144 /* we really need to be unjoined now */
145 g_return_if_fail (!priv->is_joined);
148 gst_rtsp_address_free (priv->addr);
149 if (priv->server_addr_v4)
150 gst_rtsp_address_free (priv->server_addr_v4);
151 if (priv->server_addr_v6)
152 gst_rtsp_address_free (priv->server_addr_v6);
154 g_object_unref (priv->pool);
155 gst_object_unref (priv->payloader);
156 gst_object_unref (priv->srcpad);
157 g_mutex_clear (&priv->lock);
159 G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
163 * gst_rtsp_stream_new:
166 * @payloader: a #GstElement
168 * Create a new media stream with index @idx that handles RTP data on
169 * @srcpad and has a payloader element @payloader.
171 * Returns: a new #GstRTSPStream
174 gst_rtsp_stream_new (guint idx, GstElement * payloader, GstPad * srcpad)
176 GstRTSPStreamPrivate *priv;
177 GstRTSPStream *stream;
179 g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL);
180 g_return_val_if_fail (GST_IS_PAD (srcpad), NULL);
181 g_return_val_if_fail (GST_PAD_IS_SRC (srcpad), NULL);
183 stream = g_object_new (GST_TYPE_RTSP_STREAM, NULL);
186 priv->payloader = gst_object_ref (payloader);
187 priv->srcpad = gst_object_ref (srcpad);
193 * gst_rtsp_stream_get_index:
194 * @stream: a #GstRTSPStream
196 * Get the stream index.
198 * Return: the stream index.
201 gst_rtsp_stream_get_index (GstRTSPStream * stream)
203 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
205 return stream->priv->idx;
209 * gst_rtsp_stream_get_srcpad:
210 * @stream: a #GstRTSPStream
212 * Get the srcpad associated with @stream.
214 * Return: the srcpad. Unref after usage.
217 gst_rtsp_stream_get_srcpad (GstRTSPStream * stream)
219 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
221 return gst_object_ref (stream->priv->srcpad);
225 * gst_rtsp_stream_set_mtu:
226 * @stream: a #GstRTSPStream
229 * Configure the mtu in the payloader of @stream to @mtu.
232 gst_rtsp_stream_set_mtu (GstRTSPStream * stream, guint mtu)
234 GstRTSPStreamPrivate *priv;
236 g_return_if_fail (GST_IS_RTSP_STREAM (stream));
240 GST_LOG_OBJECT (stream, "set MTU %u", mtu);
242 g_object_set (G_OBJECT (priv->payloader), "mtu", mtu, NULL);
246 * gst_rtsp_stream_get_mtu:
247 * @stream: a #GstRTSPStream
249 * Get the configured MTU in the payloader of @stream.
251 * Returns: the MTU of the payloader.
254 gst_rtsp_stream_get_mtu (GstRTSPStream * stream)
256 GstRTSPStreamPrivate *priv;
259 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
263 g_object_get (G_OBJECT (priv->payloader), "mtu", &mtu, NULL);
269 * gst_rtsp_stream_set_address_pool:
270 * @stream: a #GstRTSPStream
271 * @pool: a #GstRTSPAddressPool
273 * configure @pool to be used as the address pool of @stream.
276 gst_rtsp_stream_set_address_pool (GstRTSPStream * stream,
277 GstRTSPAddressPool * pool)
279 GstRTSPStreamPrivate *priv;
280 GstRTSPAddressPool *old;
282 g_return_if_fail (GST_IS_RTSP_STREAM (stream));
286 GST_LOG_OBJECT (stream, "set address pool %p", pool);
288 g_mutex_lock (&priv->lock);
289 if ((old = priv->pool) != pool)
290 priv->pool = pool ? g_object_ref (pool) : NULL;
293 g_mutex_unlock (&priv->lock);
296 g_object_unref (old);
300 * gst_rtsp_stream_get_address_pool:
301 * @stream: a #GstRTSPStream
303 * Get the #GstRTSPAddressPool used as the address pool of @stream.
305 * Returns: (transfer full): the #GstRTSPAddressPool of @stream. g_object_unref() after
309 gst_rtsp_stream_get_address_pool (GstRTSPStream * stream)
311 GstRTSPStreamPrivate *priv;
312 GstRTSPAddressPool *result;
314 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
318 g_mutex_lock (&priv->lock);
319 if ((result = priv->pool))
320 g_object_ref (result);
321 g_mutex_unlock (&priv->lock);
327 * gst_rtsp_stream_get_address:
328 * @stream: a #GstRTSPStream
330 * Get the multicast address of @stream.
332 * Returns: the #GstRTSPAddress of @stream or %NULL when no address could be
333 * allocated. gst_rtsp_address_free() after usage.
336 gst_rtsp_stream_get_address (GstRTSPStream * stream)
338 GstRTSPStreamPrivate *priv;
339 GstRTSPAddress *result;
341 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
345 g_mutex_lock (&priv->lock);
346 if (priv->addr == NULL) {
347 if (priv->pool == NULL)
350 priv->addr = gst_rtsp_address_pool_acquire_address (priv->pool,
351 GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_MULTICAST, 2);
352 if (priv->addr == NULL)
355 result = gst_rtsp_address_copy (priv->addr);
356 g_mutex_unlock (&priv->lock);
363 GST_ERROR_OBJECT (stream, "no address pool specified");
364 g_mutex_unlock (&priv->lock);
369 GST_ERROR_OBJECT (stream, "failed to acquire address from pool");
370 g_mutex_unlock (&priv->lock);
376 * gst_rtsp_stream_reserve_address:
377 * @stream: a #GstRTSPStream
379 * Get a specific multicast address of @stream.
381 * Returns: the #GstRTSPAddress of @stream or %NULL when no address could be
382 * allocated. gst_rtsp_address_free() after usage.
385 gst_rtsp_stream_reserve_address (GstRTSPStream * stream,
386 const gchar * address, guint port, guint n_ports, guint ttl)
388 GstRTSPStreamPrivate *priv;
389 GstRTSPAddress *result;
391 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
392 g_return_val_if_fail (address != NULL, NULL);
393 g_return_val_if_fail (port > 0, NULL);
394 g_return_val_if_fail (n_ports > 0, NULL);
395 g_return_val_if_fail (ttl > 0, NULL);
399 g_mutex_lock (&priv->lock);
400 if (priv->addr == NULL) {
401 if (priv->pool == NULL)
404 priv->addr = gst_rtsp_address_pool_reserve_address (priv->pool, address,
406 if (priv->addr == NULL)
409 if (strcmp (priv->addr->address, address) ||
410 priv->addr->port != port || priv->addr->n_ports != n_ports ||
411 priv->addr->ttl != ttl)
412 goto different_address;
414 result = gst_rtsp_address_copy (priv->addr);
415 g_mutex_unlock (&priv->lock);
422 GST_ERROR_OBJECT (stream, "no address pool specified");
423 g_mutex_unlock (&priv->lock);
428 GST_ERROR_OBJECT (stream, "failed to acquire address %s from pool",
430 g_mutex_unlock (&priv->lock);
435 GST_ERROR_OBJECT (stream, "address %s is not the same that was already"
436 " reserved", address);
437 g_mutex_unlock (&priv->lock);
443 alloc_ports_one_family (GstRTSPAddressPool * pool, gint buffer_size,
444 GSocketFamily family, GstElement * udpsrc_out[2],
445 GstElement * udpsink_out[2], GstRTSPRange * server_port_out,
446 GstRTSPAddress ** server_addr_out)
448 GstStateChangeReturn ret;
449 GstElement *udpsrc0, *udpsrc1;
450 GstElement *udpsink0, *udpsink1;
451 GSocket *rtp_socket = NULL;
452 GSocket *rtcp_socket;
453 gint tmp_rtp, tmp_rtcp;
455 gint rtpport, rtcpport;
456 GList *rejected_addresses = NULL;
457 GstRTSPAddress *addr = NULL;
458 GInetAddress *inetaddr = NULL;
459 GSocketAddress *rtp_sockaddr = NULL;
460 GSocketAddress *rtcp_sockaddr = NULL;
461 const gchar *multisink_socket = "socket";
463 if (family == G_SOCKET_FAMILY_IPV6) {
464 multisink_socket = "socket-v6";
473 /* Start with random port */
476 rtcp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
477 G_SOCKET_PROTOCOL_UDP, NULL);
479 goto no_udp_protocol;
481 if (*server_addr_out)
482 gst_rtsp_address_free (*server_addr_out);
484 /* try to allocate 2 UDP ports, the RTP port should be an even
485 * number and the RTCP port should be the next (uneven) port */
488 if (rtp_socket == NULL) {
489 rtp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
490 G_SOCKET_PROTOCOL_UDP, NULL);
492 goto no_udp_protocol;
495 if (pool && gst_rtsp_address_pool_has_unicast_addresses (pool)) {
496 GstRTSPAddressFlags flags;
499 rejected_addresses = g_list_prepend (rejected_addresses, addr);
501 flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_UNICAST;
502 if (family == G_SOCKET_FAMILY_IPV6)
503 flags |= GST_RTSP_ADDRESS_FLAG_IPV6;
505 flags |= GST_RTSP_ADDRESS_FLAG_IPV4;
507 addr = gst_rtsp_address_pool_acquire_address (pool, flags, 2);
512 tmp_rtp = addr->port;
514 g_clear_object (&inetaddr);
515 inetaddr = g_inet_address_new_from_string (addr->address);
523 if (inetaddr == NULL)
524 inetaddr = g_inet_address_new_any (family);
527 rtp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtp);
528 if (!g_socket_bind (rtp_socket, rtp_sockaddr, FALSE, NULL)) {
529 g_object_unref (rtp_sockaddr);
532 g_object_unref (rtp_sockaddr);
534 rtp_sockaddr = g_socket_get_local_address (rtp_socket, NULL);
535 if (rtp_sockaddr == NULL || !G_IS_INET_SOCKET_ADDRESS (rtp_sockaddr)) {
536 g_clear_object (&rtp_sockaddr);
541 g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (rtp_sockaddr));
542 g_object_unref (rtp_sockaddr);
544 /* check if port is even */
545 if ((tmp_rtp & 1) != 0) {
546 /* port not even, close and allocate another */
548 g_clear_object (&rtp_socket);
553 tmp_rtcp = tmp_rtp + 1;
555 rtcp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtcp);
556 if (!g_socket_bind (rtcp_socket, rtcp_sockaddr, FALSE, NULL)) {
557 g_object_unref (rtcp_sockaddr);
558 g_clear_object (&rtp_socket);
561 g_object_unref (rtcp_sockaddr);
563 g_clear_object (&inetaddr);
565 udpsrc0 = gst_element_factory_make ("udpsrc", NULL);
566 udpsrc1 = gst_element_factory_make ("udpsrc", NULL);
568 if (udpsrc0 == NULL || udpsrc1 == NULL)
569 goto no_udp_protocol;
571 g_object_set (G_OBJECT (udpsrc0), "socket", rtp_socket, NULL);
572 g_object_set (G_OBJECT (udpsrc1), "socket", rtcp_socket, NULL);
574 ret = gst_element_set_state (udpsrc0, GST_STATE_PAUSED);
575 if (ret == GST_STATE_CHANGE_FAILURE)
577 ret = gst_element_set_state (udpsrc1, GST_STATE_PAUSED);
578 if (ret == GST_STATE_CHANGE_FAILURE)
581 /* all fine, do port check */
582 g_object_get (G_OBJECT (udpsrc0), "port", &rtpport, NULL);
583 g_object_get (G_OBJECT (udpsrc1), "port", &rtcpport, NULL);
585 /* this should not happen... */
586 if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
590 udpsink0 = udpsink_out[0];
592 udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
595 goto no_udp_protocol;
597 g_object_set (G_OBJECT (udpsink0), "close-socket", FALSE, NULL);
598 g_object_set (G_OBJECT (udpsink0), multisink_socket, rtp_socket, NULL);
601 udpsink1 = udpsink_out[1];
603 udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
606 goto no_udp_protocol;
608 g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
609 g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
610 g_object_set (G_OBJECT (udpsink0), "buffer-size", buffer_size, NULL);
612 g_object_set (G_OBJECT (udpsink1), "close-socket", FALSE, NULL);
613 g_object_set (G_OBJECT (udpsink1), multisink_socket, rtcp_socket, NULL);
614 g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
615 g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
616 g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL);
617 g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
618 g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL);
619 g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
621 /* we keep these elements, we will further configure them when the
622 * client told us to really use the UDP ports. */
623 udpsrc_out[0] = udpsrc0;
624 udpsrc_out[1] = udpsrc1;
625 udpsink_out[0] = udpsink0;
626 udpsink_out[1] = udpsink1;
627 server_port_out->min = rtpport;
628 server_port_out->max = rtcpport;
630 *server_addr_out = addr;
631 g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free);
633 g_object_unref (rtp_socket);
634 g_object_unref (rtcp_socket);
662 gst_element_set_state (udpsrc0, GST_STATE_NULL);
663 gst_object_unref (udpsrc0);
666 gst_element_set_state (udpsrc1, GST_STATE_NULL);
667 gst_object_unref (udpsrc1);
670 gst_element_set_state (udpsink0, GST_STATE_NULL);
671 gst_object_unref (udpsink0);
674 gst_element_set_state (udpsink1, GST_STATE_NULL);
675 gst_object_unref (udpsink1);
678 g_object_unref (inetaddr);
679 g_list_free_full (rejected_addresses,
680 (GDestroyNotify) gst_rtsp_address_free);
682 gst_rtsp_address_free (addr);
684 g_object_unref (rtp_socket);
686 g_object_unref (rtcp_socket);
691 /* must be called with lock */
693 alloc_ports (GstRTSPStream * stream)
695 GstRTSPStreamPrivate *priv = stream->priv;
697 return alloc_ports_one_family (priv->pool, priv->buffer_size,
698 G_SOCKET_FAMILY_IPV4, priv->udpsrc_v4, priv->udpsink,
699 &priv->server_port_v4, &priv->server_addr_v4) &&
700 alloc_ports_one_family (priv->pool, priv->buffer_size,
701 G_SOCKET_FAMILY_IPV6, priv->udpsrc_v6, priv->udpsink,
702 &priv->server_port_v6, &priv->server_addr_v6);
706 * gst_rtsp_stream_get_server_port:
707 * @stream: a #GstRTSPStream
708 * @server_port: (out): result server port
710 * Fill @server_port with the port pair used by the server. This function can
711 * only be called when @stream has been joined.
714 gst_rtsp_stream_get_server_port (GstRTSPStream * stream,
715 GstRTSPRange * server_port, GSocketFamily family)
717 GstRTSPStreamPrivate *priv;
719 g_return_if_fail (GST_IS_RTSP_STREAM (stream));
721 g_return_if_fail (priv->is_joined);
723 g_mutex_lock (&priv->lock);
724 if (family == G_SOCKET_FAMILY_IPV4) {
726 *server_port = priv->server_port_v4;
729 *server_port = priv->server_port_v6;
731 g_mutex_unlock (&priv->lock);
735 * gst_rtsp_stream_get_ssrc:
736 * @stream: a #GstRTSPStream
737 * @ssrc: (out): result ssrc
739 * Get the SSRC used by the RTP session of this stream. This function can only
740 * be called when @stream has been joined.
743 gst_rtsp_stream_get_ssrc (GstRTSPStream * stream, guint * ssrc)
745 GstRTSPStreamPrivate *priv;
747 g_return_if_fail (GST_IS_RTSP_STREAM (stream));
749 g_return_if_fail (priv->is_joined);
751 g_mutex_lock (&priv->lock);
752 if (ssrc && priv->session)
753 g_object_get (priv->session, "internal-ssrc", ssrc, NULL);
754 g_mutex_unlock (&priv->lock);
757 /* executed from streaming thread */
759 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream)
761 GstRTSPStreamPrivate *priv = stream->priv;
762 GstCaps *newcaps, *oldcaps;
764 newcaps = gst_pad_get_current_caps (pad);
766 GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps,
769 g_mutex_lock (&priv->lock);
770 oldcaps = priv->caps;
771 priv->caps = newcaps;
772 g_mutex_unlock (&priv->lock);
775 gst_caps_unref (oldcaps);
779 dump_structure (const GstStructure * s)
783 sstr = gst_structure_to_string (s);
784 GST_INFO ("structure: %s", sstr);
788 static GstRTSPStreamTransport *
789 find_transport (GstRTSPStream * stream, const gchar * rtcp_from)
791 GstRTSPStreamPrivate *priv = stream->priv;
793 GstRTSPStreamTransport *result = NULL;
798 if (rtcp_from == NULL)
801 tmp = g_strrstr (rtcp_from, ":");
805 port = atoi (tmp + 1);
806 dest = g_strndup (rtcp_from, tmp - rtcp_from);
808 g_mutex_lock (&priv->lock);
809 GST_INFO ("finding %s:%d in %d transports", dest, port,
810 g_list_length (priv->transports));
812 for (walk = priv->transports; walk; walk = g_list_next (walk)) {
813 GstRTSPStreamTransport *trans = walk->data;
814 const GstRTSPTransport *tr;
817 tr = gst_rtsp_stream_transport_get_transport (trans);
819 min = tr->client_port.min;
820 max = tr->client_port.max;
822 if ((strcmp (tr->destination, dest) == 0) && (min == port || max == port)) {
828 g_object_ref (result);
829 g_mutex_unlock (&priv->lock);
836 static GstRTSPStreamTransport *
837 check_transport (GObject * source, GstRTSPStream * stream)
840 GstRTSPStreamTransport *trans;
842 /* see if we have a stream to match with the origin of the RTCP packet */
843 trans = g_object_get_qdata (source, ssrc_stream_map_key);
845 g_object_get (source, "stats", &stats, NULL);
847 const gchar *rtcp_from;
849 dump_structure (stats);
851 rtcp_from = gst_structure_get_string (stats, "rtcp-from");
852 if ((trans = find_transport (stream, rtcp_from))) {
853 GST_INFO ("%p: found transport %p for source %p", stream, trans,
855 g_object_set_qdata_full (source, ssrc_stream_map_key, trans,
858 gst_structure_free (stats);
866 on_new_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
868 GstRTSPStreamTransport *trans;
870 GST_INFO ("%p: new source %p", stream, source);
872 trans = check_transport (source, stream);
875 GST_INFO ("%p: source %p for transport %p", stream, source, trans);
879 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPStream * stream)
881 GST_INFO ("%p: new SDES %p", stream, source);
885 on_ssrc_active (GObject * session, GObject * source, GstRTSPStream * stream)
887 GstRTSPStreamTransport *trans;
889 trans = check_transport (source, stream);
892 GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
893 gst_rtsp_stream_transport_keep_alive (trans);
898 g_object_get (source, "stats", &stats, NULL);
900 dump_structure (stats);
901 gst_structure_free (stats);
908 on_bye_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
910 GST_INFO ("%p: source %p bye", stream, source);
914 on_bye_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
916 GstRTSPStreamTransport *trans;
918 GST_INFO ("%p: source %p bye timeout", stream, source);
920 if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
921 gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
922 g_object_set_qdata (source, ssrc_stream_map_key, NULL);
927 on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
929 GstRTSPStreamTransport *trans;
931 GST_INFO ("%p: source %p timeout", stream, source);
933 if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
934 gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
935 g_object_set_qdata (source, ssrc_stream_map_key, NULL);
940 handle_new_sample (GstAppSink * sink, gpointer user_data)
942 GstRTSPStreamPrivate *priv;
946 GstRTSPStream *stream;
948 sample = gst_app_sink_pull_sample (sink);
952 stream = (GstRTSPStream *) user_data;
954 buffer = gst_sample_get_buffer (sample);
956 g_mutex_lock (&priv->lock);
957 for (walk = priv->transports; walk; walk = g_list_next (walk)) {
958 GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
960 if (GST_ELEMENT_CAST (sink) == priv->appsink[0]) {
961 gst_rtsp_stream_transport_send_rtp (tr, buffer);
963 gst_rtsp_stream_transport_send_rtcp (tr, buffer);
966 g_mutex_unlock (&priv->lock);
968 gst_sample_unref (sample);
973 static GstAppSinkCallbacks sink_cb = {
974 NULL, /* not interested in EOS */
975 NULL, /* not interested in preroll samples */
980 * gst_rtsp_stream_join_bin:
981 * @stream: a #GstRTSPStream
982 * @bin: a #GstBin to join
983 * @rtpbin: a rtpbin element in @bin
984 * @state: the target state of the new elements
986 * Join the #Gstbin @bin that contains the element @rtpbin.
988 * @stream will link to @rtpbin, which must be inside @bin. The elements
989 * added to @bin will be set to the state given in @state.
991 * Returns: %TRUE on success.
994 gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
995 GstElement * rtpbin, GstState state)
997 GstRTSPStreamPrivate *priv;
1000 GstPad *pad, *teepad, *queuepad, *selpad;
1001 GstPadLinkReturn ret;
1003 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1004 g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
1005 g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
1007 priv = stream->priv;
1009 g_mutex_lock (&priv->lock);
1010 if (priv->is_joined)
1013 /* create a session with the same index as the stream */
1016 GST_INFO ("stream %p joining bin as session %d", stream, idx);
1018 if (!alloc_ports (stream))
1021 /* get a pad for sending RTP */
1022 name = g_strdup_printf ("send_rtp_sink_%u", idx);
1023 priv->send_rtp_sink = gst_element_get_request_pad (rtpbin, name);
1025 /* link the RTP pad to the session manager, it should not really fail unless
1026 * this is not really an RTP pad */
1027 ret = gst_pad_link (priv->srcpad, priv->send_rtp_sink);
1028 if (ret != GST_PAD_LINK_OK)
1031 /* get pads from the RTP session element for sending and receiving
1033 name = g_strdup_printf ("send_rtp_src_%u", idx);
1034 priv->send_src[0] = gst_element_get_static_pad (rtpbin, name);
1036 name = g_strdup_printf ("send_rtcp_src_%u", idx);
1037 priv->send_src[1] = gst_element_get_request_pad (rtpbin, name);
1039 name = g_strdup_printf ("recv_rtp_sink_%u", idx);
1040 priv->recv_sink[0] = gst_element_get_request_pad (rtpbin, name);
1042 name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
1043 priv->recv_sink[1] = gst_element_get_request_pad (rtpbin, name);
1046 /* get the session */
1047 g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &priv->session);
1049 g_signal_connect (priv->session, "on-new-ssrc", (GCallback) on_new_ssrc,
1051 g_signal_connect (priv->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
1053 g_signal_connect (priv->session, "on-ssrc-active",
1054 (GCallback) on_ssrc_active, stream);
1055 g_signal_connect (priv->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
1057 g_signal_connect (priv->session, "on-bye-timeout",
1058 (GCallback) on_bye_timeout, stream);
1059 g_signal_connect (priv->session, "on-timeout", (GCallback) on_timeout,
1062 for (i = 0; i < 2; i++) {
1063 /* For the sender we create this bit of pipeline for both
1064 * RTP and RTCP. Sync and preroll are enabled on udpsink so
1065 * we need to add a queue before appsink to make the pipeline
1066 * not block. For the TCP case, we want to pump data to the
1067 * client as fast as possible anyway.
1069 * .--------. .-----. .---------.
1070 * | rtpbin | | tee | | udpsink |
1071 * | send->sink src->sink |
1072 * '--------' | | '---------'
1073 * | | .---------. .---------.
1074 * | | | queue | | appsink |
1075 * | src->sink src->sink |
1076 * '-----' '---------' '---------'
1078 /* make tee for RTP/RTCP */
1079 priv->tee[i] = gst_element_factory_make ("tee", NULL);
1080 gst_bin_add (bin, priv->tee[i]);
1082 /* and link to rtpbin send pad */
1083 pad = gst_element_get_static_pad (priv->tee[i], "sink");
1084 gst_pad_link (priv->send_src[i], pad);
1085 gst_object_unref (pad);
1088 gst_bin_add (bin, priv->udpsink[i]);
1090 /* link tee to udpsink */
1091 teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
1092 pad = gst_element_get_static_pad (priv->udpsink[i], "sink");
1093 gst_pad_link (teepad, pad);
1094 gst_object_unref (pad);
1095 gst_object_unref (teepad);
1098 priv->appqueue[i] = gst_element_factory_make ("queue", NULL);
1099 gst_bin_add (bin, priv->appqueue[i]);
1100 /* and link to tee */
1101 teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
1102 pad = gst_element_get_static_pad (priv->appqueue[i], "sink");
1103 gst_pad_link (teepad, pad);
1104 gst_object_unref (pad);
1105 gst_object_unref (teepad);
1108 priv->appsink[i] = gst_element_factory_make ("appsink", NULL);
1109 g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
1110 g_object_set (priv->appsink[i], "emit-signals", FALSE, NULL);
1111 gst_bin_add (bin, priv->appsink[i]);
1112 gst_app_sink_set_callbacks (GST_APP_SINK_CAST (priv->appsink[i]),
1113 &sink_cb, stream, NULL);
1114 /* and link to queue */
1115 queuepad = gst_element_get_static_pad (priv->appqueue[i], "src");
1116 pad = gst_element_get_static_pad (priv->appsink[i], "sink");
1117 gst_pad_link (queuepad, pad);
1118 gst_object_unref (pad);
1119 gst_object_unref (queuepad);
1121 /* For the receiver we create this bit of pipeline for both
1122 * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc
1123 * and it is all funneled into the rtpbin receive pad.
1125 * .--------. .--------. .--------.
1126 * | udpsrc | | funnel | | rtpbin |
1127 * | src->sink src->sink |
1128 * '--------' | | '--------'
1132 * '--------' '--------'
1134 /* make funnel for the RTP/RTCP receivers */
1135 priv->funnel[i] = gst_element_factory_make ("funnel", NULL);
1136 gst_bin_add (bin, priv->funnel[i]);
1138 pad = gst_element_get_static_pad (priv->funnel[i], "src");
1139 gst_pad_link (pad, priv->recv_sink[i]);
1140 gst_object_unref (pad);
1142 /* we set and keep these to playing so that they don't cause NO_PREROLL return
1144 gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_PLAYING);
1145 gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_PLAYING);
1146 gst_element_set_locked_state (priv->udpsrc_v4[i], TRUE);
1147 gst_element_set_locked_state (priv->udpsrc_v6[i], TRUE);
1149 gst_bin_add (bin, priv->udpsrc_v4[i]);
1150 gst_bin_add (bin, priv->udpsrc_v6[i]);
1151 /* and link to the funnel v4 */
1152 selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1153 pad = gst_element_get_static_pad (priv->udpsrc_v4[i], "src");
1154 gst_pad_link (pad, selpad);
1155 gst_object_unref (pad);
1156 gst_object_unref (selpad);
1158 /* and link to the funnel v6 */
1159 selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1160 pad = gst_element_get_static_pad (priv->udpsrc_v6[i], "src");
1161 gst_pad_link (pad, selpad);
1162 gst_object_unref (pad);
1163 gst_object_unref (selpad);
1165 /* make and add appsrc */
1166 priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
1167 gst_bin_add (bin, priv->appsrc[i]);
1168 /* and link to the funnel */
1169 selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1170 pad = gst_element_get_static_pad (priv->appsrc[i], "src");
1171 gst_pad_link (pad, selpad);
1172 gst_object_unref (pad);
1173 gst_object_unref (selpad);
1175 /* check if we need to set to a special state */
1176 if (state != GST_STATE_NULL) {
1177 gst_element_set_state (priv->udpsink[i], state);
1178 gst_element_set_state (priv->appsink[i], state);
1179 gst_element_set_state (priv->appqueue[i], state);
1180 gst_element_set_state (priv->tee[i], state);
1181 gst_element_set_state (priv->funnel[i], state);
1182 gst_element_set_state (priv->appsrc[i], state);
1186 /* be notified of caps changes */
1187 priv->caps_sig = g_signal_connect (priv->send_rtp_sink, "notify::caps",
1188 (GCallback) caps_notify, stream);
1190 priv->is_joined = TRUE;
1191 g_mutex_unlock (&priv->lock);
1198 g_mutex_unlock (&priv->lock);
1203 g_mutex_unlock (&priv->lock);
1204 GST_WARNING ("failed to allocate ports %d", idx);
1209 GST_WARNING ("failed to link stream %d", idx);
1210 gst_object_unref (priv->send_rtp_sink);
1211 priv->send_rtp_sink = NULL;
1212 g_mutex_unlock (&priv->lock);
1218 * gst_rtsp_stream_leave_bin:
1219 * @stream: a #GstRTSPStream
1221 * @rtpbin: a rtpbin #GstElement
1223 * Remove the elements of @stream from @bin.
1225 * Return: %TRUE on success.
1228 gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
1229 GstElement * rtpbin)
1231 GstRTSPStreamPrivate *priv;
1234 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1235 g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
1236 g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
1238 priv = stream->priv;
1240 g_mutex_lock (&priv->lock);
1241 if (!priv->is_joined)
1242 goto was_not_joined;
1244 /* all transports must be removed by now */
1245 g_return_val_if_fail (priv->transports == NULL, FALSE);
1247 GST_INFO ("stream %p leaving bin", stream);
1249 gst_pad_unlink (priv->srcpad, priv->send_rtp_sink);
1250 g_signal_handler_disconnect (priv->send_rtp_sink, priv->caps_sig);
1251 gst_element_release_request_pad (rtpbin, priv->send_rtp_sink);
1252 gst_object_unref (priv->send_rtp_sink);
1253 priv->send_rtp_sink = NULL;
1255 for (i = 0; i < 2; i++) {
1256 gst_element_set_state (priv->udpsink[i], GST_STATE_NULL);
1257 gst_element_set_state (priv->appsink[i], GST_STATE_NULL);
1258 gst_element_set_state (priv->appqueue[i], GST_STATE_NULL);
1259 gst_element_set_state (priv->tee[i], GST_STATE_NULL);
1260 gst_element_set_state (priv->funnel[i], GST_STATE_NULL);
1261 gst_element_set_state (priv->appsrc[i], GST_STATE_NULL);
1262 /* and set udpsrc to NULL now before removing */
1263 gst_element_set_locked_state (priv->udpsrc_v4[i], FALSE);
1264 gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_NULL);
1265 gst_element_set_locked_state (priv->udpsrc_v6[i], FALSE);
1266 gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_NULL);
1268 /* removing them should also nicely release the request
1269 * pads when they finalize */
1270 gst_bin_remove (bin, priv->udpsrc_v4[i]);
1271 gst_bin_remove (bin, priv->udpsrc_v6[i]);
1272 gst_bin_remove (bin, priv->udpsink[i]);
1273 gst_bin_remove (bin, priv->appsrc[i]);
1274 gst_bin_remove (bin, priv->appsink[i]);
1275 gst_bin_remove (bin, priv->appqueue[i]);
1276 gst_bin_remove (bin, priv->tee[i]);
1277 gst_bin_remove (bin, priv->funnel[i]);
1279 gst_element_release_request_pad (rtpbin, priv->recv_sink[i]);
1280 gst_object_unref (priv->recv_sink[i]);
1281 priv->recv_sink[i] = NULL;
1283 priv->udpsrc_v4[i] = NULL;
1284 priv->udpsrc_v6[i] = NULL;
1285 priv->udpsink[i] = NULL;
1286 priv->appsrc[i] = NULL;
1287 priv->appsink[i] = NULL;
1288 priv->appqueue[i] = NULL;
1289 priv->tee[i] = NULL;
1290 priv->funnel[i] = NULL;
1292 gst_object_unref (priv->send_src[0]);
1293 priv->send_src[0] = NULL;
1295 gst_element_release_request_pad (rtpbin, priv->send_src[1]);
1296 gst_object_unref (priv->send_src[1]);
1297 priv->send_src[1] = NULL;
1299 g_object_unref (priv->session);
1300 priv->session = NULL;
1302 gst_caps_unref (priv->caps);
1305 priv->is_joined = FALSE;
1306 g_mutex_unlock (&priv->lock);
1317 * gst_rtsp_stream_get_rtpinfo:
1318 * @stream: a #GstRTSPStream
1319 * @rtptime: result RTP timestamp
1320 * @seq: result RTP seqnum
1322 * Retrieve the current rtptime and seq. This is used to
1323 * construct a RTPInfo reply header.
1325 * Returns: %TRUE when rtptime and seq could be determined.
1328 gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream,
1329 guint * rtptime, guint * seq)
1331 GstRTSPStreamPrivate *priv;
1332 GObjectClass *payobjclass;
1334 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1335 g_return_val_if_fail (rtptime != NULL, FALSE);
1336 g_return_val_if_fail (seq != NULL, FALSE);
1338 priv = stream->priv;
1340 payobjclass = G_OBJECT_GET_CLASS (priv->payloader);
1342 if (!g_object_class_find_property (payobjclass, "seqnum") ||
1343 !g_object_class_find_property (payobjclass, "timestamp"))
1346 g_object_get (priv->payloader, "seqnum", seq, "timestamp", rtptime, NULL);
1352 * gst_rtsp_stream_get_caps:
1353 * @stream: a #GstRTSPStream
1355 * Retrieve the current caps of @stream.
1357 * Returns: (transfer full): the #GstCaps of @stream. use gst_caps_unref()
1361 gst_rtsp_stream_get_caps (GstRTSPStream * stream)
1363 GstRTSPStreamPrivate *priv;
1366 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1368 priv = stream->priv;
1370 g_mutex_lock (&priv->lock);
1371 if ((result = priv->caps))
1372 gst_caps_ref (result);
1373 g_mutex_unlock (&priv->lock);
1379 * gst_rtsp_stream_recv_rtp:
1380 * @stream: a #GstRTSPStream
1381 * @buffer: (transfer full): a #GstBuffer
1383 * Handle an RTP buffer for the stream. This method is usually called when a
1384 * message has been received from a client using the TCP transport.
1386 * This function takes ownership of @buffer.
1388 * Returns: a GstFlowReturn.
1391 gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer)
1393 GstRTSPStreamPrivate *priv;
1395 GstElement *element;
1397 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
1398 priv = stream->priv;
1399 g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1400 g_return_val_if_fail (priv->is_joined, FALSE);
1402 g_mutex_lock (&priv->lock);
1403 element = gst_object_ref (priv->appsrc[0]);
1404 g_mutex_unlock (&priv->lock);
1406 ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
1408 gst_object_unref (element);
1414 * gst_rtsp_stream_recv_rtcp:
1415 * @stream: a #GstRTSPStream
1416 * @buffer: (transfer full): a #GstBuffer
1418 * Handle an RTCP buffer for the stream. This method is usually called when a
1419 * message has been received from a client using the TCP transport.
1421 * This function takes ownership of @buffer.
1423 * Returns: a GstFlowReturn.
1426 gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
1428 GstRTSPStreamPrivate *priv;
1430 GstElement *element;
1432 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
1433 priv = stream->priv;
1434 g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1435 g_return_val_if_fail (priv->is_joined, FALSE);
1437 g_mutex_lock (&priv->lock);
1438 element = gst_object_ref (priv->appsrc[1]);
1439 g_mutex_unlock (&priv->lock);
1441 ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
1443 gst_object_unref (element);
1448 /* must be called with lock */
1450 update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
1453 GstRTSPStreamPrivate *priv = stream->priv;
1454 const GstRTSPTransport *tr;
1456 tr = gst_rtsp_stream_transport_get_transport (trans);
1458 switch (tr->lower_transport) {
1459 case GST_RTSP_LOWER_TRANS_UDP:
1460 case GST_RTSP_LOWER_TRANS_UDP_MCAST:
1466 dest = tr->destination;
1467 if (tr->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
1472 min = tr->client_port.min;
1473 max = tr->client_port.max;
1477 GST_INFO ("adding %s:%d-%d", dest, min, max);
1478 g_signal_emit_by_name (priv->udpsink[0], "add", dest, min, NULL);
1479 g_signal_emit_by_name (priv->udpsink[1], "add", dest, max, NULL);
1481 GST_INFO ("setting ttl-mc %d", ttl);
1482 g_object_set (G_OBJECT (priv->udpsink[0]), "ttl-mc", ttl, NULL);
1483 g_object_set (G_OBJECT (priv->udpsink[1]), "ttl-mc", ttl, NULL);
1485 priv->transports = g_list_prepend (priv->transports, trans);
1487 GST_INFO ("removing %s:%d-%d", dest, min, max);
1488 g_signal_emit_by_name (priv->udpsink[0], "remove", dest, min, NULL);
1489 g_signal_emit_by_name (priv->udpsink[1], "remove", dest, max, NULL);
1490 priv->transports = g_list_remove (priv->transports, trans);
1494 case GST_RTSP_LOWER_TRANS_TCP:
1496 GST_INFO ("adding TCP %s", tr->destination);
1497 priv->transports = g_list_prepend (priv->transports, trans);
1499 GST_INFO ("removing TCP %s", tr->destination);
1500 priv->transports = g_list_remove (priv->transports, trans);
1504 goto unknown_transport;
1511 GST_INFO ("Unknown transport %d", tr->lower_transport);
1518 * gst_rtsp_stream_add_transport:
1519 * @stream: a #GstRTSPStream
1520 * @trans: a #GstRTSPStreamTransport
1522 * Add the transport in @trans to @stream. The media of @stream will
1523 * then also be send to the values configured in @trans.
1525 * @stream must be joined to a bin.
1527 * @trans must contain a valid #GstRTSPTransport.
1529 * Returns: %TRUE if @trans was added
1532 gst_rtsp_stream_add_transport (GstRTSPStream * stream,
1533 GstRTSPStreamTransport * trans)
1535 GstRTSPStreamPrivate *priv;
1538 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1539 priv = stream->priv;
1540 g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
1541 g_return_val_if_fail (priv->is_joined, FALSE);
1543 g_mutex_lock (&priv->lock);
1544 res = update_transport (stream, trans, TRUE);
1545 g_mutex_unlock (&priv->lock);
1551 * gst_rtsp_stream_remove_transport:
1552 * @stream: a #GstRTSPStream
1553 * @trans: a #GstRTSPStreamTransport
1555 * Remove the transport in @trans from @stream. The media of @stream will
1556 * not be sent to the values configured in @trans.
1558 * @stream must be joined to a bin.
1560 * @trans must contain a valid #GstRTSPTransport.
1562 * Returns: %TRUE if @trans was removed
1565 gst_rtsp_stream_remove_transport (GstRTSPStream * stream,
1566 GstRTSPStreamTransport * trans)
1568 GstRTSPStreamPrivate *priv;
1571 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1572 priv = stream->priv;
1573 g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
1574 g_return_val_if_fail (priv->is_joined, FALSE);
1576 g_mutex_lock (&priv->lock);
1577 res = update_transport (stream, trans, FALSE);
1578 g_mutex_unlock (&priv->lock);