stream: add more support for IPv6
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-stream.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19
20 #include <string.h>
21 #include <stdlib.h>
22
23 #include <gio/gio.h>
24
25 #include <gst/app/gstappsrc.h>
26 #include <gst/app/gstappsink.h>
27
28 #include "rtsp-stream.h"
29
30 #define GST_RTSP_STREAM_GET_PRIVATE(obj)  \
31      (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTSP_STREAM, GstRTSPStreamPrivate))
32
33 struct _GstRTSPStreamPrivate
34 {
35   GMutex lock;
36   guint idx;
37   GstPad *srcpad;
38   GstElement *payloader;
39   guint buffer_size;
40   gboolean is_joined;
41
42   /* pads on the rtpbin */
43   GstPad *send_rtp_sink;
44   GstPad *recv_sink[2];
45   GstPad *send_src[2];
46
47   /* the RTPSession object */
48   GObject *session;
49
50   /* sinks used for sending and receiving RTP and RTCP over ipv4, they share
51    * sockets */
52   GstElement *udpsrc_v4[2];
53
54   /* sinks used for sending and receiving RTP and RTCP over ipv6, they share
55    * sockets */
56   GstElement *udpsrc_v6[2];
57
58   GstElement *udpsink[2];
59
60   /* for TCP transport */
61   GstElement *appsrc[2];
62   GstElement *appqueue[2];
63   GstElement *appsink[2];
64
65   GstElement *tee[2];
66   GstElement *funnel[2];
67
68   /* server ports for sending/receiving over ipv4 */
69   GstRTSPRange server_port_v4;
70   GstRTSPAddress *server_addr_v4;
71   gboolean have_ipv4;
72
73   /* server ports for sending/receiving over ipv6 */
74   GstRTSPRange server_port_v6;
75   GstRTSPAddress *server_addr_v6;
76   gboolean have_ipv6;
77
78   /* multicast addresses */
79   GstRTSPAddressPool *pool;
80   GstRTSPAddress *addr_v4;
81   GstRTSPAddress *addr_v6;
82
83   /* the caps of the stream */
84   gulong caps_sig;
85   GstCaps *caps;
86
87   /* transports we stream to */
88   guint n_active;
89   GList *transports;
90
91   gint dscp_qos;
92 };
93
94
95 enum
96 {
97   PROP_0,
98   PROP_LAST
99 };
100
101 GST_DEBUG_CATEGORY_STATIC (rtsp_stream_debug);
102 #define GST_CAT_DEFAULT rtsp_stream_debug
103
104 static GQuark ssrc_stream_map_key;
105
106 static void gst_rtsp_stream_finalize (GObject * obj);
107
108 G_DEFINE_TYPE (GstRTSPStream, gst_rtsp_stream, G_TYPE_OBJECT);
109
110 static void
111 gst_rtsp_stream_class_init (GstRTSPStreamClass * klass)
112 {
113   GObjectClass *gobject_class;
114
115   g_type_class_add_private (klass, sizeof (GstRTSPStreamPrivate));
116
117   gobject_class = G_OBJECT_CLASS (klass);
118
119   gobject_class->finalize = gst_rtsp_stream_finalize;
120
121   GST_DEBUG_CATEGORY_INIT (rtsp_stream_debug, "rtspstream", 0, "GstRTSPStream");
122
123   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
124 }
125
126 static void
127 gst_rtsp_stream_init (GstRTSPStream * stream)
128 {
129   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
130
131   GST_DEBUG ("new stream %p", stream);
132
133   stream->priv = priv;
134
135   stream->priv->dscp_qos = -1;
136
137   g_mutex_init (&priv->lock);
138 }
139
140 static void
141 gst_rtsp_stream_finalize (GObject * obj)
142 {
143   GstRTSPStream *stream;
144   GstRTSPStreamPrivate *priv;
145
146   stream = GST_RTSP_STREAM (obj);
147   priv = stream->priv;
148
149   GST_DEBUG ("finalize stream %p", stream);
150
151   /* we really need to be unjoined now */
152   g_return_if_fail (!priv->is_joined);
153
154   if (priv->addr_v4)
155     gst_rtsp_address_free (priv->addr_v4);
156   if (priv->addr_v6)
157     gst_rtsp_address_free (priv->addr_v6);
158   if (priv->server_addr_v4)
159     gst_rtsp_address_free (priv->server_addr_v4);
160   if (priv->server_addr_v6)
161     gst_rtsp_address_free (priv->server_addr_v6);
162   if (priv->pool)
163     g_object_unref (priv->pool);
164   gst_object_unref (priv->payloader);
165   gst_object_unref (priv->srcpad);
166   g_mutex_clear (&priv->lock);
167
168   G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
169 }
170
171 /**
172  * gst_rtsp_stream_new:
173  * @idx: an index
174  * @srcpad: a #GstPad
175  * @payloader: a #GstElement
176  *
177  * Create a new media stream with index @idx that handles RTP data on
178  * @srcpad and has a payloader element @payloader.
179  *
180  * Returns: a new #GstRTSPStream
181  */
182 GstRTSPStream *
183 gst_rtsp_stream_new (guint idx, GstElement * payloader, GstPad * srcpad)
184 {
185   GstRTSPStreamPrivate *priv;
186   GstRTSPStream *stream;
187
188   g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL);
189   g_return_val_if_fail (GST_IS_PAD (srcpad), NULL);
190   g_return_val_if_fail (GST_PAD_IS_SRC (srcpad), NULL);
191
192   stream = g_object_new (GST_TYPE_RTSP_STREAM, NULL);
193   priv = stream->priv;
194   priv->idx = idx;
195   priv->payloader = gst_object_ref (payloader);
196   priv->srcpad = gst_object_ref (srcpad);
197
198   return stream;
199 }
200
201 /**
202  * gst_rtsp_stream_get_index:
203  * @stream: a #GstRTSPStream
204  *
205  * Get the stream index.
206  *
207  * Return: the stream index.
208  */
209 guint
210 gst_rtsp_stream_get_index (GstRTSPStream * stream)
211 {
212   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
213
214   return stream->priv->idx;
215 }
216
217  /**
218  * gst_rtsp_stream_get_srcpad:
219  * @stream: a #GstRTSPStream
220  *
221  * Get the srcpad associated with @stream.
222  *
223  * Return: the srcpad. Unref after usage.
224  */
225 GstPad *
226 gst_rtsp_stream_get_srcpad (GstRTSPStream * stream)
227 {
228   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
229
230   return gst_object_ref (stream->priv->srcpad);
231 }
232
233 /**
234  * gst_rtsp_stream_set_mtu:
235  * @stream: a #GstRTSPStream
236  * @mtu: a new MTU
237  *
238  * Configure the mtu in the payloader of @stream to @mtu.
239  */
240 void
241 gst_rtsp_stream_set_mtu (GstRTSPStream * stream, guint mtu)
242 {
243   GstRTSPStreamPrivate *priv;
244
245   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
246
247   priv = stream->priv;
248
249   GST_LOG_OBJECT (stream, "set MTU %u", mtu);
250
251   g_object_set (G_OBJECT (priv->payloader), "mtu", mtu, NULL);
252 }
253
254 /**
255  * gst_rtsp_stream_get_mtu:
256  * @stream: a #GstRTSPStream
257  *
258  * Get the configured MTU in the payloader of @stream.
259  *
260  * Returns: the MTU of the payloader.
261  */
262 guint
263 gst_rtsp_stream_get_mtu (GstRTSPStream * stream)
264 {
265   GstRTSPStreamPrivate *priv;
266   guint mtu;
267
268   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
269
270   priv = stream->priv;
271
272   g_object_get (G_OBJECT (priv->payloader), "mtu", &mtu, NULL);
273
274   return mtu;
275 }
276
277 /* Update the dscp qos property on the udp sinks */
278 static void
279 update_dscp_qos (GstRTSPStream * stream)
280 {
281   GstRTSPStreamPrivate *priv;
282
283   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
284
285   priv = stream->priv;
286
287   if (priv->udpsink[0]) {
288     g_object_set (G_OBJECT (priv->udpsink[0]), "qos-dscp", priv->dscp_qos,
289         NULL);
290   }
291
292   if (priv->udpsink[1]) {
293     g_object_set (G_OBJECT (priv->udpsink[1]), "qos-dscp", priv->dscp_qos,
294         NULL);
295   }
296 }
297
298 /**
299  * gst_rtsp_stream_set_dscp_qos:
300  * @stream: a #GstRTSPStream
301  * @dscp_qos: a new dscp qos value (0-63, or -1 to disable)
302  *
303  * Configure the dscp qos of the outgoing sockets to @dscp_qos.
304  */
305 void
306 gst_rtsp_stream_set_dscp_qos (GstRTSPStream * stream, gint dscp_qos)
307 {
308   GstRTSPStreamPrivate *priv;
309
310   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
311
312   priv = stream->priv;
313
314   GST_LOG_OBJECT (stream, "set DSCP QoS %d", dscp_qos);
315
316   if (dscp_qos < -1 || dscp_qos > 63) {
317     GST_WARNING_OBJECT (stream, "trying to set illegal dscp qos %d", dscp_qos);
318     return;
319   }
320
321   priv->dscp_qos = dscp_qos;
322
323   update_dscp_qos (stream);
324 }
325
326 /**
327  * gst_rtsp_stream_get_dscp_qos:
328  * @stream: a #GstRTSPStream
329  *
330  * Get the configured DSCP QoS in of the outgoing sockets.
331  *
332  * Returns: the DSCP QoS value of the outgoing sockets, or -1 if disbled.
333  */
334 gint
335 gst_rtsp_stream_get_dscp_qos (GstRTSPStream * stream)
336 {
337   GstRTSPStreamPrivate *priv;
338
339   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
340
341   priv = stream->priv;
342
343   return priv->dscp_qos;
344 }
345
346
347 /**
348  * gst_rtsp_stream_set_address_pool:
349  * @stream: a #GstRTSPStream
350  * @pool: a #GstRTSPAddressPool
351  *
352  * configure @pool to be used as the address pool of @stream.
353  */
354 void
355 gst_rtsp_stream_set_address_pool (GstRTSPStream * stream,
356     GstRTSPAddressPool * pool)
357 {
358   GstRTSPStreamPrivate *priv;
359   GstRTSPAddressPool *old;
360
361   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
362
363   priv = stream->priv;
364
365   GST_LOG_OBJECT (stream, "set address pool %p", pool);
366
367   g_mutex_lock (&priv->lock);
368   if ((old = priv->pool) != pool)
369     priv->pool = pool ? g_object_ref (pool) : NULL;
370   else
371     old = NULL;
372   g_mutex_unlock (&priv->lock);
373
374   if (old)
375     g_object_unref (old);
376 }
377
378 /**
379  * gst_rtsp_stream_get_address_pool:
380  * @stream: a #GstRTSPStream
381  *
382  * Get the #GstRTSPAddressPool used as the address pool of @stream.
383  *
384  * Returns: (transfer full): the #GstRTSPAddressPool of @stream. g_object_unref() after
385  * usage.
386  */
387 GstRTSPAddressPool *
388 gst_rtsp_stream_get_address_pool (GstRTSPStream * stream)
389 {
390   GstRTSPStreamPrivate *priv;
391   GstRTSPAddressPool *result;
392
393   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
394
395   priv = stream->priv;
396
397   g_mutex_lock (&priv->lock);
398   if ((result = priv->pool))
399     g_object_ref (result);
400   g_mutex_unlock (&priv->lock);
401
402   return result;
403 }
404
405 /**
406  * gst_rtsp_stream_get_multicast_address:
407  * @stream: a #GstRTSPStream
408  * @family: the #GSocketFamily
409  *
410  * Get the multicast address of @stream for @family.
411  *
412  * Returns: the #GstRTSPAddress of @stream or %NULL when no address could be
413  * allocated. gst_rtsp_address_free() after usage.
414  */
415 GstRTSPAddress *
416 gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream,
417     GSocketFamily family)
418 {
419   GstRTSPStreamPrivate *priv;
420   GstRTSPAddress *result;
421   GstRTSPAddress **addrp;
422   GstRTSPAddressFlags flags;
423
424   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
425
426   priv = stream->priv;
427
428   if (family == G_SOCKET_FAMILY_IPV6) {
429     flags = GST_RTSP_ADDRESS_FLAG_IPV6;
430     addrp = &priv->addr_v4;
431   } else {
432     flags = GST_RTSP_ADDRESS_FLAG_IPV4;
433     addrp = &priv->addr_v6;
434   }
435
436   g_mutex_lock (&priv->lock);
437   if (*addrp == NULL) {
438     if (priv->pool == NULL)
439       goto no_pool;
440
441     flags |= GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_MULTICAST;
442
443     *addrp = gst_rtsp_address_pool_acquire_address (priv->pool, flags, 2);
444     if (*addrp == NULL)
445       goto no_address;
446   }
447   result = gst_rtsp_address_copy (*addrp);
448   g_mutex_unlock (&priv->lock);
449
450   return result;
451
452   /* ERRORS */
453 no_pool:
454   {
455     GST_ERROR_OBJECT (stream, "no address pool specified");
456     g_mutex_unlock (&priv->lock);
457     return NULL;
458   }
459 no_address:
460   {
461     GST_ERROR_OBJECT (stream, "failed to acquire address from pool");
462     g_mutex_unlock (&priv->lock);
463     return NULL;
464   }
465 }
466
467 /**
468  * gst_rtsp_stream_reserve_address:
469  * @stream: a #GstRTSPStream
470  * @address: an address
471  * @port: a port
472  * @n_ports: n_ports
473  * @ttl: a TTL
474  *
475  * Reserve @address and @port as the address and port of @stream.
476  *
477  * Returns: the #GstRTSPAddress of @stream or %NULL when the address could be
478  * reserved. gst_rtsp_address_free() after usage.
479  */
480 GstRTSPAddress *
481 gst_rtsp_stream_reserve_address (GstRTSPStream * stream,
482     const gchar * address, guint port, guint n_ports, guint ttl)
483 {
484   GstRTSPStreamPrivate *priv;
485   GstRTSPAddress *result;
486   GInetAddress *addr;
487   GSocketFamily family;
488   GstRTSPAddress **addrp;
489
490   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
491   g_return_val_if_fail (address != NULL, NULL);
492   g_return_val_if_fail (port > 0, NULL);
493   g_return_val_if_fail (n_ports > 0, NULL);
494   g_return_val_if_fail (ttl > 0, NULL);
495
496   priv = stream->priv;
497
498   addr = g_inet_address_new_from_string (address);
499   if (!addr) {
500     GST_ERROR ("failed to get inet addr from %s", address);
501     family = G_SOCKET_FAMILY_IPV4;
502   } else {
503     family = g_inet_address_get_family (addr);
504     g_object_unref (addr);
505   }
506
507   if (family == G_SOCKET_FAMILY_IPV6)
508     addrp = &priv->addr_v4;
509   else
510     addrp = &priv->addr_v6;
511
512   g_mutex_lock (&priv->lock);
513   if (*addrp == NULL) {
514     if (priv->pool == NULL)
515       goto no_pool;
516
517     *addrp = gst_rtsp_address_pool_reserve_address (priv->pool, address,
518         port, n_ports, ttl);
519     if (*addrp == NULL)
520       goto no_address;
521   } else {
522     if (strcmp ((*addrp)->address, address) ||
523         (*addrp)->port != port || (*addrp)->n_ports != n_ports ||
524         (*addrp)->ttl != ttl)
525       goto different_address;
526   }
527   result = gst_rtsp_address_copy (*addrp);
528   g_mutex_unlock (&priv->lock);
529
530   return result;
531
532   /* ERRORS */
533 no_pool:
534   {
535     GST_ERROR_OBJECT (stream, "no address pool specified");
536     g_mutex_unlock (&priv->lock);
537     return NULL;
538   }
539 no_address:
540   {
541     GST_ERROR_OBJECT (stream, "failed to acquire address %s from pool",
542         address);
543     g_mutex_unlock (&priv->lock);
544     return NULL;
545   }
546 different_address:
547   {
548     GST_ERROR_OBJECT (stream, "address %s is not the same that was already"
549         " reserved", address);
550     g_mutex_unlock (&priv->lock);
551     return NULL;
552   }
553 }
554
555 static gboolean
556 alloc_ports_one_family (GstRTSPAddressPool * pool, gint buffer_size,
557     GSocketFamily family, GstElement * udpsrc_out[2],
558     GstElement * udpsink_out[2], GstRTSPRange * server_port_out,
559     GstRTSPAddress ** server_addr_out)
560 {
561   GstStateChangeReturn ret;
562   GstElement *udpsrc0, *udpsrc1;
563   GstElement *udpsink0, *udpsink1;
564   GSocket *rtp_socket = NULL;
565   GSocket *rtcp_socket;
566   gint tmp_rtp, tmp_rtcp;
567   guint count;
568   gint rtpport, rtcpport;
569   GList *rejected_addresses = NULL;
570   GstRTSPAddress *addr = NULL;
571   GInetAddress *inetaddr = NULL;
572   GSocketAddress *rtp_sockaddr = NULL;
573   GSocketAddress *rtcp_sockaddr = NULL;
574   const gchar *multisink_socket;
575
576   if (family == G_SOCKET_FAMILY_IPV6)
577     multisink_socket = "socket-v6";
578   else
579     multisink_socket = "socket";
580
581   udpsrc0 = NULL;
582   udpsrc1 = NULL;
583   udpsink0 = NULL;
584   udpsink1 = NULL;
585   count = 0;
586
587   /* Start with random port */
588   tmp_rtp = 0;
589
590   rtcp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
591       G_SOCKET_PROTOCOL_UDP, NULL);
592   if (!rtcp_socket)
593     goto no_udp_protocol;
594
595   if (*server_addr_out)
596     gst_rtsp_address_free (*server_addr_out);
597
598   /* try to allocate 2 UDP ports, the RTP port should be an even
599    * number and the RTCP port should be the next (uneven) port */
600 again:
601
602   if (rtp_socket == NULL) {
603     rtp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
604         G_SOCKET_PROTOCOL_UDP, NULL);
605     if (!rtp_socket)
606       goto no_udp_protocol;
607   }
608
609   if (pool && gst_rtsp_address_pool_has_unicast_addresses (pool)) {
610     GstRTSPAddressFlags flags;
611
612     if (addr)
613       rejected_addresses = g_list_prepend (rejected_addresses, addr);
614
615     flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_UNICAST;
616     if (family == G_SOCKET_FAMILY_IPV6)
617       flags |= GST_RTSP_ADDRESS_FLAG_IPV6;
618     else
619       flags |= GST_RTSP_ADDRESS_FLAG_IPV4;
620
621     addr = gst_rtsp_address_pool_acquire_address (pool, flags, 2);
622
623     if (addr == NULL)
624       goto no_ports;
625
626     tmp_rtp = addr->port;
627
628     g_clear_object (&inetaddr);
629     inetaddr = g_inet_address_new_from_string (addr->address);
630   } else {
631     if (tmp_rtp != 0) {
632       tmp_rtp += 2;
633       if (++count > 20)
634         goto no_ports;
635     }
636
637     if (inetaddr == NULL)
638       inetaddr = g_inet_address_new_any (family);
639   }
640
641   rtp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtp);
642   if (!g_socket_bind (rtp_socket, rtp_sockaddr, FALSE, NULL)) {
643     g_object_unref (rtp_sockaddr);
644     goto again;
645   }
646   g_object_unref (rtp_sockaddr);
647
648   rtp_sockaddr = g_socket_get_local_address (rtp_socket, NULL);
649   if (rtp_sockaddr == NULL || !G_IS_INET_SOCKET_ADDRESS (rtp_sockaddr)) {
650     g_clear_object (&rtp_sockaddr);
651     goto socket_error;
652   }
653
654   tmp_rtp =
655       g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (rtp_sockaddr));
656   g_object_unref (rtp_sockaddr);
657
658   /* check if port is even */
659   if ((tmp_rtp & 1) != 0) {
660     /* port not even, close and allocate another */
661     tmp_rtp++;
662     g_clear_object (&rtp_socket);
663     goto again;
664   }
665
666   /* set port */
667   tmp_rtcp = tmp_rtp + 1;
668
669   rtcp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtcp);
670   if (!g_socket_bind (rtcp_socket, rtcp_sockaddr, FALSE, NULL)) {
671     g_object_unref (rtcp_sockaddr);
672     g_clear_object (&rtp_socket);
673     goto again;
674   }
675   g_object_unref (rtcp_sockaddr);
676
677   g_clear_object (&inetaddr);
678
679   udpsrc0 = gst_element_factory_make ("udpsrc", NULL);
680   udpsrc1 = gst_element_factory_make ("udpsrc", NULL);
681
682   if (udpsrc0 == NULL || udpsrc1 == NULL)
683     goto no_udp_protocol;
684
685   g_object_set (G_OBJECT (udpsrc0), "socket", rtp_socket, NULL);
686   g_object_set (G_OBJECT (udpsrc1), "socket", rtcp_socket, NULL);
687
688   ret = gst_element_set_state (udpsrc0, GST_STATE_PAUSED);
689   if (ret == GST_STATE_CHANGE_FAILURE)
690     goto element_error;
691   ret = gst_element_set_state (udpsrc1, GST_STATE_PAUSED);
692   if (ret == GST_STATE_CHANGE_FAILURE)
693     goto element_error;
694
695   /* all fine, do port check */
696   g_object_get (G_OBJECT (udpsrc0), "port", &rtpport, NULL);
697   g_object_get (G_OBJECT (udpsrc1), "port", &rtcpport, NULL);
698
699   /* this should not happen... */
700   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
701     goto port_error;
702
703   if (udpsink_out[0])
704     udpsink0 = udpsink_out[0];
705   else
706     udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
707
708   if (!udpsink0)
709     goto no_udp_protocol;
710
711   g_object_set (G_OBJECT (udpsink0), "close-socket", FALSE, NULL);
712   g_object_set (G_OBJECT (udpsink0), multisink_socket, rtp_socket, NULL);
713
714   if (udpsink_out[1])
715     udpsink1 = udpsink_out[1];
716   else
717     udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
718
719   if (!udpsink1)
720     goto no_udp_protocol;
721
722   g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
723   g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
724   g_object_set (G_OBJECT (udpsink0), "buffer-size", buffer_size, NULL);
725
726   g_object_set (G_OBJECT (udpsink1), "close-socket", FALSE, NULL);
727   g_object_set (G_OBJECT (udpsink1), multisink_socket, rtcp_socket, NULL);
728   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
729   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
730   g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL);
731   g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
732   g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL);
733   g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
734
735   /* we keep these elements, we will further configure them when the
736    * client told us to really use the UDP ports. */
737   udpsrc_out[0] = udpsrc0;
738   udpsrc_out[1] = udpsrc1;
739   udpsink_out[0] = udpsink0;
740   udpsink_out[1] = udpsink1;
741   server_port_out->min = rtpport;
742   server_port_out->max = rtcpport;
743
744   *server_addr_out = addr;
745   g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free);
746
747   g_object_unref (rtp_socket);
748   g_object_unref (rtcp_socket);
749
750   return TRUE;
751
752   /* ERRORS */
753 no_udp_protocol:
754   {
755     goto cleanup;
756   }
757 no_ports:
758   {
759     goto cleanup;
760   }
761 port_error:
762   {
763     goto cleanup;
764   }
765 socket_error:
766   {
767     goto cleanup;
768   }
769 element_error:
770   {
771     goto cleanup;
772   }
773 cleanup:
774   {
775     if (udpsrc0) {
776       gst_element_set_state (udpsrc0, GST_STATE_NULL);
777       gst_object_unref (udpsrc0);
778     }
779     if (udpsrc1) {
780       gst_element_set_state (udpsrc1, GST_STATE_NULL);
781       gst_object_unref (udpsrc1);
782     }
783     if (udpsink0) {
784       gst_element_set_state (udpsink0, GST_STATE_NULL);
785       gst_object_unref (udpsink0);
786     }
787     if (udpsink1) {
788       gst_element_set_state (udpsink1, GST_STATE_NULL);
789       gst_object_unref (udpsink1);
790     }
791     if (inetaddr)
792       g_object_unref (inetaddr);
793     g_list_free_full (rejected_addresses,
794         (GDestroyNotify) gst_rtsp_address_free);
795     if (addr)
796       gst_rtsp_address_free (addr);
797     if (rtp_socket)
798       g_object_unref (rtp_socket);
799     if (rtcp_socket)
800       g_object_unref (rtcp_socket);
801     return FALSE;
802   }
803 }
804
805 /* must be called with lock */
806 static gboolean
807 alloc_ports (GstRTSPStream * stream)
808 {
809   GstRTSPStreamPrivate *priv = stream->priv;
810
811   priv->have_ipv4 = alloc_ports_one_family (priv->pool, priv->buffer_size,
812       G_SOCKET_FAMILY_IPV4, priv->udpsrc_v4, priv->udpsink,
813       &priv->server_port_v4, &priv->server_addr_v4);
814
815   priv->have_ipv6 = alloc_ports_one_family (priv->pool, priv->buffer_size,
816       G_SOCKET_FAMILY_IPV6, priv->udpsrc_v6, priv->udpsink,
817       &priv->server_port_v6, &priv->server_addr_v6);
818
819   return priv->have_ipv4 || priv->have_ipv6;
820 }
821
822 /**
823  * gst_rtsp_stream_get_server_port:
824  * @stream: a #GstRTSPStream
825  * @server_port: (out): result server port
826  *
827  * Fill @server_port with the port pair used by the server. This function can
828  * only be called when @stream has been joined.
829  */
830 void
831 gst_rtsp_stream_get_server_port (GstRTSPStream * stream,
832     GstRTSPRange * server_port, GSocketFamily family)
833 {
834   GstRTSPStreamPrivate *priv;
835
836   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
837   priv = stream->priv;
838   g_return_if_fail (priv->is_joined);
839
840   g_mutex_lock (&priv->lock);
841   if (family == G_SOCKET_FAMILY_IPV4) {
842     if (server_port)
843       *server_port = priv->server_port_v4;
844   } else {
845     if (server_port)
846       *server_port = priv->server_port_v6;
847   }
848   g_mutex_unlock (&priv->lock);
849 }
850
851 /**
852  * gst_rtsp_stream_get_rtpsession:
853  * @stream: a #GstRTSPStream
854  *
855  * Get the RTP session of this stream.
856  *
857  * Returns: The RTP session of this stream. Unref after usage.
858  */
859 GObject *
860 gst_rtsp_stream_get_rtpsession (GstRTSPStream * stream)
861 {
862   GstRTSPStreamPrivate *priv;
863   GObject *session;
864
865   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
866
867   priv = stream->priv;
868
869   g_mutex_lock (&priv->lock);
870   if ((session = priv->session))
871     g_object_ref (session);
872   g_mutex_unlock (&priv->lock);
873
874   return session;
875 }
876
877 /**
878  * gst_rtsp_stream_get_ssrc:
879  * @stream: a #GstRTSPStream
880  * @ssrc: (out): result ssrc
881  *
882  * Get the SSRC used by the RTP session of this stream. This function can only
883  * be called when @stream has been joined.
884  */
885 void
886 gst_rtsp_stream_get_ssrc (GstRTSPStream * stream, guint * ssrc)
887 {
888   GstRTSPStreamPrivate *priv;
889
890   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
891   priv = stream->priv;
892   g_return_if_fail (priv->is_joined);
893
894   g_mutex_lock (&priv->lock);
895   if (ssrc && priv->session)
896     g_object_get (priv->session, "internal-ssrc", ssrc, NULL);
897   g_mutex_unlock (&priv->lock);
898 }
899
900 /* executed from streaming thread */
901 static void
902 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream)
903 {
904   GstRTSPStreamPrivate *priv = stream->priv;
905   GstCaps *newcaps, *oldcaps;
906
907   newcaps = gst_pad_get_current_caps (pad);
908
909   GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps,
910       newcaps);
911
912   g_mutex_lock (&priv->lock);
913   oldcaps = priv->caps;
914   priv->caps = newcaps;
915   g_mutex_unlock (&priv->lock);
916
917   if (oldcaps)
918     gst_caps_unref (oldcaps);
919 }
920
921 static void
922 dump_structure (const GstStructure * s)
923 {
924   gchar *sstr;
925
926   sstr = gst_structure_to_string (s);
927   GST_INFO ("structure: %s", sstr);
928   g_free (sstr);
929 }
930
931 static GstRTSPStreamTransport *
932 find_transport (GstRTSPStream * stream, const gchar * rtcp_from)
933 {
934   GstRTSPStreamPrivate *priv = stream->priv;
935   GList *walk;
936   GstRTSPStreamTransport *result = NULL;
937   const gchar *tmp;
938   gchar *dest;
939   guint port;
940
941   if (rtcp_from == NULL)
942     return NULL;
943
944   tmp = g_strrstr (rtcp_from, ":");
945   if (tmp == NULL)
946     return NULL;
947
948   port = atoi (tmp + 1);
949   dest = g_strndup (rtcp_from, tmp - rtcp_from);
950
951   g_mutex_lock (&priv->lock);
952   GST_INFO ("finding %s:%d in %d transports", dest, port,
953       g_list_length (priv->transports));
954
955   for (walk = priv->transports; walk; walk = g_list_next (walk)) {
956     GstRTSPStreamTransport *trans = walk->data;
957     const GstRTSPTransport *tr;
958     gint min, max;
959
960     tr = gst_rtsp_stream_transport_get_transport (trans);
961
962     min = tr->client_port.min;
963     max = tr->client_port.max;
964
965     if ((strcmp (tr->destination, dest) == 0) && (min == port || max == port)) {
966       result = trans;
967       break;
968     }
969   }
970   if (result)
971     g_object_ref (result);
972   g_mutex_unlock (&priv->lock);
973
974   g_free (dest);
975
976   return result;
977 }
978
979 static GstRTSPStreamTransport *
980 check_transport (GObject * source, GstRTSPStream * stream)
981 {
982   GstStructure *stats;
983   GstRTSPStreamTransport *trans;
984
985   /* see if we have a stream to match with the origin of the RTCP packet */
986   trans = g_object_get_qdata (source, ssrc_stream_map_key);
987   if (trans == NULL) {
988     g_object_get (source, "stats", &stats, NULL);
989     if (stats) {
990       const gchar *rtcp_from;
991
992       dump_structure (stats);
993
994       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
995       if ((trans = find_transport (stream, rtcp_from))) {
996         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
997             source);
998         g_object_set_qdata_full (source, ssrc_stream_map_key, trans,
999             g_object_unref);
1000       }
1001       gst_structure_free (stats);
1002     }
1003   }
1004   return trans;
1005 }
1006
1007
1008 static void
1009 on_new_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1010 {
1011   GstRTSPStreamTransport *trans;
1012
1013   GST_INFO ("%p: new source %p", stream, source);
1014
1015   trans = check_transport (source, stream);
1016
1017   if (trans)
1018     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
1019 }
1020
1021 static void
1022 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPStream * stream)
1023 {
1024   GST_INFO ("%p: new SDES %p", stream, source);
1025 }
1026
1027 static void
1028 on_ssrc_active (GObject * session, GObject * source, GstRTSPStream * stream)
1029 {
1030   GstRTSPStreamTransport *trans;
1031
1032   trans = check_transport (source, stream);
1033
1034   if (trans) {
1035     GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
1036     gst_rtsp_stream_transport_keep_alive (trans);
1037   }
1038 #ifdef DUMP_STATS
1039   {
1040     GstStructure *stats;
1041     g_object_get (source, "stats", &stats, NULL);
1042     if (stats) {
1043       dump_structure (stats);
1044       gst_structure_free (stats);
1045     }
1046   }
1047 #endif
1048 }
1049
1050 static void
1051 on_bye_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1052 {
1053   GST_INFO ("%p: source %p bye", stream, source);
1054 }
1055
1056 static void
1057 on_bye_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1058 {
1059   GstRTSPStreamTransport *trans;
1060
1061   GST_INFO ("%p: source %p bye timeout", stream, source);
1062
1063   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1064     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1065     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1066   }
1067 }
1068
1069 static void
1070 on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1071 {
1072   GstRTSPStreamTransport *trans;
1073
1074   GST_INFO ("%p: source %p timeout", stream, source);
1075
1076   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1077     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1078     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1079   }
1080 }
1081
1082 static GstFlowReturn
1083 handle_new_sample (GstAppSink * sink, gpointer user_data)
1084 {
1085   GstRTSPStreamPrivate *priv;
1086   GList *walk;
1087   GstSample *sample;
1088   GstBuffer *buffer;
1089   GstRTSPStream *stream;
1090
1091   sample = gst_app_sink_pull_sample (sink);
1092   if (!sample)
1093     return GST_FLOW_OK;
1094
1095   stream = (GstRTSPStream *) user_data;
1096   priv = stream->priv;
1097   buffer = gst_sample_get_buffer (sample);
1098
1099   g_mutex_lock (&priv->lock);
1100   for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1101     GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1102
1103     if (GST_ELEMENT_CAST (sink) == priv->appsink[0]) {
1104       gst_rtsp_stream_transport_send_rtp (tr, buffer);
1105     } else {
1106       gst_rtsp_stream_transport_send_rtcp (tr, buffer);
1107     }
1108   }
1109   g_mutex_unlock (&priv->lock);
1110
1111   gst_sample_unref (sample);
1112
1113   return GST_FLOW_OK;
1114 }
1115
1116 static GstAppSinkCallbacks sink_cb = {
1117   NULL,                         /* not interested in EOS */
1118   NULL,                         /* not interested in preroll samples */
1119   handle_new_sample,
1120 };
1121
1122 /**
1123  * gst_rtsp_stream_join_bin:
1124  * @stream: a #GstRTSPStream
1125  * @bin: a #GstBin to join
1126  * @rtpbin: a rtpbin element in @bin
1127  * @state: the target state of the new elements
1128  *
1129  * Join the #Gstbin @bin that contains the element @rtpbin.
1130  *
1131  * @stream will link to @rtpbin, which must be inside @bin. The elements
1132  * added to @bin will be set to the state given in @state.
1133  *
1134  * Returns: %TRUE on success.
1135  */
1136 gboolean
1137 gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
1138     GstElement * rtpbin, GstState state)
1139 {
1140   GstRTSPStreamPrivate *priv;
1141   gint i, idx;
1142   gchar *name;
1143   GstPad *pad, *teepad, *queuepad, *selpad;
1144   GstPadLinkReturn ret;
1145
1146   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1147   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
1148   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
1149
1150   priv = stream->priv;
1151
1152   g_mutex_lock (&priv->lock);
1153   if (priv->is_joined)
1154     goto was_joined;
1155
1156   /* create a session with the same index as the stream */
1157   idx = priv->idx;
1158
1159   GST_INFO ("stream %p joining bin as session %d", stream, idx);
1160
1161   if (!alloc_ports (stream))
1162     goto no_ports;
1163
1164   /* update the dscp qos field in the sinks */
1165   update_dscp_qos (stream);
1166
1167   /* get a pad for sending RTP */
1168   name = g_strdup_printf ("send_rtp_sink_%u", idx);
1169   priv->send_rtp_sink = gst_element_get_request_pad (rtpbin, name);
1170   g_free (name);
1171   /* link the RTP pad to the session manager, it should not really fail unless
1172    * this is not really an RTP pad */
1173   ret = gst_pad_link (priv->srcpad, priv->send_rtp_sink);
1174   if (ret != GST_PAD_LINK_OK)
1175     goto link_failed;
1176
1177   /* get pads from the RTP session element for sending and receiving
1178    * RTP/RTCP*/
1179   name = g_strdup_printf ("send_rtp_src_%u", idx);
1180   priv->send_src[0] = gst_element_get_static_pad (rtpbin, name);
1181   g_free (name);
1182   name = g_strdup_printf ("send_rtcp_src_%u", idx);
1183   priv->send_src[1] = gst_element_get_request_pad (rtpbin, name);
1184   g_free (name);
1185   name = g_strdup_printf ("recv_rtp_sink_%u", idx);
1186   priv->recv_sink[0] = gst_element_get_request_pad (rtpbin, name);
1187   g_free (name);
1188   name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
1189   priv->recv_sink[1] = gst_element_get_request_pad (rtpbin, name);
1190   g_free (name);
1191
1192   /* get the session */
1193   g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &priv->session);
1194
1195   g_signal_connect (priv->session, "on-new-ssrc", (GCallback) on_new_ssrc,
1196       stream);
1197   g_signal_connect (priv->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
1198       stream);
1199   g_signal_connect (priv->session, "on-ssrc-active",
1200       (GCallback) on_ssrc_active, stream);
1201   g_signal_connect (priv->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
1202       stream);
1203   g_signal_connect (priv->session, "on-bye-timeout",
1204       (GCallback) on_bye_timeout, stream);
1205   g_signal_connect (priv->session, "on-timeout", (GCallback) on_timeout,
1206       stream);
1207
1208   for (i = 0; i < 2; i++) {
1209     /* For the sender we create this bit of pipeline for both
1210      * RTP and RTCP. Sync and preroll are enabled on udpsink so
1211      * we need to add a queue before appsink to make the pipeline
1212      * not block. For the TCP case, we want to pump data to the
1213      * client as fast as possible anyway.
1214      *
1215      * .--------.      .-----.    .---------.
1216      * | rtpbin |      | tee |    | udpsink |
1217      * |       send->sink   src->sink       |
1218      * '--------'      |     |    '---------'
1219      *                 |     |    .---------.    .---------.
1220      *                 |     |    |  queue  |    | appsink |
1221      *                 |    src->sink      src->sink       |
1222      *                 '-----'    '---------'    '---------'
1223      */
1224     /* make tee for RTP/RTCP */
1225     priv->tee[i] = gst_element_factory_make ("tee", NULL);
1226     gst_bin_add (bin, priv->tee[i]);
1227
1228     /* and link to rtpbin send pad */
1229     pad = gst_element_get_static_pad (priv->tee[i], "sink");
1230     gst_pad_link (priv->send_src[i], pad);
1231     gst_object_unref (pad);
1232
1233     /* add udpsink */
1234     gst_bin_add (bin, priv->udpsink[i]);
1235
1236     /* link tee to udpsink */
1237     teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
1238     pad = gst_element_get_static_pad (priv->udpsink[i], "sink");
1239     gst_pad_link (teepad, pad);
1240     gst_object_unref (pad);
1241     gst_object_unref (teepad);
1242
1243     /* make queue */
1244     priv->appqueue[i] = gst_element_factory_make ("queue", NULL);
1245     gst_bin_add (bin, priv->appqueue[i]);
1246     /* and link to tee */
1247     teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
1248     pad = gst_element_get_static_pad (priv->appqueue[i], "sink");
1249     gst_pad_link (teepad, pad);
1250     gst_object_unref (pad);
1251     gst_object_unref (teepad);
1252
1253     /* make appsink */
1254     priv->appsink[i] = gst_element_factory_make ("appsink", NULL);
1255     g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
1256     g_object_set (priv->appsink[i], "emit-signals", FALSE, NULL);
1257     gst_bin_add (bin, priv->appsink[i]);
1258     gst_app_sink_set_callbacks (GST_APP_SINK_CAST (priv->appsink[i]),
1259         &sink_cb, stream, NULL);
1260     /* and link to queue */
1261     queuepad = gst_element_get_static_pad (priv->appqueue[i], "src");
1262     pad = gst_element_get_static_pad (priv->appsink[i], "sink");
1263     gst_pad_link (queuepad, pad);
1264     gst_object_unref (pad);
1265     gst_object_unref (queuepad);
1266
1267     /* For the receiver we create this bit of pipeline for both
1268      * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc
1269      * and it is all funneled into the rtpbin receive pad.
1270      *
1271      * .--------.     .--------.    .--------.
1272      * | udpsrc |     | funnel |    | rtpbin |
1273      * |       src->sink      src->sink      |
1274      * '--------'     |        |    '--------'
1275      * .--------.     |        |
1276      * | appsrc |     |        |
1277      * |       src->sink       |
1278      * '--------'     '--------'
1279      */
1280     /* make funnel for the RTP/RTCP receivers */
1281     priv->funnel[i] = gst_element_factory_make ("funnel", NULL);
1282     gst_bin_add (bin, priv->funnel[i]);
1283
1284     pad = gst_element_get_static_pad (priv->funnel[i], "src");
1285     gst_pad_link (pad, priv->recv_sink[i]);
1286     gst_object_unref (pad);
1287
1288     if (priv->udpsrc_v4[i]) {
1289       /* we set and keep these to playing so that they don't cause NO_PREROLL return
1290        * values */
1291       gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_PLAYING);
1292       gst_element_set_locked_state (priv->udpsrc_v4[i], TRUE);
1293       /* add udpsrc */
1294       gst_bin_add (bin, priv->udpsrc_v4[i]);
1295
1296       /* and link to the funnel v4 */
1297       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1298       pad = gst_element_get_static_pad (priv->udpsrc_v4[i], "src");
1299       gst_pad_link (pad, selpad);
1300       gst_object_unref (pad);
1301       gst_object_unref (selpad);
1302     }
1303
1304     if (priv->udpsrc_v6[i]) {
1305       gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_PLAYING);
1306       gst_element_set_locked_state (priv->udpsrc_v6[i], TRUE);
1307       gst_bin_add (bin, priv->udpsrc_v6[i]);
1308
1309       /* and link to the funnel v6 */
1310       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1311       pad = gst_element_get_static_pad (priv->udpsrc_v6[i], "src");
1312       gst_pad_link (pad, selpad);
1313       gst_object_unref (pad);
1314       gst_object_unref (selpad);
1315     }
1316
1317     /* make and add appsrc */
1318     priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
1319     gst_bin_add (bin, priv->appsrc[i]);
1320     /* and link to the funnel */
1321     selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1322     pad = gst_element_get_static_pad (priv->appsrc[i], "src");
1323     gst_pad_link (pad, selpad);
1324     gst_object_unref (pad);
1325     gst_object_unref (selpad);
1326
1327     /* check if we need to set to a special state */
1328     if (state != GST_STATE_NULL) {
1329       gst_element_set_state (priv->udpsink[i], state);
1330       gst_element_set_state (priv->appsink[i], state);
1331       gst_element_set_state (priv->appqueue[i], state);
1332       gst_element_set_state (priv->tee[i], state);
1333       gst_element_set_state (priv->funnel[i], state);
1334       gst_element_set_state (priv->appsrc[i], state);
1335     }
1336   }
1337
1338   /* be notified of caps changes */
1339   priv->caps_sig = g_signal_connect (priv->send_rtp_sink, "notify::caps",
1340       (GCallback) caps_notify, stream);
1341
1342   priv->is_joined = TRUE;
1343   g_mutex_unlock (&priv->lock);
1344
1345   return TRUE;
1346
1347   /* ERRORS */
1348 was_joined:
1349   {
1350     g_mutex_unlock (&priv->lock);
1351     return TRUE;
1352   }
1353 no_ports:
1354   {
1355     g_mutex_unlock (&priv->lock);
1356     GST_WARNING ("failed to allocate ports %d", idx);
1357     return FALSE;
1358   }
1359 link_failed:
1360   {
1361     GST_WARNING ("failed to link stream %d", idx);
1362     gst_object_unref (priv->send_rtp_sink);
1363     priv->send_rtp_sink = NULL;
1364     g_mutex_unlock (&priv->lock);
1365     return FALSE;
1366   }
1367 }
1368
1369 /**
1370  * gst_rtsp_stream_leave_bin:
1371  * @stream: a #GstRTSPStream
1372  * @bin: a #GstBin
1373  * @rtpbin: a rtpbin #GstElement
1374  *
1375  * Remove the elements of @stream from @bin.
1376  *
1377  * Return: %TRUE on success.
1378  */
1379 gboolean
1380 gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
1381     GstElement * rtpbin)
1382 {
1383   GstRTSPStreamPrivate *priv;
1384   gint i;
1385
1386   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1387   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
1388   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
1389
1390   priv = stream->priv;
1391
1392   g_mutex_lock (&priv->lock);
1393   if (!priv->is_joined)
1394     goto was_not_joined;
1395
1396   /* all transports must be removed by now */
1397   g_return_val_if_fail (priv->transports == NULL, FALSE);
1398
1399   GST_INFO ("stream %p leaving bin", stream);
1400
1401   gst_pad_unlink (priv->srcpad, priv->send_rtp_sink);
1402   g_signal_handler_disconnect (priv->send_rtp_sink, priv->caps_sig);
1403   gst_element_release_request_pad (rtpbin, priv->send_rtp_sink);
1404   gst_object_unref (priv->send_rtp_sink);
1405   priv->send_rtp_sink = NULL;
1406
1407   for (i = 0; i < 2; i++) {
1408     gst_element_set_state (priv->udpsink[i], GST_STATE_NULL);
1409     gst_element_set_state (priv->appsink[i], GST_STATE_NULL);
1410     gst_element_set_state (priv->appqueue[i], GST_STATE_NULL);
1411     gst_element_set_state (priv->tee[i], GST_STATE_NULL);
1412     gst_element_set_state (priv->funnel[i], GST_STATE_NULL);
1413     gst_element_set_state (priv->appsrc[i], GST_STATE_NULL);
1414     if (priv->udpsrc_v4[i]) {
1415       /* and set udpsrc to NULL now before removing */
1416       gst_element_set_locked_state (priv->udpsrc_v4[i], FALSE);
1417       gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_NULL);
1418       /* removing them should also nicely release the request
1419        * pads when they finalize */
1420       gst_bin_remove (bin, priv->udpsrc_v4[i]);
1421     }
1422     if (priv->udpsrc_v6[i]) {
1423       gst_element_set_locked_state (priv->udpsrc_v6[i], FALSE);
1424       gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_NULL);
1425       gst_bin_remove (bin, priv->udpsrc_v6[i]);
1426     }
1427     gst_bin_remove (bin, priv->udpsink[i]);
1428     gst_bin_remove (bin, priv->appsrc[i]);
1429     gst_bin_remove (bin, priv->appsink[i]);
1430     gst_bin_remove (bin, priv->appqueue[i]);
1431     gst_bin_remove (bin, priv->tee[i]);
1432     gst_bin_remove (bin, priv->funnel[i]);
1433
1434     gst_element_release_request_pad (rtpbin, priv->recv_sink[i]);
1435     gst_object_unref (priv->recv_sink[i]);
1436     priv->recv_sink[i] = NULL;
1437
1438     priv->udpsrc_v4[i] = NULL;
1439     priv->udpsrc_v6[i] = NULL;
1440     priv->udpsink[i] = NULL;
1441     priv->appsrc[i] = NULL;
1442     priv->appsink[i] = NULL;
1443     priv->appqueue[i] = NULL;
1444     priv->tee[i] = NULL;
1445     priv->funnel[i] = NULL;
1446   }
1447   gst_object_unref (priv->send_src[0]);
1448   priv->send_src[0] = NULL;
1449
1450   gst_element_release_request_pad (rtpbin, priv->send_src[1]);
1451   gst_object_unref (priv->send_src[1]);
1452   priv->send_src[1] = NULL;
1453
1454   g_object_unref (priv->session);
1455   priv->session = NULL;
1456   if (priv->caps)
1457     gst_caps_unref (priv->caps);
1458   priv->caps = NULL;
1459
1460   priv->is_joined = FALSE;
1461   g_mutex_unlock (&priv->lock);
1462
1463   return TRUE;
1464
1465 was_not_joined:
1466   {
1467     return TRUE;
1468   }
1469 }
1470
1471 /**
1472  * gst_rtsp_stream_get_rtpinfo:
1473  * @stream: a #GstRTSPStream
1474  * @rtptime: result RTP timestamp
1475  * @seq: result RTP seqnum
1476  *
1477  * Retrieve the current rtptime and seq. This is used to
1478  * construct a RTPInfo reply header.
1479  *
1480  * Returns: %TRUE when rtptime and seq could be determined.
1481  */
1482 gboolean
1483 gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream,
1484     guint * rtptime, guint * seq)
1485 {
1486   GstRTSPStreamPrivate *priv;
1487   GObjectClass *payobjclass;
1488
1489   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1490   g_return_val_if_fail (rtptime != NULL, FALSE);
1491   g_return_val_if_fail (seq != NULL, FALSE);
1492
1493   priv = stream->priv;
1494
1495   payobjclass = G_OBJECT_GET_CLASS (priv->payloader);
1496
1497   if (!g_object_class_find_property (payobjclass, "seqnum") ||
1498       !g_object_class_find_property (payobjclass, "timestamp"))
1499     return FALSE;
1500
1501   g_object_get (priv->payloader, "seqnum", seq, "timestamp", rtptime, NULL);
1502
1503   return TRUE;
1504 }
1505
1506 /**
1507  * gst_rtsp_stream_get_caps:
1508  * @stream: a #GstRTSPStream
1509  *
1510  * Retrieve the current caps of @stream.
1511  *
1512  * Returns: (transfer full): the #GstCaps of @stream. use gst_caps_unref()
1513  *    after usage.
1514  */
1515 GstCaps *
1516 gst_rtsp_stream_get_caps (GstRTSPStream * stream)
1517 {
1518   GstRTSPStreamPrivate *priv;
1519   GstCaps *result;
1520
1521   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1522
1523   priv = stream->priv;
1524
1525   g_mutex_lock (&priv->lock);
1526   if ((result = priv->caps))
1527     gst_caps_ref (result);
1528   g_mutex_unlock (&priv->lock);
1529
1530   return result;
1531 }
1532
1533 /**
1534  * gst_rtsp_stream_recv_rtp:
1535  * @stream: a #GstRTSPStream
1536  * @buffer: (transfer full): a #GstBuffer
1537  *
1538  * Handle an RTP buffer for the stream. This method is usually called when a
1539  * message has been received from a client using the TCP transport.
1540  *
1541  * This function takes ownership of @buffer.
1542  *
1543  * Returns: a GstFlowReturn.
1544  */
1545 GstFlowReturn
1546 gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer)
1547 {
1548   GstRTSPStreamPrivate *priv;
1549   GstFlowReturn ret;
1550   GstElement *element;
1551
1552   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
1553   priv = stream->priv;
1554   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1555   g_return_val_if_fail (priv->is_joined, FALSE);
1556
1557   g_mutex_lock (&priv->lock);
1558   element = gst_object_ref (priv->appsrc[0]);
1559   g_mutex_unlock (&priv->lock);
1560
1561   ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
1562
1563   gst_object_unref (element);
1564
1565   return ret;
1566 }
1567
1568 /**
1569  * gst_rtsp_stream_recv_rtcp:
1570  * @stream: a #GstRTSPStream
1571  * @buffer: (transfer full): a #GstBuffer
1572  *
1573  * Handle an RTCP buffer for the stream. This method is usually called when a
1574  * message has been received from a client using the TCP transport.
1575  *
1576  * This function takes ownership of @buffer.
1577  *
1578  * Returns: a GstFlowReturn.
1579  */
1580 GstFlowReturn
1581 gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
1582 {
1583   GstRTSPStreamPrivate *priv;
1584   GstFlowReturn ret;
1585   GstElement *element;
1586
1587   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
1588   priv = stream->priv;
1589   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1590   g_return_val_if_fail (priv->is_joined, FALSE);
1591
1592   g_mutex_lock (&priv->lock);
1593   element = gst_object_ref (priv->appsrc[1]);
1594   g_mutex_unlock (&priv->lock);
1595
1596   ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
1597
1598   gst_object_unref (element);
1599
1600   return ret;
1601 }
1602
1603 /* must be called with lock */
1604 static gboolean
1605 update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
1606     gboolean add)
1607 {
1608   GstRTSPStreamPrivate *priv = stream->priv;
1609   const GstRTSPTransport *tr;
1610
1611   tr = gst_rtsp_stream_transport_get_transport (trans);
1612
1613   switch (tr->lower_transport) {
1614     case GST_RTSP_LOWER_TRANS_UDP:
1615     case GST_RTSP_LOWER_TRANS_UDP_MCAST:
1616     {
1617       gchar *dest;
1618       gint min, max;
1619       guint ttl = 0;
1620
1621       dest = tr->destination;
1622       if (tr->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
1623         min = tr->port.min;
1624         max = tr->port.max;
1625         ttl = tr->ttl;
1626       } else {
1627         min = tr->client_port.min;
1628         max = tr->client_port.max;
1629       }
1630
1631       if (add) {
1632         GST_INFO ("adding %s:%d-%d", dest, min, max);
1633         g_signal_emit_by_name (priv->udpsink[0], "add", dest, min, NULL);
1634         g_signal_emit_by_name (priv->udpsink[1], "add", dest, max, NULL);
1635         if (ttl > 0) {
1636           GST_INFO ("setting ttl-mc %d", ttl);
1637           g_object_set (G_OBJECT (priv->udpsink[0]), "ttl-mc", ttl, NULL);
1638           g_object_set (G_OBJECT (priv->udpsink[1]), "ttl-mc", ttl, NULL);
1639         }
1640         priv->transports = g_list_prepend (priv->transports, trans);
1641       } else {
1642         GST_INFO ("removing %s:%d-%d", dest, min, max);
1643         g_signal_emit_by_name (priv->udpsink[0], "remove", dest, min, NULL);
1644         g_signal_emit_by_name (priv->udpsink[1], "remove", dest, max, NULL);
1645         priv->transports = g_list_remove (priv->transports, trans);
1646       }
1647       break;
1648     }
1649     case GST_RTSP_LOWER_TRANS_TCP:
1650       if (add) {
1651         GST_INFO ("adding TCP %s", tr->destination);
1652         priv->transports = g_list_prepend (priv->transports, trans);
1653       } else {
1654         GST_INFO ("removing TCP %s", tr->destination);
1655         priv->transports = g_list_remove (priv->transports, trans);
1656       }
1657       break;
1658     default:
1659       goto unknown_transport;
1660   }
1661   return TRUE;
1662
1663   /* ERRORS */
1664 unknown_transport:
1665   {
1666     GST_INFO ("Unknown transport %d", tr->lower_transport);
1667     return FALSE;
1668   }
1669 }
1670
1671
1672 /**
1673  * gst_rtsp_stream_add_transport:
1674  * @stream: a #GstRTSPStream
1675  * @trans: a #GstRTSPStreamTransport
1676  *
1677  * Add the transport in @trans to @stream. The media of @stream will
1678  * then also be send to the values configured in @trans.
1679  *
1680  * @stream must be joined to a bin.
1681  *
1682  * @trans must contain a valid #GstRTSPTransport.
1683  *
1684  * Returns: %TRUE if @trans was added
1685  */
1686 gboolean
1687 gst_rtsp_stream_add_transport (GstRTSPStream * stream,
1688     GstRTSPStreamTransport * trans)
1689 {
1690   GstRTSPStreamPrivate *priv;
1691   gboolean res;
1692
1693   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1694   priv = stream->priv;
1695   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
1696   g_return_val_if_fail (priv->is_joined, FALSE);
1697
1698   g_mutex_lock (&priv->lock);
1699   res = update_transport (stream, trans, TRUE);
1700   g_mutex_unlock (&priv->lock);
1701
1702   return res;
1703 }
1704
1705 /**
1706  * gst_rtsp_stream_remove_transport:
1707  * @stream: a #GstRTSPStream
1708  * @trans: a #GstRTSPStreamTransport
1709  *
1710  * Remove the transport in @trans from @stream. The media of @stream will
1711  * not be sent to the values configured in @trans.
1712  *
1713  * @stream must be joined to a bin.
1714  *
1715  * @trans must contain a valid #GstRTSPTransport.
1716  *
1717  * Returns: %TRUE if @trans was removed
1718  */
1719 gboolean
1720 gst_rtsp_stream_remove_transport (GstRTSPStream * stream,
1721     GstRTSPStreamTransport * trans)
1722 {
1723   GstRTSPStreamPrivate *priv;
1724   gboolean res;
1725
1726   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1727   priv = stream->priv;
1728   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
1729   g_return_val_if_fail (priv->is_joined, FALSE);
1730
1731   g_mutex_lock (&priv->lock);
1732   res = update_transport (stream, trans, FALSE);
1733   g_mutex_unlock (&priv->lock);
1734
1735   return res;
1736 }