stream: handle failed port allocation
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-stream.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19
20 #include <string.h>
21 #include <stdlib.h>
22
23 #include <gio/gio.h>
24
25 #include <gst/app/gstappsrc.h>
26 #include <gst/app/gstappsink.h>
27
28 #include "rtsp-stream.h"
29
30 #define GST_RTSP_STREAM_GET_PRIVATE(obj)  \
31      (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTSP_STREAM, GstRTSPStreamPrivate))
32
33 struct _GstRTSPStreamPrivate
34 {
35   GMutex lock;
36   guint idx;
37   GstPad *srcpad;
38   GstElement *payloader;
39   guint buffer_size;
40   gboolean is_joined;
41
42   /* pads on the rtpbin */
43   GstPad *send_rtp_sink;
44   GstPad *recv_sink[2];
45   GstPad *send_src[2];
46
47   /* the RTPSession object */
48   GObject *session;
49
50   /* sinks used for sending and receiving RTP and RTCP over ipv4, they share
51    * sockets */
52   GstElement *udpsrc_v4[2];
53
54   /* sinks used for sending and receiving RTP and RTCP over ipv6, they share
55    * sockets */
56   GstElement *udpsrc_v6[2];
57
58   GstElement *udpsink[2];
59
60   /* for TCP transport */
61   GstElement *appsrc[2];
62   GstElement *appqueue[2];
63   GstElement *appsink[2];
64
65   GstElement *tee[2];
66   GstElement *funnel[2];
67
68   /* server ports for sending/receiving over ipv4 */
69   GstRTSPRange server_port_v4;
70   GstRTSPAddress *server_addr_v4;
71   gboolean have_ipv4;
72
73   /* server ports for sending/receiving over ipv6 */
74   GstRTSPRange server_port_v6;
75   GstRTSPAddress *server_addr_v6;
76   gboolean have_ipv6;
77
78   /* multicast addresses */
79   GstRTSPAddressPool *pool;
80   GstRTSPAddress *addr;
81
82   /* the caps of the stream */
83   gulong caps_sig;
84   GstCaps *caps;
85
86   /* transports we stream to */
87   guint n_active;
88   GList *transports;
89
90   gint dscp_qos;
91 };
92
93
94 enum
95 {
96   PROP_0,
97   PROP_LAST
98 };
99
100 GST_DEBUG_CATEGORY_STATIC (rtsp_stream_debug);
101 #define GST_CAT_DEFAULT rtsp_stream_debug
102
103 static GQuark ssrc_stream_map_key;
104
105 static void gst_rtsp_stream_finalize (GObject * obj);
106
107 G_DEFINE_TYPE (GstRTSPStream, gst_rtsp_stream, G_TYPE_OBJECT);
108
109 static void
110 gst_rtsp_stream_class_init (GstRTSPStreamClass * klass)
111 {
112   GObjectClass *gobject_class;
113
114   g_type_class_add_private (klass, sizeof (GstRTSPStreamPrivate));
115
116   gobject_class = G_OBJECT_CLASS (klass);
117
118   gobject_class->finalize = gst_rtsp_stream_finalize;
119
120   GST_DEBUG_CATEGORY_INIT (rtsp_stream_debug, "rtspstream", 0, "GstRTSPStream");
121
122   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
123 }
124
125 static void
126 gst_rtsp_stream_init (GstRTSPStream * stream)
127 {
128   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
129
130   GST_DEBUG ("new stream %p", stream);
131
132   stream->priv = priv;
133
134   stream->priv->dscp_qos = -1;
135
136   g_mutex_init (&priv->lock);
137 }
138
139 static void
140 gst_rtsp_stream_finalize (GObject * obj)
141 {
142   GstRTSPStream *stream;
143   GstRTSPStreamPrivate *priv;
144
145   stream = GST_RTSP_STREAM (obj);
146   priv = stream->priv;
147
148   GST_DEBUG ("finalize stream %p", stream);
149
150   /* we really need to be unjoined now */
151   g_return_if_fail (!priv->is_joined);
152
153   if (priv->addr)
154     gst_rtsp_address_free (priv->addr);
155   if (priv->server_addr_v4)
156     gst_rtsp_address_free (priv->server_addr_v4);
157   if (priv->server_addr_v6)
158     gst_rtsp_address_free (priv->server_addr_v6);
159   if (priv->pool)
160     g_object_unref (priv->pool);
161   gst_object_unref (priv->payloader);
162   gst_object_unref (priv->srcpad);
163   g_mutex_clear (&priv->lock);
164
165   G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
166 }
167
168 /**
169  * gst_rtsp_stream_new:
170  * @idx: an index
171  * @srcpad: a #GstPad
172  * @payloader: a #GstElement
173  *
174  * Create a new media stream with index @idx that handles RTP data on
175  * @srcpad and has a payloader element @payloader.
176  *
177  * Returns: a new #GstRTSPStream
178  */
179 GstRTSPStream *
180 gst_rtsp_stream_new (guint idx, GstElement * payloader, GstPad * srcpad)
181 {
182   GstRTSPStreamPrivate *priv;
183   GstRTSPStream *stream;
184
185   g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL);
186   g_return_val_if_fail (GST_IS_PAD (srcpad), NULL);
187   g_return_val_if_fail (GST_PAD_IS_SRC (srcpad), NULL);
188
189   stream = g_object_new (GST_TYPE_RTSP_STREAM, NULL);
190   priv = stream->priv;
191   priv->idx = idx;
192   priv->payloader = gst_object_ref (payloader);
193   priv->srcpad = gst_object_ref (srcpad);
194
195   return stream;
196 }
197
198 /**
199  * gst_rtsp_stream_get_index:
200  * @stream: a #GstRTSPStream
201  *
202  * Get the stream index.
203  *
204  * Return: the stream index.
205  */
206 guint
207 gst_rtsp_stream_get_index (GstRTSPStream * stream)
208 {
209   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
210
211   return stream->priv->idx;
212 }
213
214  /**
215  * gst_rtsp_stream_get_srcpad:
216  * @stream: a #GstRTSPStream
217  *
218  * Get the srcpad associated with @stream.
219  *
220  * Return: the srcpad. Unref after usage.
221  */
222 GstPad *
223 gst_rtsp_stream_get_srcpad (GstRTSPStream * stream)
224 {
225   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
226
227   return gst_object_ref (stream->priv->srcpad);
228 }
229
230 /**
231  * gst_rtsp_stream_set_mtu:
232  * @stream: a #GstRTSPStream
233  * @mtu: a new MTU
234  *
235  * Configure the mtu in the payloader of @stream to @mtu.
236  */
237 void
238 gst_rtsp_stream_set_mtu (GstRTSPStream * stream, guint mtu)
239 {
240   GstRTSPStreamPrivate *priv;
241
242   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
243
244   priv = stream->priv;
245
246   GST_LOG_OBJECT (stream, "set MTU %u", mtu);
247
248   g_object_set (G_OBJECT (priv->payloader), "mtu", mtu, NULL);
249 }
250
251 /**
252  * gst_rtsp_stream_get_mtu:
253  * @stream: a #GstRTSPStream
254  *
255  * Get the configured MTU in the payloader of @stream.
256  *
257  * Returns: the MTU of the payloader.
258  */
259 guint
260 gst_rtsp_stream_get_mtu (GstRTSPStream * stream)
261 {
262   GstRTSPStreamPrivate *priv;
263   guint mtu;
264
265   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
266
267   priv = stream->priv;
268
269   g_object_get (G_OBJECT (priv->payloader), "mtu", &mtu, NULL);
270
271   return mtu;
272 }
273
274 /* Update the dscp qos property on the udp sinks */
275 static void
276 update_dscp_qos (GstRTSPStream * stream)
277 {
278   GstRTSPStreamPrivate *priv;
279
280   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
281
282   priv = stream->priv;
283
284   if (priv->udpsink[0]) {
285     g_object_set (G_OBJECT (priv->udpsink[0]), "qos-dscp", priv->dscp_qos,
286         NULL);
287   }
288
289   if (priv->udpsink[1]) {
290     g_object_set (G_OBJECT (priv->udpsink[1]), "qos-dscp", priv->dscp_qos,
291         NULL);
292   }
293 }
294
295 /**
296  * gst_rtsp_stream_set_dscp_qos:
297  * @stream: a #GstRTSPStream
298  * @dscp_qos: a new dscp qos value (0-63, or -1 to disable)
299  *
300  * Configure the dscp qos of the outgoing sockets to @dscp_qos.
301  */
302 void
303 gst_rtsp_stream_set_dscp_qos (GstRTSPStream * stream, gint dscp_qos)
304 {
305   GstRTSPStreamPrivate *priv;
306
307   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
308
309   priv = stream->priv;
310
311   GST_LOG_OBJECT (stream, "set DSCP QoS %d", dscp_qos);
312
313   if (dscp_qos < -1 || dscp_qos > 63) {
314     GST_WARNING_OBJECT (stream, "trying to set illegal dscp qos %d", dscp_qos);
315     return;
316   }
317
318   priv->dscp_qos = dscp_qos;
319
320   update_dscp_qos (stream);
321 }
322
323 /**
324  * gst_rtsp_stream_get_dscp_qos:
325  * @stream: a #GstRTSPStream
326  *
327  * Get the configured DSCP QoS in of the outgoing sockets.
328  *
329  * Returns: the DSCP QoS value of the outgoing sockets, or -1 if disbled.
330  */
331 gint
332 gst_rtsp_stream_get_dscp_qos (GstRTSPStream * stream)
333 {
334   GstRTSPStreamPrivate *priv;
335
336   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
337
338   priv = stream->priv;
339
340   return priv->dscp_qos;
341 }
342
343
344 /**
345  * gst_rtsp_stream_set_address_pool:
346  * @stream: a #GstRTSPStream
347  * @pool: a #GstRTSPAddressPool
348  *
349  * configure @pool to be used as the address pool of @stream.
350  */
351 void
352 gst_rtsp_stream_set_address_pool (GstRTSPStream * stream,
353     GstRTSPAddressPool * pool)
354 {
355   GstRTSPStreamPrivate *priv;
356   GstRTSPAddressPool *old;
357
358   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
359
360   priv = stream->priv;
361
362   GST_LOG_OBJECT (stream, "set address pool %p", pool);
363
364   g_mutex_lock (&priv->lock);
365   if ((old = priv->pool) != pool)
366     priv->pool = pool ? g_object_ref (pool) : NULL;
367   else
368     old = NULL;
369   g_mutex_unlock (&priv->lock);
370
371   if (old)
372     g_object_unref (old);
373 }
374
375 /**
376  * gst_rtsp_stream_get_address_pool:
377  * @stream: a #GstRTSPStream
378  *
379  * Get the #GstRTSPAddressPool used as the address pool of @stream.
380  *
381  * Returns: (transfer full): the #GstRTSPAddressPool of @stream. g_object_unref() after
382  * usage.
383  */
384 GstRTSPAddressPool *
385 gst_rtsp_stream_get_address_pool (GstRTSPStream * stream)
386 {
387   GstRTSPStreamPrivate *priv;
388   GstRTSPAddressPool *result;
389
390   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
391
392   priv = stream->priv;
393
394   g_mutex_lock (&priv->lock);
395   if ((result = priv->pool))
396     g_object_ref (result);
397   g_mutex_unlock (&priv->lock);
398
399   return result;
400 }
401
402 /**
403  * gst_rtsp_stream_get_address:
404  * @stream: a #GstRTSPStream
405  *
406  * Get the multicast address of @stream.
407  *
408  * Returns: the #GstRTSPAddress of @stream or %NULL when no address could be
409  * allocated. gst_rtsp_address_free() after usage.
410  */
411 GstRTSPAddress *
412 gst_rtsp_stream_get_address (GstRTSPStream * stream)
413 {
414   GstRTSPStreamPrivate *priv;
415   GstRTSPAddress *result;
416
417   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
418
419   priv = stream->priv;
420
421   g_mutex_lock (&priv->lock);
422   if (priv->addr == NULL) {
423     if (priv->pool == NULL)
424       goto no_pool;
425
426     priv->addr = gst_rtsp_address_pool_acquire_address (priv->pool,
427         GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_MULTICAST, 2);
428     if (priv->addr == NULL)
429       goto no_address;
430   }
431   result = gst_rtsp_address_copy (priv->addr);
432   g_mutex_unlock (&priv->lock);
433
434   return result;
435
436   /* ERRORS */
437 no_pool:
438   {
439     GST_ERROR_OBJECT (stream, "no address pool specified");
440     g_mutex_unlock (&priv->lock);
441     return NULL;
442   }
443 no_address:
444   {
445     GST_ERROR_OBJECT (stream, "failed to acquire address from pool");
446     g_mutex_unlock (&priv->lock);
447     return NULL;
448   }
449 }
450
451 /**
452  * gst_rtsp_stream_reserve_address:
453  * @stream: a #GstRTSPStream
454  * @address: an address
455  * @port: a port
456  * @n_ports: n_ports
457  * @ttl: a TTL
458  *
459  * Reserve @address and @port as the address and port of @stream.
460  *
461  * Returns: the #GstRTSPAddress of @stream or %NULL when the address could be
462  * reserved. gst_rtsp_address_free() after usage.
463  */
464 GstRTSPAddress *
465 gst_rtsp_stream_reserve_address (GstRTSPStream * stream,
466     const gchar * address, guint port, guint n_ports, guint ttl)
467 {
468   GstRTSPStreamPrivate *priv;
469   GstRTSPAddress *result;
470
471   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
472   g_return_val_if_fail (address != NULL, NULL);
473   g_return_val_if_fail (port > 0, NULL);
474   g_return_val_if_fail (n_ports > 0, NULL);
475   g_return_val_if_fail (ttl > 0, NULL);
476
477   priv = stream->priv;
478
479   g_mutex_lock (&priv->lock);
480   if (priv->addr == NULL) {
481     if (priv->pool == NULL)
482       goto no_pool;
483
484     priv->addr = gst_rtsp_address_pool_reserve_address (priv->pool, address,
485         port, n_ports, ttl);
486     if (priv->addr == NULL)
487       goto no_address;
488   } else {
489     if (strcmp (priv->addr->address, address) ||
490         priv->addr->port != port || priv->addr->n_ports != n_ports ||
491         priv->addr->ttl != ttl)
492       goto different_address;
493   }
494   result = gst_rtsp_address_copy (priv->addr);
495   g_mutex_unlock (&priv->lock);
496
497   return result;
498
499   /* ERRORS */
500 no_pool:
501   {
502     GST_ERROR_OBJECT (stream, "no address pool specified");
503     g_mutex_unlock (&priv->lock);
504     return NULL;
505   }
506 no_address:
507   {
508     GST_ERROR_OBJECT (stream, "failed to acquire address %s from pool",
509         address);
510     g_mutex_unlock (&priv->lock);
511     return NULL;
512   }
513 different_address:
514   {
515     GST_ERROR_OBJECT (stream, "address %s is not the same that was already"
516         " reserved", address);
517     g_mutex_unlock (&priv->lock);
518     return NULL;
519   }
520 }
521
522 static gboolean
523 alloc_ports_one_family (GstRTSPAddressPool * pool, gint buffer_size,
524     GSocketFamily family, GstElement * udpsrc_out[2],
525     GstElement * udpsink_out[2], GstRTSPRange * server_port_out,
526     GstRTSPAddress ** server_addr_out)
527 {
528   GstStateChangeReturn ret;
529   GstElement *udpsrc0, *udpsrc1;
530   GstElement *udpsink0, *udpsink1;
531   GSocket *rtp_socket = NULL;
532   GSocket *rtcp_socket;
533   gint tmp_rtp, tmp_rtcp;
534   guint count;
535   gint rtpport, rtcpport;
536   GList *rejected_addresses = NULL;
537   GstRTSPAddress *addr = NULL;
538   GInetAddress *inetaddr = NULL;
539   GSocketAddress *rtp_sockaddr = NULL;
540   GSocketAddress *rtcp_sockaddr = NULL;
541   const gchar *multisink_socket;
542
543   if (family == G_SOCKET_FAMILY_IPV6)
544     multisink_socket = "socket-v6";
545   else
546     multisink_socket = "socket";
547
548   udpsrc0 = NULL;
549   udpsrc1 = NULL;
550   udpsink0 = NULL;
551   udpsink1 = NULL;
552   count = 0;
553
554   /* Start with random port */
555   tmp_rtp = 0;
556
557   rtcp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
558       G_SOCKET_PROTOCOL_UDP, NULL);
559   if (!rtcp_socket)
560     goto no_udp_protocol;
561
562   if (*server_addr_out)
563     gst_rtsp_address_free (*server_addr_out);
564
565   /* try to allocate 2 UDP ports, the RTP port should be an even
566    * number and the RTCP port should be the next (uneven) port */
567 again:
568
569   if (rtp_socket == NULL) {
570     rtp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
571         G_SOCKET_PROTOCOL_UDP, NULL);
572     if (!rtp_socket)
573       goto no_udp_protocol;
574   }
575
576   if (pool && gst_rtsp_address_pool_has_unicast_addresses (pool)) {
577     GstRTSPAddressFlags flags;
578
579     if (addr)
580       rejected_addresses = g_list_prepend (rejected_addresses, addr);
581
582     flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_UNICAST;
583     if (family == G_SOCKET_FAMILY_IPV6)
584       flags |= GST_RTSP_ADDRESS_FLAG_IPV6;
585     else
586       flags |= GST_RTSP_ADDRESS_FLAG_IPV4;
587
588     addr = gst_rtsp_address_pool_acquire_address (pool, flags, 2);
589
590     if (addr == NULL)
591       goto no_ports;
592
593     tmp_rtp = addr->port;
594
595     g_clear_object (&inetaddr);
596     inetaddr = g_inet_address_new_from_string (addr->address);
597   } else {
598     if (tmp_rtp != 0) {
599       tmp_rtp += 2;
600       if (++count > 20)
601         goto no_ports;
602     }
603
604     if (inetaddr == NULL)
605       inetaddr = g_inet_address_new_any (family);
606   }
607
608   rtp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtp);
609   if (!g_socket_bind (rtp_socket, rtp_sockaddr, FALSE, NULL)) {
610     g_object_unref (rtp_sockaddr);
611     goto again;
612   }
613   g_object_unref (rtp_sockaddr);
614
615   rtp_sockaddr = g_socket_get_local_address (rtp_socket, NULL);
616   if (rtp_sockaddr == NULL || !G_IS_INET_SOCKET_ADDRESS (rtp_sockaddr)) {
617     g_clear_object (&rtp_sockaddr);
618     goto socket_error;
619   }
620
621   tmp_rtp =
622       g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (rtp_sockaddr));
623   g_object_unref (rtp_sockaddr);
624
625   /* check if port is even */
626   if ((tmp_rtp & 1) != 0) {
627     /* port not even, close and allocate another */
628     tmp_rtp++;
629     g_clear_object (&rtp_socket);
630     goto again;
631   }
632
633   /* set port */
634   tmp_rtcp = tmp_rtp + 1;
635
636   rtcp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtcp);
637   if (!g_socket_bind (rtcp_socket, rtcp_sockaddr, FALSE, NULL)) {
638     g_object_unref (rtcp_sockaddr);
639     g_clear_object (&rtp_socket);
640     goto again;
641   }
642   g_object_unref (rtcp_sockaddr);
643
644   g_clear_object (&inetaddr);
645
646   udpsrc0 = gst_element_factory_make ("udpsrc", NULL);
647   udpsrc1 = gst_element_factory_make ("udpsrc", NULL);
648
649   if (udpsrc0 == NULL || udpsrc1 == NULL)
650     goto no_udp_protocol;
651
652   g_object_set (G_OBJECT (udpsrc0), "socket", rtp_socket, NULL);
653   g_object_set (G_OBJECT (udpsrc1), "socket", rtcp_socket, NULL);
654
655   ret = gst_element_set_state (udpsrc0, GST_STATE_PAUSED);
656   if (ret == GST_STATE_CHANGE_FAILURE)
657     goto element_error;
658   ret = gst_element_set_state (udpsrc1, GST_STATE_PAUSED);
659   if (ret == GST_STATE_CHANGE_FAILURE)
660     goto element_error;
661
662   /* all fine, do port check */
663   g_object_get (G_OBJECT (udpsrc0), "port", &rtpport, NULL);
664   g_object_get (G_OBJECT (udpsrc1), "port", &rtcpport, NULL);
665
666   /* this should not happen... */
667   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
668     goto port_error;
669
670   if (udpsink_out[0])
671     udpsink0 = udpsink_out[0];
672   else
673     udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
674
675   if (!udpsink0)
676     goto no_udp_protocol;
677
678   g_object_set (G_OBJECT (udpsink0), "close-socket", FALSE, NULL);
679   g_object_set (G_OBJECT (udpsink0), multisink_socket, rtp_socket, NULL);
680
681   if (udpsink_out[1])
682     udpsink1 = udpsink_out[1];
683   else
684     udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
685
686   if (!udpsink1)
687     goto no_udp_protocol;
688
689   g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
690   g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
691   g_object_set (G_OBJECT (udpsink0), "buffer-size", buffer_size, NULL);
692
693   g_object_set (G_OBJECT (udpsink1), "close-socket", FALSE, NULL);
694   g_object_set (G_OBJECT (udpsink1), multisink_socket, rtcp_socket, NULL);
695   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
696   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
697   g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL);
698   g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
699   g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL);
700   g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
701
702   /* we keep these elements, we will further configure them when the
703    * client told us to really use the UDP ports. */
704   udpsrc_out[0] = udpsrc0;
705   udpsrc_out[1] = udpsrc1;
706   udpsink_out[0] = udpsink0;
707   udpsink_out[1] = udpsink1;
708   server_port_out->min = rtpport;
709   server_port_out->max = rtcpport;
710
711   *server_addr_out = addr;
712   g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free);
713
714   g_object_unref (rtp_socket);
715   g_object_unref (rtcp_socket);
716
717   return TRUE;
718
719   /* ERRORS */
720 no_udp_protocol:
721   {
722     goto cleanup;
723   }
724 no_ports:
725   {
726     goto cleanup;
727   }
728 port_error:
729   {
730     goto cleanup;
731   }
732 socket_error:
733   {
734     goto cleanup;
735   }
736 element_error:
737   {
738     goto cleanup;
739   }
740 cleanup:
741   {
742     if (udpsrc0) {
743       gst_element_set_state (udpsrc0, GST_STATE_NULL);
744       gst_object_unref (udpsrc0);
745     }
746     if (udpsrc1) {
747       gst_element_set_state (udpsrc1, GST_STATE_NULL);
748       gst_object_unref (udpsrc1);
749     }
750     if (udpsink0) {
751       gst_element_set_state (udpsink0, GST_STATE_NULL);
752       gst_object_unref (udpsink0);
753     }
754     if (udpsink1) {
755       gst_element_set_state (udpsink1, GST_STATE_NULL);
756       gst_object_unref (udpsink1);
757     }
758     if (inetaddr)
759       g_object_unref (inetaddr);
760     g_list_free_full (rejected_addresses,
761         (GDestroyNotify) gst_rtsp_address_free);
762     if (addr)
763       gst_rtsp_address_free (addr);
764     if (rtp_socket)
765       g_object_unref (rtp_socket);
766     if (rtcp_socket)
767       g_object_unref (rtcp_socket);
768     return FALSE;
769   }
770 }
771
772 /* must be called with lock */
773 static gboolean
774 alloc_ports (GstRTSPStream * stream)
775 {
776   GstRTSPStreamPrivate *priv = stream->priv;
777
778   priv->have_ipv4 = alloc_ports_one_family (priv->pool, priv->buffer_size,
779       G_SOCKET_FAMILY_IPV4, priv->udpsrc_v4, priv->udpsink,
780       &priv->server_port_v4, &priv->server_addr_v4);
781
782   priv->have_ipv6 = alloc_ports_one_family (priv->pool, priv->buffer_size,
783       G_SOCKET_FAMILY_IPV6, priv->udpsrc_v6, priv->udpsink,
784       &priv->server_port_v6, &priv->server_addr_v6);
785
786   return priv->have_ipv4 || priv->have_ipv6;
787 }
788
789 /**
790  * gst_rtsp_stream_get_server_port:
791  * @stream: a #GstRTSPStream
792  * @server_port: (out): result server port
793  *
794  * Fill @server_port with the port pair used by the server. This function can
795  * only be called when @stream has been joined.
796  */
797 void
798 gst_rtsp_stream_get_server_port (GstRTSPStream * stream,
799     GstRTSPRange * server_port, GSocketFamily family)
800 {
801   GstRTSPStreamPrivate *priv;
802
803   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
804   priv = stream->priv;
805   g_return_if_fail (priv->is_joined);
806
807   g_mutex_lock (&priv->lock);
808   if (family == G_SOCKET_FAMILY_IPV4) {
809     if (server_port)
810       *server_port = priv->server_port_v4;
811   } else {
812     if (server_port)
813       *server_port = priv->server_port_v6;
814   }
815   g_mutex_unlock (&priv->lock);
816 }
817
818 /**
819  * gst_rtsp_stream_get_rtpsession:
820  * @stream: a #GstRTSPStream
821  *
822  * Get the RTP session of this stream.
823  *
824  * Returns: The RTP session of this stream. Unref after usage.
825  */
826 GObject *
827 gst_rtsp_stream_get_rtpsession (GstRTSPStream * stream)
828 {
829   GstRTSPStreamPrivate *priv;
830   GObject *session;
831
832   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
833
834   priv = stream->priv;
835
836   g_mutex_lock (&priv->lock);
837   if ((session = priv->session))
838     g_object_ref (session);
839   g_mutex_unlock (&priv->lock);
840
841   return session;
842 }
843
844 /**
845  * gst_rtsp_stream_get_ssrc:
846  * @stream: a #GstRTSPStream
847  * @ssrc: (out): result ssrc
848  *
849  * Get the SSRC used by the RTP session of this stream. This function can only
850  * be called when @stream has been joined.
851  */
852 void
853 gst_rtsp_stream_get_ssrc (GstRTSPStream * stream, guint * ssrc)
854 {
855   GstRTSPStreamPrivate *priv;
856
857   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
858   priv = stream->priv;
859   g_return_if_fail (priv->is_joined);
860
861   g_mutex_lock (&priv->lock);
862   if (ssrc && priv->session)
863     g_object_get (priv->session, "internal-ssrc", ssrc, NULL);
864   g_mutex_unlock (&priv->lock);
865 }
866
867 /* executed from streaming thread */
868 static void
869 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream)
870 {
871   GstRTSPStreamPrivate *priv = stream->priv;
872   GstCaps *newcaps, *oldcaps;
873
874   newcaps = gst_pad_get_current_caps (pad);
875
876   GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps,
877       newcaps);
878
879   g_mutex_lock (&priv->lock);
880   oldcaps = priv->caps;
881   priv->caps = newcaps;
882   g_mutex_unlock (&priv->lock);
883
884   if (oldcaps)
885     gst_caps_unref (oldcaps);
886 }
887
888 static void
889 dump_structure (const GstStructure * s)
890 {
891   gchar *sstr;
892
893   sstr = gst_structure_to_string (s);
894   GST_INFO ("structure: %s", sstr);
895   g_free (sstr);
896 }
897
898 static GstRTSPStreamTransport *
899 find_transport (GstRTSPStream * stream, const gchar * rtcp_from)
900 {
901   GstRTSPStreamPrivate *priv = stream->priv;
902   GList *walk;
903   GstRTSPStreamTransport *result = NULL;
904   const gchar *tmp;
905   gchar *dest;
906   guint port;
907
908   if (rtcp_from == NULL)
909     return NULL;
910
911   tmp = g_strrstr (rtcp_from, ":");
912   if (tmp == NULL)
913     return NULL;
914
915   port = atoi (tmp + 1);
916   dest = g_strndup (rtcp_from, tmp - rtcp_from);
917
918   g_mutex_lock (&priv->lock);
919   GST_INFO ("finding %s:%d in %d transports", dest, port,
920       g_list_length (priv->transports));
921
922   for (walk = priv->transports; walk; walk = g_list_next (walk)) {
923     GstRTSPStreamTransport *trans = walk->data;
924     const GstRTSPTransport *tr;
925     gint min, max;
926
927     tr = gst_rtsp_stream_transport_get_transport (trans);
928
929     min = tr->client_port.min;
930     max = tr->client_port.max;
931
932     if ((strcmp (tr->destination, dest) == 0) && (min == port || max == port)) {
933       result = trans;
934       break;
935     }
936   }
937   if (result)
938     g_object_ref (result);
939   g_mutex_unlock (&priv->lock);
940
941   g_free (dest);
942
943   return result;
944 }
945
946 static GstRTSPStreamTransport *
947 check_transport (GObject * source, GstRTSPStream * stream)
948 {
949   GstStructure *stats;
950   GstRTSPStreamTransport *trans;
951
952   /* see if we have a stream to match with the origin of the RTCP packet */
953   trans = g_object_get_qdata (source, ssrc_stream_map_key);
954   if (trans == NULL) {
955     g_object_get (source, "stats", &stats, NULL);
956     if (stats) {
957       const gchar *rtcp_from;
958
959       dump_structure (stats);
960
961       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
962       if ((trans = find_transport (stream, rtcp_from))) {
963         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
964             source);
965         g_object_set_qdata_full (source, ssrc_stream_map_key, trans,
966             g_object_unref);
967       }
968       gst_structure_free (stats);
969     }
970   }
971   return trans;
972 }
973
974
975 static void
976 on_new_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
977 {
978   GstRTSPStreamTransport *trans;
979
980   GST_INFO ("%p: new source %p", stream, source);
981
982   trans = check_transport (source, stream);
983
984   if (trans)
985     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
986 }
987
988 static void
989 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPStream * stream)
990 {
991   GST_INFO ("%p: new SDES %p", stream, source);
992 }
993
994 static void
995 on_ssrc_active (GObject * session, GObject * source, GstRTSPStream * stream)
996 {
997   GstRTSPStreamTransport *trans;
998
999   trans = check_transport (source, stream);
1000
1001   if (trans) {
1002     GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
1003     gst_rtsp_stream_transport_keep_alive (trans);
1004   }
1005 #ifdef DUMP_STATS
1006   {
1007     GstStructure *stats;
1008     g_object_get (source, "stats", &stats, NULL);
1009     if (stats) {
1010       dump_structure (stats);
1011       gst_structure_free (stats);
1012     }
1013   }
1014 #endif
1015 }
1016
1017 static void
1018 on_bye_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1019 {
1020   GST_INFO ("%p: source %p bye", stream, source);
1021 }
1022
1023 static void
1024 on_bye_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1025 {
1026   GstRTSPStreamTransport *trans;
1027
1028   GST_INFO ("%p: source %p bye timeout", stream, source);
1029
1030   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1031     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1032     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1033   }
1034 }
1035
1036 static void
1037 on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1038 {
1039   GstRTSPStreamTransport *trans;
1040
1041   GST_INFO ("%p: source %p timeout", stream, source);
1042
1043   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1044     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1045     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1046   }
1047 }
1048
1049 static GstFlowReturn
1050 handle_new_sample (GstAppSink * sink, gpointer user_data)
1051 {
1052   GstRTSPStreamPrivate *priv;
1053   GList *walk;
1054   GstSample *sample;
1055   GstBuffer *buffer;
1056   GstRTSPStream *stream;
1057
1058   sample = gst_app_sink_pull_sample (sink);
1059   if (!sample)
1060     return GST_FLOW_OK;
1061
1062   stream = (GstRTSPStream *) user_data;
1063   priv = stream->priv;
1064   buffer = gst_sample_get_buffer (sample);
1065
1066   g_mutex_lock (&priv->lock);
1067   for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1068     GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1069
1070     if (GST_ELEMENT_CAST (sink) == priv->appsink[0]) {
1071       gst_rtsp_stream_transport_send_rtp (tr, buffer);
1072     } else {
1073       gst_rtsp_stream_transport_send_rtcp (tr, buffer);
1074     }
1075   }
1076   g_mutex_unlock (&priv->lock);
1077
1078   gst_sample_unref (sample);
1079
1080   return GST_FLOW_OK;
1081 }
1082
1083 static GstAppSinkCallbacks sink_cb = {
1084   NULL,                         /* not interested in EOS */
1085   NULL,                         /* not interested in preroll samples */
1086   handle_new_sample,
1087 };
1088
1089 /**
1090  * gst_rtsp_stream_join_bin:
1091  * @stream: a #GstRTSPStream
1092  * @bin: a #GstBin to join
1093  * @rtpbin: a rtpbin element in @bin
1094  * @state: the target state of the new elements
1095  *
1096  * Join the #Gstbin @bin that contains the element @rtpbin.
1097  *
1098  * @stream will link to @rtpbin, which must be inside @bin. The elements
1099  * added to @bin will be set to the state given in @state.
1100  *
1101  * Returns: %TRUE on success.
1102  */
1103 gboolean
1104 gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
1105     GstElement * rtpbin, GstState state)
1106 {
1107   GstRTSPStreamPrivate *priv;
1108   gint i, idx;
1109   gchar *name;
1110   GstPad *pad, *teepad, *queuepad, *selpad;
1111   GstPadLinkReturn ret;
1112
1113   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1114   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
1115   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
1116
1117   priv = stream->priv;
1118
1119   g_mutex_lock (&priv->lock);
1120   if (priv->is_joined)
1121     goto was_joined;
1122
1123   /* create a session with the same index as the stream */
1124   idx = priv->idx;
1125
1126   GST_INFO ("stream %p joining bin as session %d", stream, idx);
1127
1128   if (!alloc_ports (stream))
1129     goto no_ports;
1130
1131   /* update the dscp qos field in the sinks */
1132   update_dscp_qos (stream);
1133
1134   /* get a pad for sending RTP */
1135   name = g_strdup_printf ("send_rtp_sink_%u", idx);
1136   priv->send_rtp_sink = gst_element_get_request_pad (rtpbin, name);
1137   g_free (name);
1138   /* link the RTP pad to the session manager, it should not really fail unless
1139    * this is not really an RTP pad */
1140   ret = gst_pad_link (priv->srcpad, priv->send_rtp_sink);
1141   if (ret != GST_PAD_LINK_OK)
1142     goto link_failed;
1143
1144   /* get pads from the RTP session element for sending and receiving
1145    * RTP/RTCP*/
1146   name = g_strdup_printf ("send_rtp_src_%u", idx);
1147   priv->send_src[0] = gst_element_get_static_pad (rtpbin, name);
1148   g_free (name);
1149   name = g_strdup_printf ("send_rtcp_src_%u", idx);
1150   priv->send_src[1] = gst_element_get_request_pad (rtpbin, name);
1151   g_free (name);
1152   name = g_strdup_printf ("recv_rtp_sink_%u", idx);
1153   priv->recv_sink[0] = gst_element_get_request_pad (rtpbin, name);
1154   g_free (name);
1155   name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
1156   priv->recv_sink[1] = gst_element_get_request_pad (rtpbin, name);
1157   g_free (name);
1158
1159   /* get the session */
1160   g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &priv->session);
1161
1162   g_signal_connect (priv->session, "on-new-ssrc", (GCallback) on_new_ssrc,
1163       stream);
1164   g_signal_connect (priv->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
1165       stream);
1166   g_signal_connect (priv->session, "on-ssrc-active",
1167       (GCallback) on_ssrc_active, stream);
1168   g_signal_connect (priv->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
1169       stream);
1170   g_signal_connect (priv->session, "on-bye-timeout",
1171       (GCallback) on_bye_timeout, stream);
1172   g_signal_connect (priv->session, "on-timeout", (GCallback) on_timeout,
1173       stream);
1174
1175   for (i = 0; i < 2; i++) {
1176     /* For the sender we create this bit of pipeline for both
1177      * RTP and RTCP. Sync and preroll are enabled on udpsink so
1178      * we need to add a queue before appsink to make the pipeline
1179      * not block. For the TCP case, we want to pump data to the
1180      * client as fast as possible anyway.
1181      *
1182      * .--------.      .-----.    .---------.
1183      * | rtpbin |      | tee |    | udpsink |
1184      * |       send->sink   src->sink       |
1185      * '--------'      |     |    '---------'
1186      *                 |     |    .---------.    .---------.
1187      *                 |     |    |  queue  |    | appsink |
1188      *                 |    src->sink      src->sink       |
1189      *                 '-----'    '---------'    '---------'
1190      */
1191     /* make tee for RTP/RTCP */
1192     priv->tee[i] = gst_element_factory_make ("tee", NULL);
1193     gst_bin_add (bin, priv->tee[i]);
1194
1195     /* and link to rtpbin send pad */
1196     pad = gst_element_get_static_pad (priv->tee[i], "sink");
1197     gst_pad_link (priv->send_src[i], pad);
1198     gst_object_unref (pad);
1199
1200     /* add udpsink */
1201     gst_bin_add (bin, priv->udpsink[i]);
1202
1203     /* link tee to udpsink */
1204     teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
1205     pad = gst_element_get_static_pad (priv->udpsink[i], "sink");
1206     gst_pad_link (teepad, pad);
1207     gst_object_unref (pad);
1208     gst_object_unref (teepad);
1209
1210     /* make queue */
1211     priv->appqueue[i] = gst_element_factory_make ("queue", NULL);
1212     gst_bin_add (bin, priv->appqueue[i]);
1213     /* and link to tee */
1214     teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
1215     pad = gst_element_get_static_pad (priv->appqueue[i], "sink");
1216     gst_pad_link (teepad, pad);
1217     gst_object_unref (pad);
1218     gst_object_unref (teepad);
1219
1220     /* make appsink */
1221     priv->appsink[i] = gst_element_factory_make ("appsink", NULL);
1222     g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
1223     g_object_set (priv->appsink[i], "emit-signals", FALSE, NULL);
1224     gst_bin_add (bin, priv->appsink[i]);
1225     gst_app_sink_set_callbacks (GST_APP_SINK_CAST (priv->appsink[i]),
1226         &sink_cb, stream, NULL);
1227     /* and link to queue */
1228     queuepad = gst_element_get_static_pad (priv->appqueue[i], "src");
1229     pad = gst_element_get_static_pad (priv->appsink[i], "sink");
1230     gst_pad_link (queuepad, pad);
1231     gst_object_unref (pad);
1232     gst_object_unref (queuepad);
1233
1234     /* For the receiver we create this bit of pipeline for both
1235      * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc
1236      * and it is all funneled into the rtpbin receive pad.
1237      *
1238      * .--------.     .--------.    .--------.
1239      * | udpsrc |     | funnel |    | rtpbin |
1240      * |       src->sink      src->sink      |
1241      * '--------'     |        |    '--------'
1242      * .--------.     |        |
1243      * | appsrc |     |        |
1244      * |       src->sink       |
1245      * '--------'     '--------'
1246      */
1247     /* make funnel for the RTP/RTCP receivers */
1248     priv->funnel[i] = gst_element_factory_make ("funnel", NULL);
1249     gst_bin_add (bin, priv->funnel[i]);
1250
1251     pad = gst_element_get_static_pad (priv->funnel[i], "src");
1252     gst_pad_link (pad, priv->recv_sink[i]);
1253     gst_object_unref (pad);
1254
1255     if (priv->udpsrc_v4[i]) {
1256       /* we set and keep these to playing so that they don't cause NO_PREROLL return
1257        * values */
1258       gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_PLAYING);
1259       gst_element_set_locked_state (priv->udpsrc_v4[i], TRUE);
1260       /* add udpsrc */
1261       gst_bin_add (bin, priv->udpsrc_v4[i]);
1262
1263       /* and link to the funnel v4 */
1264       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1265       pad = gst_element_get_static_pad (priv->udpsrc_v4[i], "src");
1266       gst_pad_link (pad, selpad);
1267       gst_object_unref (pad);
1268       gst_object_unref (selpad);
1269     }
1270
1271     if (priv->udpsrc_v6[i]) {
1272       gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_PLAYING);
1273       gst_element_set_locked_state (priv->udpsrc_v6[i], TRUE);
1274       gst_bin_add (bin, priv->udpsrc_v6[i]);
1275
1276       /* and link to the funnel v6 */
1277       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1278       pad = gst_element_get_static_pad (priv->udpsrc_v6[i], "src");
1279       gst_pad_link (pad, selpad);
1280       gst_object_unref (pad);
1281       gst_object_unref (selpad);
1282     }
1283
1284     /* make and add appsrc */
1285     priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
1286     gst_bin_add (bin, priv->appsrc[i]);
1287     /* and link to the funnel */
1288     selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1289     pad = gst_element_get_static_pad (priv->appsrc[i], "src");
1290     gst_pad_link (pad, selpad);
1291     gst_object_unref (pad);
1292     gst_object_unref (selpad);
1293
1294     /* check if we need to set to a special state */
1295     if (state != GST_STATE_NULL) {
1296       gst_element_set_state (priv->udpsink[i], state);
1297       gst_element_set_state (priv->appsink[i], state);
1298       gst_element_set_state (priv->appqueue[i], state);
1299       gst_element_set_state (priv->tee[i], state);
1300       gst_element_set_state (priv->funnel[i], state);
1301       gst_element_set_state (priv->appsrc[i], state);
1302     }
1303   }
1304
1305   /* be notified of caps changes */
1306   priv->caps_sig = g_signal_connect (priv->send_rtp_sink, "notify::caps",
1307       (GCallback) caps_notify, stream);
1308
1309   priv->is_joined = TRUE;
1310   g_mutex_unlock (&priv->lock);
1311
1312   return TRUE;
1313
1314   /* ERRORS */
1315 was_joined:
1316   {
1317     g_mutex_unlock (&priv->lock);
1318     return TRUE;
1319   }
1320 no_ports:
1321   {
1322     g_mutex_unlock (&priv->lock);
1323     GST_WARNING ("failed to allocate ports %d", idx);
1324     return FALSE;
1325   }
1326 link_failed:
1327   {
1328     GST_WARNING ("failed to link stream %d", idx);
1329     gst_object_unref (priv->send_rtp_sink);
1330     priv->send_rtp_sink = NULL;
1331     g_mutex_unlock (&priv->lock);
1332     return FALSE;
1333   }
1334 }
1335
1336 /**
1337  * gst_rtsp_stream_leave_bin:
1338  * @stream: a #GstRTSPStream
1339  * @bin: a #GstBin
1340  * @rtpbin: a rtpbin #GstElement
1341  *
1342  * Remove the elements of @stream from @bin.
1343  *
1344  * Return: %TRUE on success.
1345  */
1346 gboolean
1347 gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
1348     GstElement * rtpbin)
1349 {
1350   GstRTSPStreamPrivate *priv;
1351   gint i;
1352
1353   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1354   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
1355   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
1356
1357   priv = stream->priv;
1358
1359   g_mutex_lock (&priv->lock);
1360   if (!priv->is_joined)
1361     goto was_not_joined;
1362
1363   /* all transports must be removed by now */
1364   g_return_val_if_fail (priv->transports == NULL, FALSE);
1365
1366   GST_INFO ("stream %p leaving bin", stream);
1367
1368   gst_pad_unlink (priv->srcpad, priv->send_rtp_sink);
1369   g_signal_handler_disconnect (priv->send_rtp_sink, priv->caps_sig);
1370   gst_element_release_request_pad (rtpbin, priv->send_rtp_sink);
1371   gst_object_unref (priv->send_rtp_sink);
1372   priv->send_rtp_sink = NULL;
1373
1374   for (i = 0; i < 2; i++) {
1375     gst_element_set_state (priv->udpsink[i], GST_STATE_NULL);
1376     gst_element_set_state (priv->appsink[i], GST_STATE_NULL);
1377     gst_element_set_state (priv->appqueue[i], GST_STATE_NULL);
1378     gst_element_set_state (priv->tee[i], GST_STATE_NULL);
1379     gst_element_set_state (priv->funnel[i], GST_STATE_NULL);
1380     gst_element_set_state (priv->appsrc[i], GST_STATE_NULL);
1381     if (priv->udpsrc_v4[i]) {
1382       /* and set udpsrc to NULL now before removing */
1383       gst_element_set_locked_state (priv->udpsrc_v4[i], FALSE);
1384       gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_NULL);
1385       /* removing them should also nicely release the request
1386        * pads when they finalize */
1387       gst_bin_remove (bin, priv->udpsrc_v4[i]);
1388     }
1389     if (priv->udpsrc_v6[i]) {
1390       gst_element_set_locked_state (priv->udpsrc_v6[i], FALSE);
1391       gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_NULL);
1392       gst_bin_remove (bin, priv->udpsrc_v6[i]);
1393     }
1394     gst_bin_remove (bin, priv->udpsink[i]);
1395     gst_bin_remove (bin, priv->appsrc[i]);
1396     gst_bin_remove (bin, priv->appsink[i]);
1397     gst_bin_remove (bin, priv->appqueue[i]);
1398     gst_bin_remove (bin, priv->tee[i]);
1399     gst_bin_remove (bin, priv->funnel[i]);
1400
1401     gst_element_release_request_pad (rtpbin, priv->recv_sink[i]);
1402     gst_object_unref (priv->recv_sink[i]);
1403     priv->recv_sink[i] = NULL;
1404
1405     priv->udpsrc_v4[i] = NULL;
1406     priv->udpsrc_v6[i] = NULL;
1407     priv->udpsink[i] = NULL;
1408     priv->appsrc[i] = NULL;
1409     priv->appsink[i] = NULL;
1410     priv->appqueue[i] = NULL;
1411     priv->tee[i] = NULL;
1412     priv->funnel[i] = NULL;
1413   }
1414   gst_object_unref (priv->send_src[0]);
1415   priv->send_src[0] = NULL;
1416
1417   gst_element_release_request_pad (rtpbin, priv->send_src[1]);
1418   gst_object_unref (priv->send_src[1]);
1419   priv->send_src[1] = NULL;
1420
1421   g_object_unref (priv->session);
1422   priv->session = NULL;
1423   if (priv->caps)
1424     gst_caps_unref (priv->caps);
1425   priv->caps = NULL;
1426
1427   priv->is_joined = FALSE;
1428   g_mutex_unlock (&priv->lock);
1429
1430   return TRUE;
1431
1432 was_not_joined:
1433   {
1434     return TRUE;
1435   }
1436 }
1437
1438 /**
1439  * gst_rtsp_stream_get_rtpinfo:
1440  * @stream: a #GstRTSPStream
1441  * @rtptime: result RTP timestamp
1442  * @seq: result RTP seqnum
1443  *
1444  * Retrieve the current rtptime and seq. This is used to
1445  * construct a RTPInfo reply header.
1446  *
1447  * Returns: %TRUE when rtptime and seq could be determined.
1448  */
1449 gboolean
1450 gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream,
1451     guint * rtptime, guint * seq)
1452 {
1453   GstRTSPStreamPrivate *priv;
1454   GObjectClass *payobjclass;
1455
1456   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1457   g_return_val_if_fail (rtptime != NULL, FALSE);
1458   g_return_val_if_fail (seq != NULL, FALSE);
1459
1460   priv = stream->priv;
1461
1462   payobjclass = G_OBJECT_GET_CLASS (priv->payloader);
1463
1464   if (!g_object_class_find_property (payobjclass, "seqnum") ||
1465       !g_object_class_find_property (payobjclass, "timestamp"))
1466     return FALSE;
1467
1468   g_object_get (priv->payloader, "seqnum", seq, "timestamp", rtptime, NULL);
1469
1470   return TRUE;
1471 }
1472
1473 /**
1474  * gst_rtsp_stream_get_caps:
1475  * @stream: a #GstRTSPStream
1476  *
1477  * Retrieve the current caps of @stream.
1478  *
1479  * Returns: (transfer full): the #GstCaps of @stream. use gst_caps_unref()
1480  *    after usage.
1481  */
1482 GstCaps *
1483 gst_rtsp_stream_get_caps (GstRTSPStream * stream)
1484 {
1485   GstRTSPStreamPrivate *priv;
1486   GstCaps *result;
1487
1488   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1489
1490   priv = stream->priv;
1491
1492   g_mutex_lock (&priv->lock);
1493   if ((result = priv->caps))
1494     gst_caps_ref (result);
1495   g_mutex_unlock (&priv->lock);
1496
1497   return result;
1498 }
1499
1500 /**
1501  * gst_rtsp_stream_recv_rtp:
1502  * @stream: a #GstRTSPStream
1503  * @buffer: (transfer full): a #GstBuffer
1504  *
1505  * Handle an RTP buffer for the stream. This method is usually called when a
1506  * message has been received from a client using the TCP transport.
1507  *
1508  * This function takes ownership of @buffer.
1509  *
1510  * Returns: a GstFlowReturn.
1511  */
1512 GstFlowReturn
1513 gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer)
1514 {
1515   GstRTSPStreamPrivate *priv;
1516   GstFlowReturn ret;
1517   GstElement *element;
1518
1519   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
1520   priv = stream->priv;
1521   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1522   g_return_val_if_fail (priv->is_joined, FALSE);
1523
1524   g_mutex_lock (&priv->lock);
1525   element = gst_object_ref (priv->appsrc[0]);
1526   g_mutex_unlock (&priv->lock);
1527
1528   ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
1529
1530   gst_object_unref (element);
1531
1532   return ret;
1533 }
1534
1535 /**
1536  * gst_rtsp_stream_recv_rtcp:
1537  * @stream: a #GstRTSPStream
1538  * @buffer: (transfer full): a #GstBuffer
1539  *
1540  * Handle an RTCP buffer for the stream. This method is usually called when a
1541  * message has been received from a client using the TCP transport.
1542  *
1543  * This function takes ownership of @buffer.
1544  *
1545  * Returns: a GstFlowReturn.
1546  */
1547 GstFlowReturn
1548 gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
1549 {
1550   GstRTSPStreamPrivate *priv;
1551   GstFlowReturn ret;
1552   GstElement *element;
1553
1554   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
1555   priv = stream->priv;
1556   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1557   g_return_val_if_fail (priv->is_joined, FALSE);
1558
1559   g_mutex_lock (&priv->lock);
1560   element = gst_object_ref (priv->appsrc[1]);
1561   g_mutex_unlock (&priv->lock);
1562
1563   ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
1564
1565   gst_object_unref (element);
1566
1567   return ret;
1568 }
1569
1570 /* must be called with lock */
1571 static gboolean
1572 update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
1573     gboolean add)
1574 {
1575   GstRTSPStreamPrivate *priv = stream->priv;
1576   const GstRTSPTransport *tr;
1577
1578   tr = gst_rtsp_stream_transport_get_transport (trans);
1579
1580   switch (tr->lower_transport) {
1581     case GST_RTSP_LOWER_TRANS_UDP:
1582     case GST_RTSP_LOWER_TRANS_UDP_MCAST:
1583     {
1584       gchar *dest;
1585       gint min, max;
1586       guint ttl = 0;
1587
1588       dest = tr->destination;
1589       if (tr->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
1590         min = tr->port.min;
1591         max = tr->port.max;
1592         ttl = tr->ttl;
1593       } else {
1594         min = tr->client_port.min;
1595         max = tr->client_port.max;
1596       }
1597
1598       if (add) {
1599         GST_INFO ("adding %s:%d-%d", dest, min, max);
1600         g_signal_emit_by_name (priv->udpsink[0], "add", dest, min, NULL);
1601         g_signal_emit_by_name (priv->udpsink[1], "add", dest, max, NULL);
1602         if (ttl > 0) {
1603           GST_INFO ("setting ttl-mc %d", ttl);
1604           g_object_set (G_OBJECT (priv->udpsink[0]), "ttl-mc", ttl, NULL);
1605           g_object_set (G_OBJECT (priv->udpsink[1]), "ttl-mc", ttl, NULL);
1606         }
1607         priv->transports = g_list_prepend (priv->transports, trans);
1608       } else {
1609         GST_INFO ("removing %s:%d-%d", dest, min, max);
1610         g_signal_emit_by_name (priv->udpsink[0], "remove", dest, min, NULL);
1611         g_signal_emit_by_name (priv->udpsink[1], "remove", dest, max, NULL);
1612         priv->transports = g_list_remove (priv->transports, trans);
1613       }
1614       break;
1615     }
1616     case GST_RTSP_LOWER_TRANS_TCP:
1617       if (add) {
1618         GST_INFO ("adding TCP %s", tr->destination);
1619         priv->transports = g_list_prepend (priv->transports, trans);
1620       } else {
1621         GST_INFO ("removing TCP %s", tr->destination);
1622         priv->transports = g_list_remove (priv->transports, trans);
1623       }
1624       break;
1625     default:
1626       goto unknown_transport;
1627   }
1628   return TRUE;
1629
1630   /* ERRORS */
1631 unknown_transport:
1632   {
1633     GST_INFO ("Unknown transport %d", tr->lower_transport);
1634     return FALSE;
1635   }
1636 }
1637
1638
1639 /**
1640  * gst_rtsp_stream_add_transport:
1641  * @stream: a #GstRTSPStream
1642  * @trans: a #GstRTSPStreamTransport
1643  *
1644  * Add the transport in @trans to @stream. The media of @stream will
1645  * then also be send to the values configured in @trans.
1646  *
1647  * @stream must be joined to a bin.
1648  *
1649  * @trans must contain a valid #GstRTSPTransport.
1650  *
1651  * Returns: %TRUE if @trans was added
1652  */
1653 gboolean
1654 gst_rtsp_stream_add_transport (GstRTSPStream * stream,
1655     GstRTSPStreamTransport * trans)
1656 {
1657   GstRTSPStreamPrivate *priv;
1658   gboolean res;
1659
1660   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1661   priv = stream->priv;
1662   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
1663   g_return_val_if_fail (priv->is_joined, FALSE);
1664
1665   g_mutex_lock (&priv->lock);
1666   res = update_transport (stream, trans, TRUE);
1667   g_mutex_unlock (&priv->lock);
1668
1669   return res;
1670 }
1671
1672 /**
1673  * gst_rtsp_stream_remove_transport:
1674  * @stream: a #GstRTSPStream
1675  * @trans: a #GstRTSPStreamTransport
1676  *
1677  * Remove the transport in @trans from @stream. The media of @stream will
1678  * not be sent to the values configured in @trans.
1679  *
1680  * @stream must be joined to a bin.
1681  *
1682  * @trans must contain a valid #GstRTSPTransport.
1683  *
1684  * Returns: %TRUE if @trans was removed
1685  */
1686 gboolean
1687 gst_rtsp_stream_remove_transport (GstRTSPStream * stream,
1688     GstRTSPStreamTransport * trans)
1689 {
1690   GstRTSPStreamPrivate *priv;
1691   gboolean res;
1692
1693   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1694   priv = stream->priv;
1695   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
1696   g_return_val_if_fail (priv->is_joined, FALSE);
1697
1698   g_mutex_lock (&priv->lock);
1699   res = update_transport (stream, trans, FALSE);
1700   g_mutex_unlock (&priv->lock);
1701
1702   return res;
1703 }