stream: allow access to the rtp session
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-stream.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19
20 #include <string.h>
21 #include <stdlib.h>
22
23 #include <gio/gio.h>
24
25 #include <gst/app/gstappsrc.h>
26 #include <gst/app/gstappsink.h>
27
28 #include "rtsp-stream.h"
29
30 #define GST_RTSP_STREAM_GET_PRIVATE(obj)  \
31      (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTSP_STREAM, GstRTSPStreamPrivate))
32
33 struct _GstRTSPStreamPrivate
34 {
35   GMutex lock;
36   guint idx;
37   GstPad *srcpad;
38   GstElement *payloader;
39   guint buffer_size;
40   gboolean is_joined;
41
42   /* pads on the rtpbin */
43   GstPad *send_rtp_sink;
44   GstPad *recv_sink[2];
45   GstPad *send_src[2];
46
47   /* the RTPSession object */
48   GObject *session;
49
50   /* sinks used for sending and receiving RTP and RTCP over ipv4, they share
51    * sockets */
52   GstElement *udpsrc_v4[2];
53
54   /* sinks used for sending and receiving RTP and RTCP over ipv6, they share
55    * sockets */
56   GstElement *udpsrc_v6[2];
57
58   GstElement *udpsink[2];
59
60   /* for TCP transport */
61   GstElement *appsrc[2];
62   GstElement *appqueue[2];
63   GstElement *appsink[2];
64
65   GstElement *tee[2];
66   GstElement *funnel[2];
67
68   /* server ports for sending/receiving over ipv4 */
69   GstRTSPRange server_port_v4;
70   GstRTSPAddress *server_addr_v4;
71
72   /* server ports for sending/receiving over ipv6 */
73   GstRTSPRange server_port_v6;
74   GstRTSPAddress *server_addr_v6;
75
76   /* multicast addresses */
77   GstRTSPAddressPool *pool;
78   GstRTSPAddress *addr;
79
80   /* the caps of the stream */
81   gulong caps_sig;
82   GstCaps *caps;
83
84   /* transports we stream to */
85   guint n_active;
86   GList *transports;
87
88   gint dscp_qos;
89 };
90
91
92 enum
93 {
94   PROP_0,
95   PROP_LAST
96 };
97
98 GST_DEBUG_CATEGORY_STATIC (rtsp_stream_debug);
99 #define GST_CAT_DEFAULT rtsp_stream_debug
100
101 static GQuark ssrc_stream_map_key;
102
103 static void gst_rtsp_stream_finalize (GObject * obj);
104
105 G_DEFINE_TYPE (GstRTSPStream, gst_rtsp_stream, G_TYPE_OBJECT);
106
107 static void
108 gst_rtsp_stream_class_init (GstRTSPStreamClass * klass)
109 {
110   GObjectClass *gobject_class;
111
112   g_type_class_add_private (klass, sizeof (GstRTSPStreamPrivate));
113
114   gobject_class = G_OBJECT_CLASS (klass);
115
116   gobject_class->finalize = gst_rtsp_stream_finalize;
117
118   GST_DEBUG_CATEGORY_INIT (rtsp_stream_debug, "rtspstream", 0, "GstRTSPStream");
119
120   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
121 }
122
123 static void
124 gst_rtsp_stream_init (GstRTSPStream * stream)
125 {
126   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
127
128   GST_DEBUG ("new stream %p", stream);
129
130   stream->priv = priv;
131
132   stream->priv->dscp_qos = -1;
133
134   g_mutex_init (&priv->lock);
135 }
136
137 static void
138 gst_rtsp_stream_finalize (GObject * obj)
139 {
140   GstRTSPStream *stream;
141   GstRTSPStreamPrivate *priv;
142
143   stream = GST_RTSP_STREAM (obj);
144   priv = stream->priv;
145
146   GST_DEBUG ("finalize stream %p", stream);
147
148   /* we really need to be unjoined now */
149   g_return_if_fail (!priv->is_joined);
150
151   if (priv->addr)
152     gst_rtsp_address_free (priv->addr);
153   if (priv->server_addr_v4)
154     gst_rtsp_address_free (priv->server_addr_v4);
155   if (priv->server_addr_v6)
156     gst_rtsp_address_free (priv->server_addr_v6);
157   if (priv->pool)
158     g_object_unref (priv->pool);
159   gst_object_unref (priv->payloader);
160   gst_object_unref (priv->srcpad);
161   g_mutex_clear (&priv->lock);
162
163   G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
164 }
165
166 /**
167  * gst_rtsp_stream_new:
168  * @idx: an index
169  * @srcpad: a #GstPad
170  * @payloader: a #GstElement
171  *
172  * Create a new media stream with index @idx that handles RTP data on
173  * @srcpad and has a payloader element @payloader.
174  *
175  * Returns: a new #GstRTSPStream
176  */
177 GstRTSPStream *
178 gst_rtsp_stream_new (guint idx, GstElement * payloader, GstPad * srcpad)
179 {
180   GstRTSPStreamPrivate *priv;
181   GstRTSPStream *stream;
182
183   g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL);
184   g_return_val_if_fail (GST_IS_PAD (srcpad), NULL);
185   g_return_val_if_fail (GST_PAD_IS_SRC (srcpad), NULL);
186
187   stream = g_object_new (GST_TYPE_RTSP_STREAM, NULL);
188   priv = stream->priv;
189   priv->idx = idx;
190   priv->payloader = gst_object_ref (payloader);
191   priv->srcpad = gst_object_ref (srcpad);
192
193   return stream;
194 }
195
196 /**
197  * gst_rtsp_stream_get_index:
198  * @stream: a #GstRTSPStream
199  *
200  * Get the stream index.
201  *
202  * Return: the stream index.
203  */
204 guint
205 gst_rtsp_stream_get_index (GstRTSPStream * stream)
206 {
207   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
208
209   return stream->priv->idx;
210 }
211
212  /**
213  * gst_rtsp_stream_get_srcpad:
214  * @stream: a #GstRTSPStream
215  *
216  * Get the srcpad associated with @stream.
217  *
218  * Return: the srcpad. Unref after usage.
219  */
220 GstPad *
221 gst_rtsp_stream_get_srcpad (GstRTSPStream * stream)
222 {
223   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
224
225   return gst_object_ref (stream->priv->srcpad);
226 }
227
228 /**
229  * gst_rtsp_stream_set_mtu:
230  * @stream: a #GstRTSPStream
231  * @mtu: a new MTU
232  *
233  * Configure the mtu in the payloader of @stream to @mtu.
234  */
235 void
236 gst_rtsp_stream_set_mtu (GstRTSPStream * stream, guint mtu)
237 {
238   GstRTSPStreamPrivate *priv;
239
240   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
241
242   priv = stream->priv;
243
244   GST_LOG_OBJECT (stream, "set MTU %u", mtu);
245
246   g_object_set (G_OBJECT (priv->payloader), "mtu", mtu, NULL);
247 }
248
249 /**
250  * gst_rtsp_stream_get_mtu:
251  * @stream: a #GstRTSPStream
252  *
253  * Get the configured MTU in the payloader of @stream.
254  *
255  * Returns: the MTU of the payloader.
256  */
257 guint
258 gst_rtsp_stream_get_mtu (GstRTSPStream * stream)
259 {
260   GstRTSPStreamPrivate *priv;
261   guint mtu;
262
263   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
264
265   priv = stream->priv;
266
267   g_object_get (G_OBJECT (priv->payloader), "mtu", &mtu, NULL);
268
269   return mtu;
270 }
271
272 /* Update the dscp qos property on the udp sinks */
273 static void
274 update_dscp_qos (GstRTSPStream * stream)
275 {
276   GstRTSPStreamPrivate *priv;
277
278   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
279
280   priv = stream->priv;
281
282   if (priv->udpsink[0]) {
283     g_object_set (G_OBJECT (priv->udpsink[0]), "qos-dscp", priv->dscp_qos,
284         NULL);
285   }
286
287   if (priv->udpsink[1]) {
288     g_object_set (G_OBJECT (priv->udpsink[1]), "qos-dscp", priv->dscp_qos,
289         NULL);
290   }
291 }
292
293 /**
294  * gst_rtsp_stream_set_dscp_qos:
295  * @stream: a #GstRTSPStream
296  * @dscp_qos: a new dscp qos value (0-63, or -1 to disable)
297  *
298  * Configure the dscp qos of the outgoing sockets to @dscp_qos.
299  */
300 void
301 gst_rtsp_stream_set_dscp_qos (GstRTSPStream * stream, gint dscp_qos)
302 {
303   GstRTSPStreamPrivate *priv;
304
305   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
306
307   priv = stream->priv;
308
309   GST_LOG_OBJECT (stream, "set DSCP QoS %d", dscp_qos);
310
311   if (dscp_qos < -1 || dscp_qos > 63) {
312     GST_WARNING_OBJECT (stream, "trying to set illegal dscp qos %d", dscp_qos);
313     return;
314   }
315
316   priv->dscp_qos = dscp_qos;
317
318   update_dscp_qos (stream);
319 }
320
321 /**
322  * gst_rtsp_stream_get_dscp_qos:
323  * @stream: a #GstRTSPStream
324  *
325  * Get the configured DSCP QoS in of the outgoing sockets.
326  *
327  * Returns: the DSCP QoS value of the outgoing sockets, or -1 if disbled.
328  */
329 gint
330 gst_rtsp_stream_get_dscp_qos (GstRTSPStream * stream)
331 {
332   GstRTSPStreamPrivate *priv;
333
334   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
335
336   priv = stream->priv;
337
338   return priv->dscp_qos;
339 }
340
341
342 /**
343  * gst_rtsp_stream_set_address_pool:
344  * @stream: a #GstRTSPStream
345  * @pool: a #GstRTSPAddressPool
346  *
347  * configure @pool to be used as the address pool of @stream.
348  */
349 void
350 gst_rtsp_stream_set_address_pool (GstRTSPStream * stream,
351     GstRTSPAddressPool * pool)
352 {
353   GstRTSPStreamPrivate *priv;
354   GstRTSPAddressPool *old;
355
356   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
357
358   priv = stream->priv;
359
360   GST_LOG_OBJECT (stream, "set address pool %p", pool);
361
362   g_mutex_lock (&priv->lock);
363   if ((old = priv->pool) != pool)
364     priv->pool = pool ? g_object_ref (pool) : NULL;
365   else
366     old = NULL;
367   g_mutex_unlock (&priv->lock);
368
369   if (old)
370     g_object_unref (old);
371 }
372
373 /**
374  * gst_rtsp_stream_get_address_pool:
375  * @stream: a #GstRTSPStream
376  *
377  * Get the #GstRTSPAddressPool used as the address pool of @stream.
378  *
379  * Returns: (transfer full): the #GstRTSPAddressPool of @stream. g_object_unref() after
380  * usage.
381  */
382 GstRTSPAddressPool *
383 gst_rtsp_stream_get_address_pool (GstRTSPStream * stream)
384 {
385   GstRTSPStreamPrivate *priv;
386   GstRTSPAddressPool *result;
387
388   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
389
390   priv = stream->priv;
391
392   g_mutex_lock (&priv->lock);
393   if ((result = priv->pool))
394     g_object_ref (result);
395   g_mutex_unlock (&priv->lock);
396
397   return result;
398 }
399
400 /**
401  * gst_rtsp_stream_get_address:
402  * @stream: a #GstRTSPStream
403  *
404  * Get the multicast address of @stream.
405  *
406  * Returns: the #GstRTSPAddress of @stream or %NULL when no address could be
407  * allocated. gst_rtsp_address_free() after usage.
408  */
409 GstRTSPAddress *
410 gst_rtsp_stream_get_address (GstRTSPStream * stream)
411 {
412   GstRTSPStreamPrivate *priv;
413   GstRTSPAddress *result;
414
415   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
416
417   priv = stream->priv;
418
419   g_mutex_lock (&priv->lock);
420   if (priv->addr == NULL) {
421     if (priv->pool == NULL)
422       goto no_pool;
423
424     priv->addr = gst_rtsp_address_pool_acquire_address (priv->pool,
425         GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_MULTICAST, 2);
426     if (priv->addr == NULL)
427       goto no_address;
428   }
429   result = gst_rtsp_address_copy (priv->addr);
430   g_mutex_unlock (&priv->lock);
431
432   return result;
433
434   /* ERRORS */
435 no_pool:
436   {
437     GST_ERROR_OBJECT (stream, "no address pool specified");
438     g_mutex_unlock (&priv->lock);
439     return NULL;
440   }
441 no_address:
442   {
443     GST_ERROR_OBJECT (stream, "failed to acquire address from pool");
444     g_mutex_unlock (&priv->lock);
445     return NULL;
446   }
447 }
448
449 /**
450  * gst_rtsp_stream_reserve_address:
451  * @stream: a #GstRTSPStream
452  *
453  * Get a specific multicast address of @stream.
454  *
455  * Returns: the #GstRTSPAddress of @stream or %NULL when no address could be
456  * allocated. gst_rtsp_address_free() after usage.
457  */
458 GstRTSPAddress *
459 gst_rtsp_stream_reserve_address (GstRTSPStream * stream,
460     const gchar * address, guint port, guint n_ports, guint ttl)
461 {
462   GstRTSPStreamPrivate *priv;
463   GstRTSPAddress *result;
464
465   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
466   g_return_val_if_fail (address != NULL, NULL);
467   g_return_val_if_fail (port > 0, NULL);
468   g_return_val_if_fail (n_ports > 0, NULL);
469   g_return_val_if_fail (ttl > 0, NULL);
470
471   priv = stream->priv;
472
473   g_mutex_lock (&priv->lock);
474   if (priv->addr == NULL) {
475     if (priv->pool == NULL)
476       goto no_pool;
477
478     priv->addr = gst_rtsp_address_pool_reserve_address (priv->pool, address,
479         port, n_ports, ttl);
480     if (priv->addr == NULL)
481       goto no_address;
482   } else {
483     if (strcmp (priv->addr->address, address) ||
484         priv->addr->port != port || priv->addr->n_ports != n_ports ||
485         priv->addr->ttl != ttl)
486       goto different_address;
487   }
488   result = gst_rtsp_address_copy (priv->addr);
489   g_mutex_unlock (&priv->lock);
490
491   return result;
492
493   /* ERRORS */
494 no_pool:
495   {
496     GST_ERROR_OBJECT (stream, "no address pool specified");
497     g_mutex_unlock (&priv->lock);
498     return NULL;
499   }
500 no_address:
501   {
502     GST_ERROR_OBJECT (stream, "failed to acquire address %s from pool",
503         address);
504     g_mutex_unlock (&priv->lock);
505     return NULL;
506   }
507 different_address:
508   {
509     GST_ERROR_OBJECT (stream, "address %s is not the same that was already"
510         " reserved", address);
511     g_mutex_unlock (&priv->lock);
512     return NULL;
513   }
514 }
515
516 static gboolean
517 alloc_ports_one_family (GstRTSPAddressPool * pool, gint buffer_size,
518     GSocketFamily family, GstElement * udpsrc_out[2],
519     GstElement * udpsink_out[2], GstRTSPRange * server_port_out,
520     GstRTSPAddress ** server_addr_out)
521 {
522   GstStateChangeReturn ret;
523   GstElement *udpsrc0, *udpsrc1;
524   GstElement *udpsink0, *udpsink1;
525   GSocket *rtp_socket = NULL;
526   GSocket *rtcp_socket;
527   gint tmp_rtp, tmp_rtcp;
528   guint count;
529   gint rtpport, rtcpport;
530   GList *rejected_addresses = NULL;
531   GstRTSPAddress *addr = NULL;
532   GInetAddress *inetaddr = NULL;
533   GSocketAddress *rtp_sockaddr = NULL;
534   GSocketAddress *rtcp_sockaddr = NULL;
535   const gchar *multisink_socket = "socket";
536
537   if (family == G_SOCKET_FAMILY_IPV6) {
538     multisink_socket = "socket-v6";
539   }
540
541   udpsrc0 = NULL;
542   udpsrc1 = NULL;
543   udpsink0 = NULL;
544   udpsink1 = NULL;
545   count = 0;
546
547   /* Start with random port */
548   tmp_rtp = 0;
549
550   rtcp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
551       G_SOCKET_PROTOCOL_UDP, NULL);
552   if (!rtcp_socket)
553     goto no_udp_protocol;
554
555   if (*server_addr_out)
556     gst_rtsp_address_free (*server_addr_out);
557
558   /* try to allocate 2 UDP ports, the RTP port should be an even
559    * number and the RTCP port should be the next (uneven) port */
560 again:
561
562   if (rtp_socket == NULL) {
563     rtp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
564         G_SOCKET_PROTOCOL_UDP, NULL);
565     if (!rtp_socket)
566       goto no_udp_protocol;
567   }
568
569   if (pool && gst_rtsp_address_pool_has_unicast_addresses (pool)) {
570     GstRTSPAddressFlags flags;
571
572     if (addr)
573       rejected_addresses = g_list_prepend (rejected_addresses, addr);
574
575     flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_UNICAST;
576     if (family == G_SOCKET_FAMILY_IPV6)
577       flags |= GST_RTSP_ADDRESS_FLAG_IPV6;
578     else
579       flags |= GST_RTSP_ADDRESS_FLAG_IPV4;
580
581     addr = gst_rtsp_address_pool_acquire_address (pool, flags, 2);
582
583     if (addr == NULL)
584       goto no_ports;
585
586     tmp_rtp = addr->port;
587
588     g_clear_object (&inetaddr);
589     inetaddr = g_inet_address_new_from_string (addr->address);
590   } else {
591     if (tmp_rtp != 0) {
592       tmp_rtp += 2;
593       if (++count > 20)
594         goto no_ports;
595     }
596
597     if (inetaddr == NULL)
598       inetaddr = g_inet_address_new_any (family);
599   }
600
601   rtp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtp);
602   if (!g_socket_bind (rtp_socket, rtp_sockaddr, FALSE, NULL)) {
603     g_object_unref (rtp_sockaddr);
604     goto again;
605   }
606   g_object_unref (rtp_sockaddr);
607
608   rtp_sockaddr = g_socket_get_local_address (rtp_socket, NULL);
609   if (rtp_sockaddr == NULL || !G_IS_INET_SOCKET_ADDRESS (rtp_sockaddr)) {
610     g_clear_object (&rtp_sockaddr);
611     goto socket_error;
612   }
613
614   tmp_rtp =
615       g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (rtp_sockaddr));
616   g_object_unref (rtp_sockaddr);
617
618   /* check if port is even */
619   if ((tmp_rtp & 1) != 0) {
620     /* port not even, close and allocate another */
621     tmp_rtp++;
622     g_clear_object (&rtp_socket);
623     goto again;
624   }
625
626   /* set port */
627   tmp_rtcp = tmp_rtp + 1;
628
629   rtcp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtcp);
630   if (!g_socket_bind (rtcp_socket, rtcp_sockaddr, FALSE, NULL)) {
631     g_object_unref (rtcp_sockaddr);
632     g_clear_object (&rtp_socket);
633     goto again;
634   }
635   g_object_unref (rtcp_sockaddr);
636
637   g_clear_object (&inetaddr);
638
639   udpsrc0 = gst_element_factory_make ("udpsrc", NULL);
640   udpsrc1 = gst_element_factory_make ("udpsrc", NULL);
641
642   if (udpsrc0 == NULL || udpsrc1 == NULL)
643     goto no_udp_protocol;
644
645   g_object_set (G_OBJECT (udpsrc0), "socket", rtp_socket, NULL);
646   g_object_set (G_OBJECT (udpsrc1), "socket", rtcp_socket, NULL);
647
648   ret = gst_element_set_state (udpsrc0, GST_STATE_PAUSED);
649   if (ret == GST_STATE_CHANGE_FAILURE)
650     goto element_error;
651   ret = gst_element_set_state (udpsrc1, GST_STATE_PAUSED);
652   if (ret == GST_STATE_CHANGE_FAILURE)
653     goto element_error;
654
655   /* all fine, do port check */
656   g_object_get (G_OBJECT (udpsrc0), "port", &rtpport, NULL);
657   g_object_get (G_OBJECT (udpsrc1), "port", &rtcpport, NULL);
658
659   /* this should not happen... */
660   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
661     goto port_error;
662
663   if (udpsink_out[0])
664     udpsink0 = udpsink_out[0];
665   else
666     udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
667
668   if (!udpsink0)
669     goto no_udp_protocol;
670
671   g_object_set (G_OBJECT (udpsink0), "close-socket", FALSE, NULL);
672   g_object_set (G_OBJECT (udpsink0), multisink_socket, rtp_socket, NULL);
673
674   if (udpsink_out[1])
675     udpsink1 = udpsink_out[1];
676   else
677     udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
678
679   if (!udpsink1)
680     goto no_udp_protocol;
681
682   g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
683   g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
684   g_object_set (G_OBJECT (udpsink0), "buffer-size", buffer_size, NULL);
685
686   g_object_set (G_OBJECT (udpsink1), "close-socket", FALSE, NULL);
687   g_object_set (G_OBJECT (udpsink1), multisink_socket, rtcp_socket, NULL);
688   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
689   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
690   g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL);
691   g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
692   g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL);
693   g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
694
695   /* we keep these elements, we will further configure them when the
696    * client told us to really use the UDP ports. */
697   udpsrc_out[0] = udpsrc0;
698   udpsrc_out[1] = udpsrc1;
699   udpsink_out[0] = udpsink0;
700   udpsink_out[1] = udpsink1;
701   server_port_out->min = rtpport;
702   server_port_out->max = rtcpport;
703
704   *server_addr_out = addr;
705   g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free);
706
707   g_object_unref (rtp_socket);
708   g_object_unref (rtcp_socket);
709
710   return TRUE;
711
712   /* ERRORS */
713 no_udp_protocol:
714   {
715     goto cleanup;
716   }
717 no_ports:
718   {
719     goto cleanup;
720   }
721 port_error:
722   {
723     goto cleanup;
724   }
725 socket_error:
726   {
727     goto cleanup;
728   }
729 element_error:
730   {
731     goto cleanup;
732   }
733 cleanup:
734   {
735     if (udpsrc0) {
736       gst_element_set_state (udpsrc0, GST_STATE_NULL);
737       gst_object_unref (udpsrc0);
738     }
739     if (udpsrc1) {
740       gst_element_set_state (udpsrc1, GST_STATE_NULL);
741       gst_object_unref (udpsrc1);
742     }
743     if (udpsink0) {
744       gst_element_set_state (udpsink0, GST_STATE_NULL);
745       gst_object_unref (udpsink0);
746     }
747     if (udpsink1) {
748       gst_element_set_state (udpsink1, GST_STATE_NULL);
749       gst_object_unref (udpsink1);
750     }
751     if (inetaddr)
752       g_object_unref (inetaddr);
753     g_list_free_full (rejected_addresses,
754         (GDestroyNotify) gst_rtsp_address_free);
755     if (addr)
756       gst_rtsp_address_free (addr);
757     if (rtp_socket)
758       g_object_unref (rtp_socket);
759     if (rtcp_socket)
760       g_object_unref (rtcp_socket);
761     return FALSE;
762   }
763 }
764
765 /* must be called with lock */
766 static gboolean
767 alloc_ports (GstRTSPStream * stream)
768 {
769   GstRTSPStreamPrivate *priv = stream->priv;
770
771   return alloc_ports_one_family (priv->pool, priv->buffer_size,
772       G_SOCKET_FAMILY_IPV4, priv->udpsrc_v4, priv->udpsink,
773       &priv->server_port_v4, &priv->server_addr_v4) &&
774       alloc_ports_one_family (priv->pool, priv->buffer_size,
775       G_SOCKET_FAMILY_IPV6, priv->udpsrc_v6, priv->udpsink,
776       &priv->server_port_v6, &priv->server_addr_v6);
777 }
778
779 /**
780  * gst_rtsp_stream_get_server_port:
781  * @stream: a #GstRTSPStream
782  * @server_port: (out): result server port
783  *
784  * Fill @server_port with the port pair used by the server. This function can
785  * only be called when @stream has been joined.
786  */
787 void
788 gst_rtsp_stream_get_server_port (GstRTSPStream * stream,
789     GstRTSPRange * server_port, GSocketFamily family)
790 {
791   GstRTSPStreamPrivate *priv;
792
793   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
794   priv = stream->priv;
795   g_return_if_fail (priv->is_joined);
796
797   g_mutex_lock (&priv->lock);
798   if (family == G_SOCKET_FAMILY_IPV4) {
799     if (server_port)
800       *server_port = priv->server_port_v4;
801   } else {
802     if (server_port)
803       *server_port = priv->server_port_v6;
804   }
805   g_mutex_unlock (&priv->lock);
806 }
807
808 /**
809  * gst_rtsp_stream_get_rtpsession:
810  * @stream: a #GstRTSPStream
811  *
812  * Get the RTP session of this stream.
813  *
814  * Returns: The RTP session of this stream. Unref after usage.
815  */
816 GObject *
817 gst_rtsp_stream_get_rtpsession (GstRTSPStream * stream)
818 {
819   GstRTSPStreamPrivate *priv;
820   GObject *session;
821
822   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
823
824   priv = stream->priv;
825
826   g_mutex_lock (&priv->lock);
827   if ((session = priv->session))
828     g_object_ref (session);
829   g_mutex_unlock (&priv->lock);
830
831   return session;
832 }
833
834 /**
835  * gst_rtsp_stream_get_ssrc:
836  * @stream: a #GstRTSPStream
837  * @ssrc: (out): result ssrc
838  *
839  * Get the SSRC used by the RTP session of this stream. This function can only
840  * be called when @stream has been joined.
841  */
842 void
843 gst_rtsp_stream_get_ssrc (GstRTSPStream * stream, guint * ssrc)
844 {
845   GstRTSPStreamPrivate *priv;
846
847   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
848   priv = stream->priv;
849   g_return_if_fail (priv->is_joined);
850
851   g_mutex_lock (&priv->lock);
852   if (ssrc && priv->session)
853     g_object_get (priv->session, "internal-ssrc", ssrc, NULL);
854   g_mutex_unlock (&priv->lock);
855 }
856
857 /* executed from streaming thread */
858 static void
859 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream)
860 {
861   GstRTSPStreamPrivate *priv = stream->priv;
862   GstCaps *newcaps, *oldcaps;
863
864   newcaps = gst_pad_get_current_caps (pad);
865
866   GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps,
867       newcaps);
868
869   g_mutex_lock (&priv->lock);
870   oldcaps = priv->caps;
871   priv->caps = newcaps;
872   g_mutex_unlock (&priv->lock);
873
874   if (oldcaps)
875     gst_caps_unref (oldcaps);
876 }
877
878 static void
879 dump_structure (const GstStructure * s)
880 {
881   gchar *sstr;
882
883   sstr = gst_structure_to_string (s);
884   GST_INFO ("structure: %s", sstr);
885   g_free (sstr);
886 }
887
888 static GstRTSPStreamTransport *
889 find_transport (GstRTSPStream * stream, const gchar * rtcp_from)
890 {
891   GstRTSPStreamPrivate *priv = stream->priv;
892   GList *walk;
893   GstRTSPStreamTransport *result = NULL;
894   const gchar *tmp;
895   gchar *dest;
896   guint port;
897
898   if (rtcp_from == NULL)
899     return NULL;
900
901   tmp = g_strrstr (rtcp_from, ":");
902   if (tmp == NULL)
903     return NULL;
904
905   port = atoi (tmp + 1);
906   dest = g_strndup (rtcp_from, tmp - rtcp_from);
907
908   g_mutex_lock (&priv->lock);
909   GST_INFO ("finding %s:%d in %d transports", dest, port,
910       g_list_length (priv->transports));
911
912   for (walk = priv->transports; walk; walk = g_list_next (walk)) {
913     GstRTSPStreamTransport *trans = walk->data;
914     const GstRTSPTransport *tr;
915     gint min, max;
916
917     tr = gst_rtsp_stream_transport_get_transport (trans);
918
919     min = tr->client_port.min;
920     max = tr->client_port.max;
921
922     if ((strcmp (tr->destination, dest) == 0) && (min == port || max == port)) {
923       result = trans;
924       break;
925     }
926   }
927   if (result)
928     g_object_ref (result);
929   g_mutex_unlock (&priv->lock);
930
931   g_free (dest);
932
933   return result;
934 }
935
936 static GstRTSPStreamTransport *
937 check_transport (GObject * source, GstRTSPStream * stream)
938 {
939   GstStructure *stats;
940   GstRTSPStreamTransport *trans;
941
942   /* see if we have a stream to match with the origin of the RTCP packet */
943   trans = g_object_get_qdata (source, ssrc_stream_map_key);
944   if (trans == NULL) {
945     g_object_get (source, "stats", &stats, NULL);
946     if (stats) {
947       const gchar *rtcp_from;
948
949       dump_structure (stats);
950
951       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
952       if ((trans = find_transport (stream, rtcp_from))) {
953         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
954             source);
955         g_object_set_qdata_full (source, ssrc_stream_map_key, trans,
956             g_object_unref);
957       }
958       gst_structure_free (stats);
959     }
960   }
961   return trans;
962 }
963
964
965 static void
966 on_new_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
967 {
968   GstRTSPStreamTransport *trans;
969
970   GST_INFO ("%p: new source %p", stream, source);
971
972   trans = check_transport (source, stream);
973
974   if (trans)
975     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
976 }
977
978 static void
979 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPStream * stream)
980 {
981   GST_INFO ("%p: new SDES %p", stream, source);
982 }
983
984 static void
985 on_ssrc_active (GObject * session, GObject * source, GstRTSPStream * stream)
986 {
987   GstRTSPStreamTransport *trans;
988
989   trans = check_transport (source, stream);
990
991   if (trans) {
992     GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
993     gst_rtsp_stream_transport_keep_alive (trans);
994   }
995 #ifdef DUMP_STATS
996   {
997     GstStructure *stats;
998     g_object_get (source, "stats", &stats, NULL);
999     if (stats) {
1000       dump_structure (stats);
1001       gst_structure_free (stats);
1002     }
1003   }
1004 #endif
1005 }
1006
1007 static void
1008 on_bye_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1009 {
1010   GST_INFO ("%p: source %p bye", stream, source);
1011 }
1012
1013 static void
1014 on_bye_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1015 {
1016   GstRTSPStreamTransport *trans;
1017
1018   GST_INFO ("%p: source %p bye timeout", stream, source);
1019
1020   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1021     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1022     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1023   }
1024 }
1025
1026 static void
1027 on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1028 {
1029   GstRTSPStreamTransport *trans;
1030
1031   GST_INFO ("%p: source %p timeout", stream, source);
1032
1033   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1034     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1035     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1036   }
1037 }
1038
1039 static GstFlowReturn
1040 handle_new_sample (GstAppSink * sink, gpointer user_data)
1041 {
1042   GstRTSPStreamPrivate *priv;
1043   GList *walk;
1044   GstSample *sample;
1045   GstBuffer *buffer;
1046   GstRTSPStream *stream;
1047
1048   sample = gst_app_sink_pull_sample (sink);
1049   if (!sample)
1050     return GST_FLOW_OK;
1051
1052   stream = (GstRTSPStream *) user_data;
1053   priv = stream->priv;
1054   buffer = gst_sample_get_buffer (sample);
1055
1056   g_mutex_lock (&priv->lock);
1057   for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1058     GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1059
1060     if (GST_ELEMENT_CAST (sink) == priv->appsink[0]) {
1061       gst_rtsp_stream_transport_send_rtp (tr, buffer);
1062     } else {
1063       gst_rtsp_stream_transport_send_rtcp (tr, buffer);
1064     }
1065   }
1066   g_mutex_unlock (&priv->lock);
1067
1068   gst_sample_unref (sample);
1069
1070   return GST_FLOW_OK;
1071 }
1072
1073 static GstAppSinkCallbacks sink_cb = {
1074   NULL,                         /* not interested in EOS */
1075   NULL,                         /* not interested in preroll samples */
1076   handle_new_sample,
1077 };
1078
1079 /**
1080  * gst_rtsp_stream_join_bin:
1081  * @stream: a #GstRTSPStream
1082  * @bin: a #GstBin to join
1083  * @rtpbin: a rtpbin element in @bin
1084  * @state: the target state of the new elements
1085  *
1086  * Join the #Gstbin @bin that contains the element @rtpbin.
1087  *
1088  * @stream will link to @rtpbin, which must be inside @bin. The elements
1089  * added to @bin will be set to the state given in @state.
1090  *
1091  * Returns: %TRUE on success.
1092  */
1093 gboolean
1094 gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
1095     GstElement * rtpbin, GstState state)
1096 {
1097   GstRTSPStreamPrivate *priv;
1098   gint i, idx;
1099   gchar *name;
1100   GstPad *pad, *teepad, *queuepad, *selpad;
1101   GstPadLinkReturn ret;
1102
1103   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1104   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
1105   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
1106
1107   priv = stream->priv;
1108
1109   g_mutex_lock (&priv->lock);
1110   if (priv->is_joined)
1111     goto was_joined;
1112
1113   /* create a session with the same index as the stream */
1114   idx = priv->idx;
1115
1116   GST_INFO ("stream %p joining bin as session %d", stream, idx);
1117
1118   if (!alloc_ports (stream))
1119     goto no_ports;
1120
1121   /* update the dscp qos field in the sinks */
1122   update_dscp_qos (stream);
1123
1124   /* get a pad for sending RTP */
1125   name = g_strdup_printf ("send_rtp_sink_%u", idx);
1126   priv->send_rtp_sink = gst_element_get_request_pad (rtpbin, name);
1127   g_free (name);
1128   /* link the RTP pad to the session manager, it should not really fail unless
1129    * this is not really an RTP pad */
1130   ret = gst_pad_link (priv->srcpad, priv->send_rtp_sink);
1131   if (ret != GST_PAD_LINK_OK)
1132     goto link_failed;
1133
1134   /* get pads from the RTP session element for sending and receiving
1135    * RTP/RTCP*/
1136   name = g_strdup_printf ("send_rtp_src_%u", idx);
1137   priv->send_src[0] = gst_element_get_static_pad (rtpbin, name);
1138   g_free (name);
1139   name = g_strdup_printf ("send_rtcp_src_%u", idx);
1140   priv->send_src[1] = gst_element_get_request_pad (rtpbin, name);
1141   g_free (name);
1142   name = g_strdup_printf ("recv_rtp_sink_%u", idx);
1143   priv->recv_sink[0] = gst_element_get_request_pad (rtpbin, name);
1144   g_free (name);
1145   name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
1146   priv->recv_sink[1] = gst_element_get_request_pad (rtpbin, name);
1147   g_free (name);
1148
1149   /* get the session */
1150   g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &priv->session);
1151
1152   g_signal_connect (priv->session, "on-new-ssrc", (GCallback) on_new_ssrc,
1153       stream);
1154   g_signal_connect (priv->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
1155       stream);
1156   g_signal_connect (priv->session, "on-ssrc-active",
1157       (GCallback) on_ssrc_active, stream);
1158   g_signal_connect (priv->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
1159       stream);
1160   g_signal_connect (priv->session, "on-bye-timeout",
1161       (GCallback) on_bye_timeout, stream);
1162   g_signal_connect (priv->session, "on-timeout", (GCallback) on_timeout,
1163       stream);
1164
1165   for (i = 0; i < 2; i++) {
1166     /* For the sender we create this bit of pipeline for both
1167      * RTP and RTCP. Sync and preroll are enabled on udpsink so
1168      * we need to add a queue before appsink to make the pipeline
1169      * not block. For the TCP case, we want to pump data to the
1170      * client as fast as possible anyway.
1171      *
1172      * .--------.      .-----.    .---------.
1173      * | rtpbin |      | tee |    | udpsink |
1174      * |       send->sink   src->sink       |
1175      * '--------'      |     |    '---------'
1176      *                 |     |    .---------.    .---------.
1177      *                 |     |    |  queue  |    | appsink |
1178      *                 |    src->sink      src->sink       |
1179      *                 '-----'    '---------'    '---------'
1180      */
1181     /* make tee for RTP/RTCP */
1182     priv->tee[i] = gst_element_factory_make ("tee", NULL);
1183     gst_bin_add (bin, priv->tee[i]);
1184
1185     /* and link to rtpbin send pad */
1186     pad = gst_element_get_static_pad (priv->tee[i], "sink");
1187     gst_pad_link (priv->send_src[i], pad);
1188     gst_object_unref (pad);
1189
1190     /* add udpsink */
1191     gst_bin_add (bin, priv->udpsink[i]);
1192
1193     /* link tee to udpsink */
1194     teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
1195     pad = gst_element_get_static_pad (priv->udpsink[i], "sink");
1196     gst_pad_link (teepad, pad);
1197     gst_object_unref (pad);
1198     gst_object_unref (teepad);
1199
1200     /* make queue */
1201     priv->appqueue[i] = gst_element_factory_make ("queue", NULL);
1202     gst_bin_add (bin, priv->appqueue[i]);
1203     /* and link to tee */
1204     teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
1205     pad = gst_element_get_static_pad (priv->appqueue[i], "sink");
1206     gst_pad_link (teepad, pad);
1207     gst_object_unref (pad);
1208     gst_object_unref (teepad);
1209
1210     /* make appsink */
1211     priv->appsink[i] = gst_element_factory_make ("appsink", NULL);
1212     g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
1213     g_object_set (priv->appsink[i], "emit-signals", FALSE, NULL);
1214     gst_bin_add (bin, priv->appsink[i]);
1215     gst_app_sink_set_callbacks (GST_APP_SINK_CAST (priv->appsink[i]),
1216         &sink_cb, stream, NULL);
1217     /* and link to queue */
1218     queuepad = gst_element_get_static_pad (priv->appqueue[i], "src");
1219     pad = gst_element_get_static_pad (priv->appsink[i], "sink");
1220     gst_pad_link (queuepad, pad);
1221     gst_object_unref (pad);
1222     gst_object_unref (queuepad);
1223
1224     /* For the receiver we create this bit of pipeline for both
1225      * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc
1226      * and it is all funneled into the rtpbin receive pad.
1227      *
1228      * .--------.     .--------.    .--------.
1229      * | udpsrc |     | funnel |    | rtpbin |
1230      * |       src->sink      src->sink      |
1231      * '--------'     |        |    '--------'
1232      * .--------.     |        |
1233      * | appsrc |     |        |
1234      * |       src->sink       |
1235      * '--------'     '--------'
1236      */
1237     /* make funnel for the RTP/RTCP receivers */
1238     priv->funnel[i] = gst_element_factory_make ("funnel", NULL);
1239     gst_bin_add (bin, priv->funnel[i]);
1240
1241     pad = gst_element_get_static_pad (priv->funnel[i], "src");
1242     gst_pad_link (pad, priv->recv_sink[i]);
1243     gst_object_unref (pad);
1244
1245     /* we set and keep these to playing so that they don't cause NO_PREROLL return
1246      * values */
1247     gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_PLAYING);
1248     gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_PLAYING);
1249     gst_element_set_locked_state (priv->udpsrc_v4[i], TRUE);
1250     gst_element_set_locked_state (priv->udpsrc_v6[i], TRUE);
1251     /* add udpsrc */
1252     gst_bin_add (bin, priv->udpsrc_v4[i]);
1253     gst_bin_add (bin, priv->udpsrc_v6[i]);
1254     /* and link to the funnel v4 */
1255     selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1256     pad = gst_element_get_static_pad (priv->udpsrc_v4[i], "src");
1257     gst_pad_link (pad, selpad);
1258     gst_object_unref (pad);
1259     gst_object_unref (selpad);
1260
1261     /* and link to the funnel v6 */
1262     selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1263     pad = gst_element_get_static_pad (priv->udpsrc_v6[i], "src");
1264     gst_pad_link (pad, selpad);
1265     gst_object_unref (pad);
1266     gst_object_unref (selpad);
1267
1268     /* make and add appsrc */
1269     priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
1270     gst_bin_add (bin, priv->appsrc[i]);
1271     /* and link to the funnel */
1272     selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1273     pad = gst_element_get_static_pad (priv->appsrc[i], "src");
1274     gst_pad_link (pad, selpad);
1275     gst_object_unref (pad);
1276     gst_object_unref (selpad);
1277
1278     /* check if we need to set to a special state */
1279     if (state != GST_STATE_NULL) {
1280       gst_element_set_state (priv->udpsink[i], state);
1281       gst_element_set_state (priv->appsink[i], state);
1282       gst_element_set_state (priv->appqueue[i], state);
1283       gst_element_set_state (priv->tee[i], state);
1284       gst_element_set_state (priv->funnel[i], state);
1285       gst_element_set_state (priv->appsrc[i], state);
1286     }
1287   }
1288
1289   /* be notified of caps changes */
1290   priv->caps_sig = g_signal_connect (priv->send_rtp_sink, "notify::caps",
1291       (GCallback) caps_notify, stream);
1292
1293   priv->is_joined = TRUE;
1294   g_mutex_unlock (&priv->lock);
1295
1296   return TRUE;
1297
1298   /* ERRORS */
1299 was_joined:
1300   {
1301     g_mutex_unlock (&priv->lock);
1302     return TRUE;
1303   }
1304 no_ports:
1305   {
1306     g_mutex_unlock (&priv->lock);
1307     GST_WARNING ("failed to allocate ports %d", idx);
1308     return FALSE;
1309   }
1310 link_failed:
1311   {
1312     GST_WARNING ("failed to link stream %d", idx);
1313     gst_object_unref (priv->send_rtp_sink);
1314     priv->send_rtp_sink = NULL;
1315     g_mutex_unlock (&priv->lock);
1316     return FALSE;
1317   }
1318 }
1319
1320 /**
1321  * gst_rtsp_stream_leave_bin:
1322  * @stream: a #GstRTSPStream
1323  * @bin: a #GstBin
1324  * @rtpbin: a rtpbin #GstElement
1325  *
1326  * Remove the elements of @stream from @bin.
1327  *
1328  * Return: %TRUE on success.
1329  */
1330 gboolean
1331 gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
1332     GstElement * rtpbin)
1333 {
1334   GstRTSPStreamPrivate *priv;
1335   gint i;
1336
1337   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1338   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
1339   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
1340
1341   priv = stream->priv;
1342
1343   g_mutex_lock (&priv->lock);
1344   if (!priv->is_joined)
1345     goto was_not_joined;
1346
1347   /* all transports must be removed by now */
1348   g_return_val_if_fail (priv->transports == NULL, FALSE);
1349
1350   GST_INFO ("stream %p leaving bin", stream);
1351
1352   gst_pad_unlink (priv->srcpad, priv->send_rtp_sink);
1353   g_signal_handler_disconnect (priv->send_rtp_sink, priv->caps_sig);
1354   gst_element_release_request_pad (rtpbin, priv->send_rtp_sink);
1355   gst_object_unref (priv->send_rtp_sink);
1356   priv->send_rtp_sink = NULL;
1357
1358   for (i = 0; i < 2; i++) {
1359     gst_element_set_state (priv->udpsink[i], GST_STATE_NULL);
1360     gst_element_set_state (priv->appsink[i], GST_STATE_NULL);
1361     gst_element_set_state (priv->appqueue[i], GST_STATE_NULL);
1362     gst_element_set_state (priv->tee[i], GST_STATE_NULL);
1363     gst_element_set_state (priv->funnel[i], GST_STATE_NULL);
1364     gst_element_set_state (priv->appsrc[i], GST_STATE_NULL);
1365     /* and set udpsrc to NULL now before removing */
1366     gst_element_set_locked_state (priv->udpsrc_v4[i], FALSE);
1367     gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_NULL);
1368     gst_element_set_locked_state (priv->udpsrc_v6[i], FALSE);
1369     gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_NULL);
1370
1371     /* removing them should also nicely release the request
1372      * pads when they finalize */
1373     gst_bin_remove (bin, priv->udpsrc_v4[i]);
1374     gst_bin_remove (bin, priv->udpsrc_v6[i]);
1375     gst_bin_remove (bin, priv->udpsink[i]);
1376     gst_bin_remove (bin, priv->appsrc[i]);
1377     gst_bin_remove (bin, priv->appsink[i]);
1378     gst_bin_remove (bin, priv->appqueue[i]);
1379     gst_bin_remove (bin, priv->tee[i]);
1380     gst_bin_remove (bin, priv->funnel[i]);
1381
1382     gst_element_release_request_pad (rtpbin, priv->recv_sink[i]);
1383     gst_object_unref (priv->recv_sink[i]);
1384     priv->recv_sink[i] = NULL;
1385
1386     priv->udpsrc_v4[i] = NULL;
1387     priv->udpsrc_v6[i] = NULL;
1388     priv->udpsink[i] = NULL;
1389     priv->appsrc[i] = NULL;
1390     priv->appsink[i] = NULL;
1391     priv->appqueue[i] = NULL;
1392     priv->tee[i] = NULL;
1393     priv->funnel[i] = NULL;
1394   }
1395   gst_object_unref (priv->send_src[0]);
1396   priv->send_src[0] = NULL;
1397
1398   gst_element_release_request_pad (rtpbin, priv->send_src[1]);
1399   gst_object_unref (priv->send_src[1]);
1400   priv->send_src[1] = NULL;
1401
1402   g_object_unref (priv->session);
1403   priv->session = NULL;
1404   if (priv->caps)
1405     gst_caps_unref (priv->caps);
1406   priv->caps = NULL;
1407
1408   priv->is_joined = FALSE;
1409   g_mutex_unlock (&priv->lock);
1410
1411   return TRUE;
1412
1413 was_not_joined:
1414   {
1415     return TRUE;
1416   }
1417 }
1418
1419 /**
1420  * gst_rtsp_stream_get_rtpinfo:
1421  * @stream: a #GstRTSPStream
1422  * @rtptime: result RTP timestamp
1423  * @seq: result RTP seqnum
1424  *
1425  * Retrieve the current rtptime and seq. This is used to
1426  * construct a RTPInfo reply header.
1427  *
1428  * Returns: %TRUE when rtptime and seq could be determined.
1429  */
1430 gboolean
1431 gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream,
1432     guint * rtptime, guint * seq)
1433 {
1434   GstRTSPStreamPrivate *priv;
1435   GObjectClass *payobjclass;
1436
1437   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1438   g_return_val_if_fail (rtptime != NULL, FALSE);
1439   g_return_val_if_fail (seq != NULL, FALSE);
1440
1441   priv = stream->priv;
1442
1443   payobjclass = G_OBJECT_GET_CLASS (priv->payloader);
1444
1445   if (!g_object_class_find_property (payobjclass, "seqnum") ||
1446       !g_object_class_find_property (payobjclass, "timestamp"))
1447     return FALSE;
1448
1449   g_object_get (priv->payloader, "seqnum", seq, "timestamp", rtptime, NULL);
1450
1451   return TRUE;
1452 }
1453
1454 /**
1455  * gst_rtsp_stream_get_caps:
1456  * @stream: a #GstRTSPStream
1457  *
1458  * Retrieve the current caps of @stream.
1459  *
1460  * Returns: (transfer full): the #GstCaps of @stream. use gst_caps_unref()
1461  *    after usage.
1462  */
1463 GstCaps *
1464 gst_rtsp_stream_get_caps (GstRTSPStream * stream)
1465 {
1466   GstRTSPStreamPrivate *priv;
1467   GstCaps *result;
1468
1469   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1470
1471   priv = stream->priv;
1472
1473   g_mutex_lock (&priv->lock);
1474   if ((result = priv->caps))
1475     gst_caps_ref (result);
1476   g_mutex_unlock (&priv->lock);
1477
1478   return result;
1479 }
1480
1481 /**
1482  * gst_rtsp_stream_recv_rtp:
1483  * @stream: a #GstRTSPStream
1484  * @buffer: (transfer full): a #GstBuffer
1485  *
1486  * Handle an RTP buffer for the stream. This method is usually called when a
1487  * message has been received from a client using the TCP transport.
1488  *
1489  * This function takes ownership of @buffer.
1490  *
1491  * Returns: a GstFlowReturn.
1492  */
1493 GstFlowReturn
1494 gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer)
1495 {
1496   GstRTSPStreamPrivate *priv;
1497   GstFlowReturn ret;
1498   GstElement *element;
1499
1500   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
1501   priv = stream->priv;
1502   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1503   g_return_val_if_fail (priv->is_joined, FALSE);
1504
1505   g_mutex_lock (&priv->lock);
1506   element = gst_object_ref (priv->appsrc[0]);
1507   g_mutex_unlock (&priv->lock);
1508
1509   ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
1510
1511   gst_object_unref (element);
1512
1513   return ret;
1514 }
1515
1516 /**
1517  * gst_rtsp_stream_recv_rtcp:
1518  * @stream: a #GstRTSPStream
1519  * @buffer: (transfer full): a #GstBuffer
1520  *
1521  * Handle an RTCP buffer for the stream. This method is usually called when a
1522  * message has been received from a client using the TCP transport.
1523  *
1524  * This function takes ownership of @buffer.
1525  *
1526  * Returns: a GstFlowReturn.
1527  */
1528 GstFlowReturn
1529 gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
1530 {
1531   GstRTSPStreamPrivate *priv;
1532   GstFlowReturn ret;
1533   GstElement *element;
1534
1535   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
1536   priv = stream->priv;
1537   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1538   g_return_val_if_fail (priv->is_joined, FALSE);
1539
1540   g_mutex_lock (&priv->lock);
1541   element = gst_object_ref (priv->appsrc[1]);
1542   g_mutex_unlock (&priv->lock);
1543
1544   ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
1545
1546   gst_object_unref (element);
1547
1548   return ret;
1549 }
1550
1551 /* must be called with lock */
1552 static gboolean
1553 update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
1554     gboolean add)
1555 {
1556   GstRTSPStreamPrivate *priv = stream->priv;
1557   const GstRTSPTransport *tr;
1558
1559   tr = gst_rtsp_stream_transport_get_transport (trans);
1560
1561   switch (tr->lower_transport) {
1562     case GST_RTSP_LOWER_TRANS_UDP:
1563     case GST_RTSP_LOWER_TRANS_UDP_MCAST:
1564     {
1565       gchar *dest;
1566       gint min, max;
1567       guint ttl = 0;
1568
1569       dest = tr->destination;
1570       if (tr->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
1571         min = tr->port.min;
1572         max = tr->port.max;
1573         ttl = tr->ttl;
1574       } else {
1575         min = tr->client_port.min;
1576         max = tr->client_port.max;
1577       }
1578
1579       if (add) {
1580         GST_INFO ("adding %s:%d-%d", dest, min, max);
1581         g_signal_emit_by_name (priv->udpsink[0], "add", dest, min, NULL);
1582         g_signal_emit_by_name (priv->udpsink[1], "add", dest, max, NULL);
1583         if (ttl > 0) {
1584           GST_INFO ("setting ttl-mc %d", ttl);
1585           g_object_set (G_OBJECT (priv->udpsink[0]), "ttl-mc", ttl, NULL);
1586           g_object_set (G_OBJECT (priv->udpsink[1]), "ttl-mc", ttl, NULL);
1587         }
1588         priv->transports = g_list_prepend (priv->transports, trans);
1589       } else {
1590         GST_INFO ("removing %s:%d-%d", dest, min, max);
1591         g_signal_emit_by_name (priv->udpsink[0], "remove", dest, min, NULL);
1592         g_signal_emit_by_name (priv->udpsink[1], "remove", dest, max, NULL);
1593         priv->transports = g_list_remove (priv->transports, trans);
1594       }
1595       break;
1596     }
1597     case GST_RTSP_LOWER_TRANS_TCP:
1598       if (add) {
1599         GST_INFO ("adding TCP %s", tr->destination);
1600         priv->transports = g_list_prepend (priv->transports, trans);
1601       } else {
1602         GST_INFO ("removing TCP %s", tr->destination);
1603         priv->transports = g_list_remove (priv->transports, trans);
1604       }
1605       break;
1606     default:
1607       goto unknown_transport;
1608   }
1609   return TRUE;
1610
1611   /* ERRORS */
1612 unknown_transport:
1613   {
1614     GST_INFO ("Unknown transport %d", tr->lower_transport);
1615     return FALSE;
1616   }
1617 }
1618
1619
1620 /**
1621  * gst_rtsp_stream_add_transport:
1622  * @stream: a #GstRTSPStream
1623  * @trans: a #GstRTSPStreamTransport
1624  *
1625  * Add the transport in @trans to @stream. The media of @stream will
1626  * then also be send to the values configured in @trans.
1627  *
1628  * @stream must be joined to a bin.
1629  *
1630  * @trans must contain a valid #GstRTSPTransport.
1631  *
1632  * Returns: %TRUE if @trans was added
1633  */
1634 gboolean
1635 gst_rtsp_stream_add_transport (GstRTSPStream * stream,
1636     GstRTSPStreamTransport * trans)
1637 {
1638   GstRTSPStreamPrivate *priv;
1639   gboolean res;
1640
1641   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1642   priv = stream->priv;
1643   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
1644   g_return_val_if_fail (priv->is_joined, FALSE);
1645
1646   g_mutex_lock (&priv->lock);
1647   res = update_transport (stream, trans, TRUE);
1648   g_mutex_unlock (&priv->lock);
1649
1650   return res;
1651 }
1652
1653 /**
1654  * gst_rtsp_stream_remove_transport:
1655  * @stream: a #GstRTSPStream
1656  * @trans: a #GstRTSPStreamTransport
1657  *
1658  * Remove the transport in @trans from @stream. The media of @stream will
1659  * not be sent to the values configured in @trans.
1660  *
1661  * @stream must be joined to a bin.
1662  *
1663  * @trans must contain a valid #GstRTSPTransport.
1664  *
1665  * Returns: %TRUE if @trans was removed
1666  */
1667 gboolean
1668 gst_rtsp_stream_remove_transport (GstRTSPStream * stream,
1669     GstRTSPStreamTransport * trans)
1670 {
1671   GstRTSPStreamPrivate *priv;
1672   gboolean res;
1673
1674   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1675   priv = stream->priv;
1676   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
1677   g_return_val_if_fail (priv->is_joined, FALSE);
1678
1679   g_mutex_lock (&priv->lock);
1680   res = update_transport (stream, trans, FALSE);
1681   g_mutex_unlock (&priv->lock);
1682
1683   return res;
1684 }