stream: improve docs
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-stream.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19
20 #include <string.h>
21 #include <stdlib.h>
22
23 #include <gio/gio.h>
24
25 #include <gst/app/gstappsrc.h>
26 #include <gst/app/gstappsink.h>
27
28 #include "rtsp-stream.h"
29
30 #define GST_RTSP_STREAM_GET_PRIVATE(obj)  \
31      (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTSP_STREAM, GstRTSPStreamPrivate))
32
33 struct _GstRTSPStreamPrivate
34 {
35   GMutex lock;
36   guint idx;
37   GstPad *srcpad;
38   GstElement *payloader;
39   guint buffer_size;
40   gboolean is_joined;
41
42   /* pads on the rtpbin */
43   GstPad *send_rtp_sink;
44   GstPad *recv_sink[2];
45   GstPad *send_src[2];
46
47   /* the RTPSession object */
48   GObject *session;
49
50   /* sinks used for sending and receiving RTP and RTCP over ipv4, they share
51    * sockets */
52   GstElement *udpsrc_v4[2];
53
54   /* sinks used for sending and receiving RTP and RTCP over ipv6, they share
55    * sockets */
56   GstElement *udpsrc_v6[2];
57
58   GstElement *udpsink[2];
59
60   /* for TCP transport */
61   GstElement *appsrc[2];
62   GstElement *appqueue[2];
63   GstElement *appsink[2];
64
65   GstElement *tee[2];
66   GstElement *funnel[2];
67
68   /* server ports for sending/receiving over ipv4 */
69   GstRTSPRange server_port_v4;
70   GstRTSPAddress *server_addr_v4;
71
72   /* server ports for sending/receiving over ipv6 */
73   GstRTSPRange server_port_v6;
74   GstRTSPAddress *server_addr_v6;
75
76   /* multicast addresses */
77   GstRTSPAddressPool *pool;
78   GstRTSPAddress *addr;
79
80   /* the caps of the stream */
81   gulong caps_sig;
82   GstCaps *caps;
83
84   /* transports we stream to */
85   guint n_active;
86   GList *transports;
87
88   gint dscp_qos;
89 };
90
91
92 enum
93 {
94   PROP_0,
95   PROP_LAST
96 };
97
98 GST_DEBUG_CATEGORY_STATIC (rtsp_stream_debug);
99 #define GST_CAT_DEFAULT rtsp_stream_debug
100
101 static GQuark ssrc_stream_map_key;
102
103 static void gst_rtsp_stream_finalize (GObject * obj);
104
105 G_DEFINE_TYPE (GstRTSPStream, gst_rtsp_stream, G_TYPE_OBJECT);
106
107 static void
108 gst_rtsp_stream_class_init (GstRTSPStreamClass * klass)
109 {
110   GObjectClass *gobject_class;
111
112   g_type_class_add_private (klass, sizeof (GstRTSPStreamPrivate));
113
114   gobject_class = G_OBJECT_CLASS (klass);
115
116   gobject_class->finalize = gst_rtsp_stream_finalize;
117
118   GST_DEBUG_CATEGORY_INIT (rtsp_stream_debug, "rtspstream", 0, "GstRTSPStream");
119
120   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
121 }
122
123 static void
124 gst_rtsp_stream_init (GstRTSPStream * stream)
125 {
126   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
127
128   GST_DEBUG ("new stream %p", stream);
129
130   stream->priv = priv;
131
132   stream->priv->dscp_qos = -1;
133
134   g_mutex_init (&priv->lock);
135 }
136
137 static void
138 gst_rtsp_stream_finalize (GObject * obj)
139 {
140   GstRTSPStream *stream;
141   GstRTSPStreamPrivate *priv;
142
143   stream = GST_RTSP_STREAM (obj);
144   priv = stream->priv;
145
146   GST_DEBUG ("finalize stream %p", stream);
147
148   /* we really need to be unjoined now */
149   g_return_if_fail (!priv->is_joined);
150
151   if (priv->addr)
152     gst_rtsp_address_free (priv->addr);
153   if (priv->server_addr_v4)
154     gst_rtsp_address_free (priv->server_addr_v4);
155   if (priv->server_addr_v6)
156     gst_rtsp_address_free (priv->server_addr_v6);
157   if (priv->pool)
158     g_object_unref (priv->pool);
159   gst_object_unref (priv->payloader);
160   gst_object_unref (priv->srcpad);
161   g_mutex_clear (&priv->lock);
162
163   G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
164 }
165
166 /**
167  * gst_rtsp_stream_new:
168  * @idx: an index
169  * @srcpad: a #GstPad
170  * @payloader: a #GstElement
171  *
172  * Create a new media stream with index @idx that handles RTP data on
173  * @srcpad and has a payloader element @payloader.
174  *
175  * Returns: a new #GstRTSPStream
176  */
177 GstRTSPStream *
178 gst_rtsp_stream_new (guint idx, GstElement * payloader, GstPad * srcpad)
179 {
180   GstRTSPStreamPrivate *priv;
181   GstRTSPStream *stream;
182
183   g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL);
184   g_return_val_if_fail (GST_IS_PAD (srcpad), NULL);
185   g_return_val_if_fail (GST_PAD_IS_SRC (srcpad), NULL);
186
187   stream = g_object_new (GST_TYPE_RTSP_STREAM, NULL);
188   priv = stream->priv;
189   priv->idx = idx;
190   priv->payloader = gst_object_ref (payloader);
191   priv->srcpad = gst_object_ref (srcpad);
192
193   return stream;
194 }
195
196 /**
197  * gst_rtsp_stream_get_index:
198  * @stream: a #GstRTSPStream
199  *
200  * Get the stream index.
201  *
202  * Return: the stream index.
203  */
204 guint
205 gst_rtsp_stream_get_index (GstRTSPStream * stream)
206 {
207   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
208
209   return stream->priv->idx;
210 }
211
212  /**
213  * gst_rtsp_stream_get_srcpad:
214  * @stream: a #GstRTSPStream
215  *
216  * Get the srcpad associated with @stream.
217  *
218  * Return: the srcpad. Unref after usage.
219  */
220 GstPad *
221 gst_rtsp_stream_get_srcpad (GstRTSPStream * stream)
222 {
223   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
224
225   return gst_object_ref (stream->priv->srcpad);
226 }
227
228 /**
229  * gst_rtsp_stream_set_mtu:
230  * @stream: a #GstRTSPStream
231  * @mtu: a new MTU
232  *
233  * Configure the mtu in the payloader of @stream to @mtu.
234  */
235 void
236 gst_rtsp_stream_set_mtu (GstRTSPStream * stream, guint mtu)
237 {
238   GstRTSPStreamPrivate *priv;
239
240   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
241
242   priv = stream->priv;
243
244   GST_LOG_OBJECT (stream, "set MTU %u", mtu);
245
246   g_object_set (G_OBJECT (priv->payloader), "mtu", mtu, NULL);
247 }
248
249 /**
250  * gst_rtsp_stream_get_mtu:
251  * @stream: a #GstRTSPStream
252  *
253  * Get the configured MTU in the payloader of @stream.
254  *
255  * Returns: the MTU of the payloader.
256  */
257 guint
258 gst_rtsp_stream_get_mtu (GstRTSPStream * stream)
259 {
260   GstRTSPStreamPrivate *priv;
261   guint mtu;
262
263   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
264
265   priv = stream->priv;
266
267   g_object_get (G_OBJECT (priv->payloader), "mtu", &mtu, NULL);
268
269   return mtu;
270 }
271
272 /* Update the dscp qos property on the udp sinks */
273 static void
274 update_dscp_qos (GstRTSPStream * stream)
275 {
276   GstRTSPStreamPrivate *priv;
277
278   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
279
280   priv = stream->priv;
281
282   if (priv->udpsink[0]) {
283     g_object_set (G_OBJECT (priv->udpsink[0]), "qos-dscp", priv->dscp_qos,
284         NULL);
285   }
286
287   if (priv->udpsink[1]) {
288     g_object_set (G_OBJECT (priv->udpsink[1]), "qos-dscp", priv->dscp_qos,
289         NULL);
290   }
291 }
292
293 /**
294  * gst_rtsp_stream_set_dscp_qos:
295  * @stream: a #GstRTSPStream
296  * @dscp_qos: a new dscp qos value (0-63, or -1 to disable)
297  *
298  * Configure the dscp qos of the outgoing sockets to @dscp_qos.
299  */
300 void
301 gst_rtsp_stream_set_dscp_qos (GstRTSPStream * stream, gint dscp_qos)
302 {
303   GstRTSPStreamPrivate *priv;
304
305   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
306
307   priv = stream->priv;
308
309   GST_LOG_OBJECT (stream, "set DSCP QoS %d", dscp_qos);
310
311   if (dscp_qos < -1 || dscp_qos > 63) {
312     GST_WARNING_OBJECT (stream, "trying to set illegal dscp qos %d", dscp_qos);
313     return;
314   }
315
316   priv->dscp_qos = dscp_qos;
317
318   update_dscp_qos (stream);
319 }
320
321 /**
322  * gst_rtsp_stream_get_dscp_qos:
323  * @stream: a #GstRTSPStream
324  *
325  * Get the configured DSCP QoS in of the outgoing sockets.
326  *
327  * Returns: the DSCP QoS value of the outgoing sockets, or -1 if disbled.
328  */
329 gint
330 gst_rtsp_stream_get_dscp_qos (GstRTSPStream * stream)
331 {
332   GstRTSPStreamPrivate *priv;
333
334   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
335
336   priv = stream->priv;
337
338   return priv->dscp_qos;
339 }
340
341
342 /**
343  * gst_rtsp_stream_set_address_pool:
344  * @stream: a #GstRTSPStream
345  * @pool: a #GstRTSPAddressPool
346  *
347  * configure @pool to be used as the address pool of @stream.
348  */
349 void
350 gst_rtsp_stream_set_address_pool (GstRTSPStream * stream,
351     GstRTSPAddressPool * pool)
352 {
353   GstRTSPStreamPrivate *priv;
354   GstRTSPAddressPool *old;
355
356   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
357
358   priv = stream->priv;
359
360   GST_LOG_OBJECT (stream, "set address pool %p", pool);
361
362   g_mutex_lock (&priv->lock);
363   if ((old = priv->pool) != pool)
364     priv->pool = pool ? g_object_ref (pool) : NULL;
365   else
366     old = NULL;
367   g_mutex_unlock (&priv->lock);
368
369   if (old)
370     g_object_unref (old);
371 }
372
373 /**
374  * gst_rtsp_stream_get_address_pool:
375  * @stream: a #GstRTSPStream
376  *
377  * Get the #GstRTSPAddressPool used as the address pool of @stream.
378  *
379  * Returns: (transfer full): the #GstRTSPAddressPool of @stream. g_object_unref() after
380  * usage.
381  */
382 GstRTSPAddressPool *
383 gst_rtsp_stream_get_address_pool (GstRTSPStream * stream)
384 {
385   GstRTSPStreamPrivate *priv;
386   GstRTSPAddressPool *result;
387
388   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
389
390   priv = stream->priv;
391
392   g_mutex_lock (&priv->lock);
393   if ((result = priv->pool))
394     g_object_ref (result);
395   g_mutex_unlock (&priv->lock);
396
397   return result;
398 }
399
400 /**
401  * gst_rtsp_stream_get_address:
402  * @stream: a #GstRTSPStream
403  *
404  * Get the multicast address of @stream.
405  *
406  * Returns: the #GstRTSPAddress of @stream or %NULL when no address could be
407  * allocated. gst_rtsp_address_free() after usage.
408  */
409 GstRTSPAddress *
410 gst_rtsp_stream_get_address (GstRTSPStream * stream)
411 {
412   GstRTSPStreamPrivate *priv;
413   GstRTSPAddress *result;
414
415   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
416
417   priv = stream->priv;
418
419   g_mutex_lock (&priv->lock);
420   if (priv->addr == NULL) {
421     if (priv->pool == NULL)
422       goto no_pool;
423
424     priv->addr = gst_rtsp_address_pool_acquire_address (priv->pool,
425         GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_MULTICAST, 2);
426     if (priv->addr == NULL)
427       goto no_address;
428   }
429   result = gst_rtsp_address_copy (priv->addr);
430   g_mutex_unlock (&priv->lock);
431
432   return result;
433
434   /* ERRORS */
435 no_pool:
436   {
437     GST_ERROR_OBJECT (stream, "no address pool specified");
438     g_mutex_unlock (&priv->lock);
439     return NULL;
440   }
441 no_address:
442   {
443     GST_ERROR_OBJECT (stream, "failed to acquire address from pool");
444     g_mutex_unlock (&priv->lock);
445     return NULL;
446   }
447 }
448
449 /**
450  * gst_rtsp_stream_reserve_address:
451  * @stream: a #GstRTSPStream
452  * @address: an address
453  * @port: a port
454  * @n_ports: n_ports
455  * @ttl: a TTL
456  *
457  * Reserve @address and @port as the address and port of @stream.
458  *
459  * Returns: the #GstRTSPAddress of @stream or %NULL when the address could be
460  * reserved. gst_rtsp_address_free() after usage.
461  */
462 GstRTSPAddress *
463 gst_rtsp_stream_reserve_address (GstRTSPStream * stream,
464     const gchar * address, guint port, guint n_ports, guint ttl)
465 {
466   GstRTSPStreamPrivate *priv;
467   GstRTSPAddress *result;
468
469   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
470   g_return_val_if_fail (address != NULL, NULL);
471   g_return_val_if_fail (port > 0, NULL);
472   g_return_val_if_fail (n_ports > 0, NULL);
473   g_return_val_if_fail (ttl > 0, NULL);
474
475   priv = stream->priv;
476
477   g_mutex_lock (&priv->lock);
478   if (priv->addr == NULL) {
479     if (priv->pool == NULL)
480       goto no_pool;
481
482     priv->addr = gst_rtsp_address_pool_reserve_address (priv->pool, address,
483         port, n_ports, ttl);
484     if (priv->addr == NULL)
485       goto no_address;
486   } else {
487     if (strcmp (priv->addr->address, address) ||
488         priv->addr->port != port || priv->addr->n_ports != n_ports ||
489         priv->addr->ttl != ttl)
490       goto different_address;
491   }
492   result = gst_rtsp_address_copy (priv->addr);
493   g_mutex_unlock (&priv->lock);
494
495   return result;
496
497   /* ERRORS */
498 no_pool:
499   {
500     GST_ERROR_OBJECT (stream, "no address pool specified");
501     g_mutex_unlock (&priv->lock);
502     return NULL;
503   }
504 no_address:
505   {
506     GST_ERROR_OBJECT (stream, "failed to acquire address %s from pool",
507         address);
508     g_mutex_unlock (&priv->lock);
509     return NULL;
510   }
511 different_address:
512   {
513     GST_ERROR_OBJECT (stream, "address %s is not the same that was already"
514         " reserved", address);
515     g_mutex_unlock (&priv->lock);
516     return NULL;
517   }
518 }
519
520 static gboolean
521 alloc_ports_one_family (GstRTSPAddressPool * pool, gint buffer_size,
522     GSocketFamily family, GstElement * udpsrc_out[2],
523     GstElement * udpsink_out[2], GstRTSPRange * server_port_out,
524     GstRTSPAddress ** server_addr_out)
525 {
526   GstStateChangeReturn ret;
527   GstElement *udpsrc0, *udpsrc1;
528   GstElement *udpsink0, *udpsink1;
529   GSocket *rtp_socket = NULL;
530   GSocket *rtcp_socket;
531   gint tmp_rtp, tmp_rtcp;
532   guint count;
533   gint rtpport, rtcpport;
534   GList *rejected_addresses = NULL;
535   GstRTSPAddress *addr = NULL;
536   GInetAddress *inetaddr = NULL;
537   GSocketAddress *rtp_sockaddr = NULL;
538   GSocketAddress *rtcp_sockaddr = NULL;
539   const gchar *multisink_socket = "socket";
540
541   if (family == G_SOCKET_FAMILY_IPV6) {
542     multisink_socket = "socket-v6";
543   }
544
545   udpsrc0 = NULL;
546   udpsrc1 = NULL;
547   udpsink0 = NULL;
548   udpsink1 = NULL;
549   count = 0;
550
551   /* Start with random port */
552   tmp_rtp = 0;
553
554   rtcp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
555       G_SOCKET_PROTOCOL_UDP, NULL);
556   if (!rtcp_socket)
557     goto no_udp_protocol;
558
559   if (*server_addr_out)
560     gst_rtsp_address_free (*server_addr_out);
561
562   /* try to allocate 2 UDP ports, the RTP port should be an even
563    * number and the RTCP port should be the next (uneven) port */
564 again:
565
566   if (rtp_socket == NULL) {
567     rtp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
568         G_SOCKET_PROTOCOL_UDP, NULL);
569     if (!rtp_socket)
570       goto no_udp_protocol;
571   }
572
573   if (pool && gst_rtsp_address_pool_has_unicast_addresses (pool)) {
574     GstRTSPAddressFlags flags;
575
576     if (addr)
577       rejected_addresses = g_list_prepend (rejected_addresses, addr);
578
579     flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_UNICAST;
580     if (family == G_SOCKET_FAMILY_IPV6)
581       flags |= GST_RTSP_ADDRESS_FLAG_IPV6;
582     else
583       flags |= GST_RTSP_ADDRESS_FLAG_IPV4;
584
585     addr = gst_rtsp_address_pool_acquire_address (pool, flags, 2);
586
587     if (addr == NULL)
588       goto no_ports;
589
590     tmp_rtp = addr->port;
591
592     g_clear_object (&inetaddr);
593     inetaddr = g_inet_address_new_from_string (addr->address);
594   } else {
595     if (tmp_rtp != 0) {
596       tmp_rtp += 2;
597       if (++count > 20)
598         goto no_ports;
599     }
600
601     if (inetaddr == NULL)
602       inetaddr = g_inet_address_new_any (family);
603   }
604
605   rtp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtp);
606   if (!g_socket_bind (rtp_socket, rtp_sockaddr, FALSE, NULL)) {
607     g_object_unref (rtp_sockaddr);
608     goto again;
609   }
610   g_object_unref (rtp_sockaddr);
611
612   rtp_sockaddr = g_socket_get_local_address (rtp_socket, NULL);
613   if (rtp_sockaddr == NULL || !G_IS_INET_SOCKET_ADDRESS (rtp_sockaddr)) {
614     g_clear_object (&rtp_sockaddr);
615     goto socket_error;
616   }
617
618   tmp_rtp =
619       g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (rtp_sockaddr));
620   g_object_unref (rtp_sockaddr);
621
622   /* check if port is even */
623   if ((tmp_rtp & 1) != 0) {
624     /* port not even, close and allocate another */
625     tmp_rtp++;
626     g_clear_object (&rtp_socket);
627     goto again;
628   }
629
630   /* set port */
631   tmp_rtcp = tmp_rtp + 1;
632
633   rtcp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtcp);
634   if (!g_socket_bind (rtcp_socket, rtcp_sockaddr, FALSE, NULL)) {
635     g_object_unref (rtcp_sockaddr);
636     g_clear_object (&rtp_socket);
637     goto again;
638   }
639   g_object_unref (rtcp_sockaddr);
640
641   g_clear_object (&inetaddr);
642
643   udpsrc0 = gst_element_factory_make ("udpsrc", NULL);
644   udpsrc1 = gst_element_factory_make ("udpsrc", NULL);
645
646   if (udpsrc0 == NULL || udpsrc1 == NULL)
647     goto no_udp_protocol;
648
649   g_object_set (G_OBJECT (udpsrc0), "socket", rtp_socket, NULL);
650   g_object_set (G_OBJECT (udpsrc1), "socket", rtcp_socket, NULL);
651
652   ret = gst_element_set_state (udpsrc0, GST_STATE_PAUSED);
653   if (ret == GST_STATE_CHANGE_FAILURE)
654     goto element_error;
655   ret = gst_element_set_state (udpsrc1, GST_STATE_PAUSED);
656   if (ret == GST_STATE_CHANGE_FAILURE)
657     goto element_error;
658
659   /* all fine, do port check */
660   g_object_get (G_OBJECT (udpsrc0), "port", &rtpport, NULL);
661   g_object_get (G_OBJECT (udpsrc1), "port", &rtcpport, NULL);
662
663   /* this should not happen... */
664   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
665     goto port_error;
666
667   if (udpsink_out[0])
668     udpsink0 = udpsink_out[0];
669   else
670     udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
671
672   if (!udpsink0)
673     goto no_udp_protocol;
674
675   g_object_set (G_OBJECT (udpsink0), "close-socket", FALSE, NULL);
676   g_object_set (G_OBJECT (udpsink0), multisink_socket, rtp_socket, NULL);
677
678   if (udpsink_out[1])
679     udpsink1 = udpsink_out[1];
680   else
681     udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
682
683   if (!udpsink1)
684     goto no_udp_protocol;
685
686   g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
687   g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
688   g_object_set (G_OBJECT (udpsink0), "buffer-size", buffer_size, NULL);
689
690   g_object_set (G_OBJECT (udpsink1), "close-socket", FALSE, NULL);
691   g_object_set (G_OBJECT (udpsink1), multisink_socket, rtcp_socket, NULL);
692   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
693   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
694   g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL);
695   g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
696   g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL);
697   g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
698
699   /* we keep these elements, we will further configure them when the
700    * client told us to really use the UDP ports. */
701   udpsrc_out[0] = udpsrc0;
702   udpsrc_out[1] = udpsrc1;
703   udpsink_out[0] = udpsink0;
704   udpsink_out[1] = udpsink1;
705   server_port_out->min = rtpport;
706   server_port_out->max = rtcpport;
707
708   *server_addr_out = addr;
709   g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free);
710
711   g_object_unref (rtp_socket);
712   g_object_unref (rtcp_socket);
713
714   return TRUE;
715
716   /* ERRORS */
717 no_udp_protocol:
718   {
719     goto cleanup;
720   }
721 no_ports:
722   {
723     goto cleanup;
724   }
725 port_error:
726   {
727     goto cleanup;
728   }
729 socket_error:
730   {
731     goto cleanup;
732   }
733 element_error:
734   {
735     goto cleanup;
736   }
737 cleanup:
738   {
739     if (udpsrc0) {
740       gst_element_set_state (udpsrc0, GST_STATE_NULL);
741       gst_object_unref (udpsrc0);
742     }
743     if (udpsrc1) {
744       gst_element_set_state (udpsrc1, GST_STATE_NULL);
745       gst_object_unref (udpsrc1);
746     }
747     if (udpsink0) {
748       gst_element_set_state (udpsink0, GST_STATE_NULL);
749       gst_object_unref (udpsink0);
750     }
751     if (udpsink1) {
752       gst_element_set_state (udpsink1, GST_STATE_NULL);
753       gst_object_unref (udpsink1);
754     }
755     if (inetaddr)
756       g_object_unref (inetaddr);
757     g_list_free_full (rejected_addresses,
758         (GDestroyNotify) gst_rtsp_address_free);
759     if (addr)
760       gst_rtsp_address_free (addr);
761     if (rtp_socket)
762       g_object_unref (rtp_socket);
763     if (rtcp_socket)
764       g_object_unref (rtcp_socket);
765     return FALSE;
766   }
767 }
768
769 /* must be called with lock */
770 static gboolean
771 alloc_ports (GstRTSPStream * stream)
772 {
773   GstRTSPStreamPrivate *priv = stream->priv;
774
775   return alloc_ports_one_family (priv->pool, priv->buffer_size,
776       G_SOCKET_FAMILY_IPV4, priv->udpsrc_v4, priv->udpsink,
777       &priv->server_port_v4, &priv->server_addr_v4) &&
778       alloc_ports_one_family (priv->pool, priv->buffer_size,
779       G_SOCKET_FAMILY_IPV6, priv->udpsrc_v6, priv->udpsink,
780       &priv->server_port_v6, &priv->server_addr_v6);
781 }
782
783 /**
784  * gst_rtsp_stream_get_server_port:
785  * @stream: a #GstRTSPStream
786  * @server_port: (out): result server port
787  *
788  * Fill @server_port with the port pair used by the server. This function can
789  * only be called when @stream has been joined.
790  */
791 void
792 gst_rtsp_stream_get_server_port (GstRTSPStream * stream,
793     GstRTSPRange * server_port, GSocketFamily family)
794 {
795   GstRTSPStreamPrivate *priv;
796
797   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
798   priv = stream->priv;
799   g_return_if_fail (priv->is_joined);
800
801   g_mutex_lock (&priv->lock);
802   if (family == G_SOCKET_FAMILY_IPV4) {
803     if (server_port)
804       *server_port = priv->server_port_v4;
805   } else {
806     if (server_port)
807       *server_port = priv->server_port_v6;
808   }
809   g_mutex_unlock (&priv->lock);
810 }
811
812 /**
813  * gst_rtsp_stream_get_rtpsession:
814  * @stream: a #GstRTSPStream
815  *
816  * Get the RTP session of this stream.
817  *
818  * Returns: The RTP session of this stream. Unref after usage.
819  */
820 GObject *
821 gst_rtsp_stream_get_rtpsession (GstRTSPStream * stream)
822 {
823   GstRTSPStreamPrivate *priv;
824   GObject *session;
825
826   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
827
828   priv = stream->priv;
829
830   g_mutex_lock (&priv->lock);
831   if ((session = priv->session))
832     g_object_ref (session);
833   g_mutex_unlock (&priv->lock);
834
835   return session;
836 }
837
838 /**
839  * gst_rtsp_stream_get_ssrc:
840  * @stream: a #GstRTSPStream
841  * @ssrc: (out): result ssrc
842  *
843  * Get the SSRC used by the RTP session of this stream. This function can only
844  * be called when @stream has been joined.
845  */
846 void
847 gst_rtsp_stream_get_ssrc (GstRTSPStream * stream, guint * ssrc)
848 {
849   GstRTSPStreamPrivate *priv;
850
851   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
852   priv = stream->priv;
853   g_return_if_fail (priv->is_joined);
854
855   g_mutex_lock (&priv->lock);
856   if (ssrc && priv->session)
857     g_object_get (priv->session, "internal-ssrc", ssrc, NULL);
858   g_mutex_unlock (&priv->lock);
859 }
860
861 /* executed from streaming thread */
862 static void
863 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream)
864 {
865   GstRTSPStreamPrivate *priv = stream->priv;
866   GstCaps *newcaps, *oldcaps;
867
868   newcaps = gst_pad_get_current_caps (pad);
869
870   GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps,
871       newcaps);
872
873   g_mutex_lock (&priv->lock);
874   oldcaps = priv->caps;
875   priv->caps = newcaps;
876   g_mutex_unlock (&priv->lock);
877
878   if (oldcaps)
879     gst_caps_unref (oldcaps);
880 }
881
882 static void
883 dump_structure (const GstStructure * s)
884 {
885   gchar *sstr;
886
887   sstr = gst_structure_to_string (s);
888   GST_INFO ("structure: %s", sstr);
889   g_free (sstr);
890 }
891
892 static GstRTSPStreamTransport *
893 find_transport (GstRTSPStream * stream, const gchar * rtcp_from)
894 {
895   GstRTSPStreamPrivate *priv = stream->priv;
896   GList *walk;
897   GstRTSPStreamTransport *result = NULL;
898   const gchar *tmp;
899   gchar *dest;
900   guint port;
901
902   if (rtcp_from == NULL)
903     return NULL;
904
905   tmp = g_strrstr (rtcp_from, ":");
906   if (tmp == NULL)
907     return NULL;
908
909   port = atoi (tmp + 1);
910   dest = g_strndup (rtcp_from, tmp - rtcp_from);
911
912   g_mutex_lock (&priv->lock);
913   GST_INFO ("finding %s:%d in %d transports", dest, port,
914       g_list_length (priv->transports));
915
916   for (walk = priv->transports; walk; walk = g_list_next (walk)) {
917     GstRTSPStreamTransport *trans = walk->data;
918     const GstRTSPTransport *tr;
919     gint min, max;
920
921     tr = gst_rtsp_stream_transport_get_transport (trans);
922
923     min = tr->client_port.min;
924     max = tr->client_port.max;
925
926     if ((strcmp (tr->destination, dest) == 0) && (min == port || max == port)) {
927       result = trans;
928       break;
929     }
930   }
931   if (result)
932     g_object_ref (result);
933   g_mutex_unlock (&priv->lock);
934
935   g_free (dest);
936
937   return result;
938 }
939
940 static GstRTSPStreamTransport *
941 check_transport (GObject * source, GstRTSPStream * stream)
942 {
943   GstStructure *stats;
944   GstRTSPStreamTransport *trans;
945
946   /* see if we have a stream to match with the origin of the RTCP packet */
947   trans = g_object_get_qdata (source, ssrc_stream_map_key);
948   if (trans == NULL) {
949     g_object_get (source, "stats", &stats, NULL);
950     if (stats) {
951       const gchar *rtcp_from;
952
953       dump_structure (stats);
954
955       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
956       if ((trans = find_transport (stream, rtcp_from))) {
957         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
958             source);
959         g_object_set_qdata_full (source, ssrc_stream_map_key, trans,
960             g_object_unref);
961       }
962       gst_structure_free (stats);
963     }
964   }
965   return trans;
966 }
967
968
969 static void
970 on_new_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
971 {
972   GstRTSPStreamTransport *trans;
973
974   GST_INFO ("%p: new source %p", stream, source);
975
976   trans = check_transport (source, stream);
977
978   if (trans)
979     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
980 }
981
982 static void
983 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPStream * stream)
984 {
985   GST_INFO ("%p: new SDES %p", stream, source);
986 }
987
988 static void
989 on_ssrc_active (GObject * session, GObject * source, GstRTSPStream * stream)
990 {
991   GstRTSPStreamTransport *trans;
992
993   trans = check_transport (source, stream);
994
995   if (trans) {
996     GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
997     gst_rtsp_stream_transport_keep_alive (trans);
998   }
999 #ifdef DUMP_STATS
1000   {
1001     GstStructure *stats;
1002     g_object_get (source, "stats", &stats, NULL);
1003     if (stats) {
1004       dump_structure (stats);
1005       gst_structure_free (stats);
1006     }
1007   }
1008 #endif
1009 }
1010
1011 static void
1012 on_bye_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1013 {
1014   GST_INFO ("%p: source %p bye", stream, source);
1015 }
1016
1017 static void
1018 on_bye_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1019 {
1020   GstRTSPStreamTransport *trans;
1021
1022   GST_INFO ("%p: source %p bye timeout", stream, source);
1023
1024   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1025     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1026     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1027   }
1028 }
1029
1030 static void
1031 on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1032 {
1033   GstRTSPStreamTransport *trans;
1034
1035   GST_INFO ("%p: source %p timeout", stream, source);
1036
1037   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1038     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1039     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1040   }
1041 }
1042
1043 static GstFlowReturn
1044 handle_new_sample (GstAppSink * sink, gpointer user_data)
1045 {
1046   GstRTSPStreamPrivate *priv;
1047   GList *walk;
1048   GstSample *sample;
1049   GstBuffer *buffer;
1050   GstRTSPStream *stream;
1051
1052   sample = gst_app_sink_pull_sample (sink);
1053   if (!sample)
1054     return GST_FLOW_OK;
1055
1056   stream = (GstRTSPStream *) user_data;
1057   priv = stream->priv;
1058   buffer = gst_sample_get_buffer (sample);
1059
1060   g_mutex_lock (&priv->lock);
1061   for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1062     GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1063
1064     if (GST_ELEMENT_CAST (sink) == priv->appsink[0]) {
1065       gst_rtsp_stream_transport_send_rtp (tr, buffer);
1066     } else {
1067       gst_rtsp_stream_transport_send_rtcp (tr, buffer);
1068     }
1069   }
1070   g_mutex_unlock (&priv->lock);
1071
1072   gst_sample_unref (sample);
1073
1074   return GST_FLOW_OK;
1075 }
1076
1077 static GstAppSinkCallbacks sink_cb = {
1078   NULL,                         /* not interested in EOS */
1079   NULL,                         /* not interested in preroll samples */
1080   handle_new_sample,
1081 };
1082
1083 /**
1084  * gst_rtsp_stream_join_bin:
1085  * @stream: a #GstRTSPStream
1086  * @bin: a #GstBin to join
1087  * @rtpbin: a rtpbin element in @bin
1088  * @state: the target state of the new elements
1089  *
1090  * Join the #Gstbin @bin that contains the element @rtpbin.
1091  *
1092  * @stream will link to @rtpbin, which must be inside @bin. The elements
1093  * added to @bin will be set to the state given in @state.
1094  *
1095  * Returns: %TRUE on success.
1096  */
1097 gboolean
1098 gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
1099     GstElement * rtpbin, GstState state)
1100 {
1101   GstRTSPStreamPrivate *priv;
1102   gint i, idx;
1103   gchar *name;
1104   GstPad *pad, *teepad, *queuepad, *selpad;
1105   GstPadLinkReturn ret;
1106
1107   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1108   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
1109   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
1110
1111   priv = stream->priv;
1112
1113   g_mutex_lock (&priv->lock);
1114   if (priv->is_joined)
1115     goto was_joined;
1116
1117   /* create a session with the same index as the stream */
1118   idx = priv->idx;
1119
1120   GST_INFO ("stream %p joining bin as session %d", stream, idx);
1121
1122   if (!alloc_ports (stream))
1123     goto no_ports;
1124
1125   /* update the dscp qos field in the sinks */
1126   update_dscp_qos (stream);
1127
1128   /* get a pad for sending RTP */
1129   name = g_strdup_printf ("send_rtp_sink_%u", idx);
1130   priv->send_rtp_sink = gst_element_get_request_pad (rtpbin, name);
1131   g_free (name);
1132   /* link the RTP pad to the session manager, it should not really fail unless
1133    * this is not really an RTP pad */
1134   ret = gst_pad_link (priv->srcpad, priv->send_rtp_sink);
1135   if (ret != GST_PAD_LINK_OK)
1136     goto link_failed;
1137
1138   /* get pads from the RTP session element for sending and receiving
1139    * RTP/RTCP*/
1140   name = g_strdup_printf ("send_rtp_src_%u", idx);
1141   priv->send_src[0] = gst_element_get_static_pad (rtpbin, name);
1142   g_free (name);
1143   name = g_strdup_printf ("send_rtcp_src_%u", idx);
1144   priv->send_src[1] = gst_element_get_request_pad (rtpbin, name);
1145   g_free (name);
1146   name = g_strdup_printf ("recv_rtp_sink_%u", idx);
1147   priv->recv_sink[0] = gst_element_get_request_pad (rtpbin, name);
1148   g_free (name);
1149   name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
1150   priv->recv_sink[1] = gst_element_get_request_pad (rtpbin, name);
1151   g_free (name);
1152
1153   /* get the session */
1154   g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &priv->session);
1155
1156   g_signal_connect (priv->session, "on-new-ssrc", (GCallback) on_new_ssrc,
1157       stream);
1158   g_signal_connect (priv->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
1159       stream);
1160   g_signal_connect (priv->session, "on-ssrc-active",
1161       (GCallback) on_ssrc_active, stream);
1162   g_signal_connect (priv->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
1163       stream);
1164   g_signal_connect (priv->session, "on-bye-timeout",
1165       (GCallback) on_bye_timeout, stream);
1166   g_signal_connect (priv->session, "on-timeout", (GCallback) on_timeout,
1167       stream);
1168
1169   for (i = 0; i < 2; i++) {
1170     /* For the sender we create this bit of pipeline for both
1171      * RTP and RTCP. Sync and preroll are enabled on udpsink so
1172      * we need to add a queue before appsink to make the pipeline
1173      * not block. For the TCP case, we want to pump data to the
1174      * client as fast as possible anyway.
1175      *
1176      * .--------.      .-----.    .---------.
1177      * | rtpbin |      | tee |    | udpsink |
1178      * |       send->sink   src->sink       |
1179      * '--------'      |     |    '---------'
1180      *                 |     |    .---------.    .---------.
1181      *                 |     |    |  queue  |    | appsink |
1182      *                 |    src->sink      src->sink       |
1183      *                 '-----'    '---------'    '---------'
1184      */
1185     /* make tee for RTP/RTCP */
1186     priv->tee[i] = gst_element_factory_make ("tee", NULL);
1187     gst_bin_add (bin, priv->tee[i]);
1188
1189     /* and link to rtpbin send pad */
1190     pad = gst_element_get_static_pad (priv->tee[i], "sink");
1191     gst_pad_link (priv->send_src[i], pad);
1192     gst_object_unref (pad);
1193
1194     /* add udpsink */
1195     gst_bin_add (bin, priv->udpsink[i]);
1196
1197     /* link tee to udpsink */
1198     teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
1199     pad = gst_element_get_static_pad (priv->udpsink[i], "sink");
1200     gst_pad_link (teepad, pad);
1201     gst_object_unref (pad);
1202     gst_object_unref (teepad);
1203
1204     /* make queue */
1205     priv->appqueue[i] = gst_element_factory_make ("queue", NULL);
1206     gst_bin_add (bin, priv->appqueue[i]);
1207     /* and link to tee */
1208     teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
1209     pad = gst_element_get_static_pad (priv->appqueue[i], "sink");
1210     gst_pad_link (teepad, pad);
1211     gst_object_unref (pad);
1212     gst_object_unref (teepad);
1213
1214     /* make appsink */
1215     priv->appsink[i] = gst_element_factory_make ("appsink", NULL);
1216     g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
1217     g_object_set (priv->appsink[i], "emit-signals", FALSE, NULL);
1218     gst_bin_add (bin, priv->appsink[i]);
1219     gst_app_sink_set_callbacks (GST_APP_SINK_CAST (priv->appsink[i]),
1220         &sink_cb, stream, NULL);
1221     /* and link to queue */
1222     queuepad = gst_element_get_static_pad (priv->appqueue[i], "src");
1223     pad = gst_element_get_static_pad (priv->appsink[i], "sink");
1224     gst_pad_link (queuepad, pad);
1225     gst_object_unref (pad);
1226     gst_object_unref (queuepad);
1227
1228     /* For the receiver we create this bit of pipeline for both
1229      * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc
1230      * and it is all funneled into the rtpbin receive pad.
1231      *
1232      * .--------.     .--------.    .--------.
1233      * | udpsrc |     | funnel |    | rtpbin |
1234      * |       src->sink      src->sink      |
1235      * '--------'     |        |    '--------'
1236      * .--------.     |        |
1237      * | appsrc |     |        |
1238      * |       src->sink       |
1239      * '--------'     '--------'
1240      */
1241     /* make funnel for the RTP/RTCP receivers */
1242     priv->funnel[i] = gst_element_factory_make ("funnel", NULL);
1243     gst_bin_add (bin, priv->funnel[i]);
1244
1245     pad = gst_element_get_static_pad (priv->funnel[i], "src");
1246     gst_pad_link (pad, priv->recv_sink[i]);
1247     gst_object_unref (pad);
1248
1249     /* we set and keep these to playing so that they don't cause NO_PREROLL return
1250      * values */
1251     gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_PLAYING);
1252     gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_PLAYING);
1253     gst_element_set_locked_state (priv->udpsrc_v4[i], TRUE);
1254     gst_element_set_locked_state (priv->udpsrc_v6[i], TRUE);
1255     /* add udpsrc */
1256     gst_bin_add (bin, priv->udpsrc_v4[i]);
1257     gst_bin_add (bin, priv->udpsrc_v6[i]);
1258     /* and link to the funnel v4 */
1259     selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1260     pad = gst_element_get_static_pad (priv->udpsrc_v4[i], "src");
1261     gst_pad_link (pad, selpad);
1262     gst_object_unref (pad);
1263     gst_object_unref (selpad);
1264
1265     /* and link to the funnel v6 */
1266     selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1267     pad = gst_element_get_static_pad (priv->udpsrc_v6[i], "src");
1268     gst_pad_link (pad, selpad);
1269     gst_object_unref (pad);
1270     gst_object_unref (selpad);
1271
1272     /* make and add appsrc */
1273     priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
1274     gst_bin_add (bin, priv->appsrc[i]);
1275     /* and link to the funnel */
1276     selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1277     pad = gst_element_get_static_pad (priv->appsrc[i], "src");
1278     gst_pad_link (pad, selpad);
1279     gst_object_unref (pad);
1280     gst_object_unref (selpad);
1281
1282     /* check if we need to set to a special state */
1283     if (state != GST_STATE_NULL) {
1284       gst_element_set_state (priv->udpsink[i], state);
1285       gst_element_set_state (priv->appsink[i], state);
1286       gst_element_set_state (priv->appqueue[i], state);
1287       gst_element_set_state (priv->tee[i], state);
1288       gst_element_set_state (priv->funnel[i], state);
1289       gst_element_set_state (priv->appsrc[i], state);
1290     }
1291   }
1292
1293   /* be notified of caps changes */
1294   priv->caps_sig = g_signal_connect (priv->send_rtp_sink, "notify::caps",
1295       (GCallback) caps_notify, stream);
1296
1297   priv->is_joined = TRUE;
1298   g_mutex_unlock (&priv->lock);
1299
1300   return TRUE;
1301
1302   /* ERRORS */
1303 was_joined:
1304   {
1305     g_mutex_unlock (&priv->lock);
1306     return TRUE;
1307   }
1308 no_ports:
1309   {
1310     g_mutex_unlock (&priv->lock);
1311     GST_WARNING ("failed to allocate ports %d", idx);
1312     return FALSE;
1313   }
1314 link_failed:
1315   {
1316     GST_WARNING ("failed to link stream %d", idx);
1317     gst_object_unref (priv->send_rtp_sink);
1318     priv->send_rtp_sink = NULL;
1319     g_mutex_unlock (&priv->lock);
1320     return FALSE;
1321   }
1322 }
1323
1324 /**
1325  * gst_rtsp_stream_leave_bin:
1326  * @stream: a #GstRTSPStream
1327  * @bin: a #GstBin
1328  * @rtpbin: a rtpbin #GstElement
1329  *
1330  * Remove the elements of @stream from @bin.
1331  *
1332  * Return: %TRUE on success.
1333  */
1334 gboolean
1335 gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
1336     GstElement * rtpbin)
1337 {
1338   GstRTSPStreamPrivate *priv;
1339   gint i;
1340
1341   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1342   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
1343   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
1344
1345   priv = stream->priv;
1346
1347   g_mutex_lock (&priv->lock);
1348   if (!priv->is_joined)
1349     goto was_not_joined;
1350
1351   /* all transports must be removed by now */
1352   g_return_val_if_fail (priv->transports == NULL, FALSE);
1353
1354   GST_INFO ("stream %p leaving bin", stream);
1355
1356   gst_pad_unlink (priv->srcpad, priv->send_rtp_sink);
1357   g_signal_handler_disconnect (priv->send_rtp_sink, priv->caps_sig);
1358   gst_element_release_request_pad (rtpbin, priv->send_rtp_sink);
1359   gst_object_unref (priv->send_rtp_sink);
1360   priv->send_rtp_sink = NULL;
1361
1362   for (i = 0; i < 2; i++) {
1363     gst_element_set_state (priv->udpsink[i], GST_STATE_NULL);
1364     gst_element_set_state (priv->appsink[i], GST_STATE_NULL);
1365     gst_element_set_state (priv->appqueue[i], GST_STATE_NULL);
1366     gst_element_set_state (priv->tee[i], GST_STATE_NULL);
1367     gst_element_set_state (priv->funnel[i], GST_STATE_NULL);
1368     gst_element_set_state (priv->appsrc[i], GST_STATE_NULL);
1369     /* and set udpsrc to NULL now before removing */
1370     gst_element_set_locked_state (priv->udpsrc_v4[i], FALSE);
1371     gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_NULL);
1372     gst_element_set_locked_state (priv->udpsrc_v6[i], FALSE);
1373     gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_NULL);
1374
1375     /* removing them should also nicely release the request
1376      * pads when they finalize */
1377     gst_bin_remove (bin, priv->udpsrc_v4[i]);
1378     gst_bin_remove (bin, priv->udpsrc_v6[i]);
1379     gst_bin_remove (bin, priv->udpsink[i]);
1380     gst_bin_remove (bin, priv->appsrc[i]);
1381     gst_bin_remove (bin, priv->appsink[i]);
1382     gst_bin_remove (bin, priv->appqueue[i]);
1383     gst_bin_remove (bin, priv->tee[i]);
1384     gst_bin_remove (bin, priv->funnel[i]);
1385
1386     gst_element_release_request_pad (rtpbin, priv->recv_sink[i]);
1387     gst_object_unref (priv->recv_sink[i]);
1388     priv->recv_sink[i] = NULL;
1389
1390     priv->udpsrc_v4[i] = NULL;
1391     priv->udpsrc_v6[i] = NULL;
1392     priv->udpsink[i] = NULL;
1393     priv->appsrc[i] = NULL;
1394     priv->appsink[i] = NULL;
1395     priv->appqueue[i] = NULL;
1396     priv->tee[i] = NULL;
1397     priv->funnel[i] = NULL;
1398   }
1399   gst_object_unref (priv->send_src[0]);
1400   priv->send_src[0] = NULL;
1401
1402   gst_element_release_request_pad (rtpbin, priv->send_src[1]);
1403   gst_object_unref (priv->send_src[1]);
1404   priv->send_src[1] = NULL;
1405
1406   g_object_unref (priv->session);
1407   priv->session = NULL;
1408   if (priv->caps)
1409     gst_caps_unref (priv->caps);
1410   priv->caps = NULL;
1411
1412   priv->is_joined = FALSE;
1413   g_mutex_unlock (&priv->lock);
1414
1415   return TRUE;
1416
1417 was_not_joined:
1418   {
1419     return TRUE;
1420   }
1421 }
1422
1423 /**
1424  * gst_rtsp_stream_get_rtpinfo:
1425  * @stream: a #GstRTSPStream
1426  * @rtptime: result RTP timestamp
1427  * @seq: result RTP seqnum
1428  *
1429  * Retrieve the current rtptime and seq. This is used to
1430  * construct a RTPInfo reply header.
1431  *
1432  * Returns: %TRUE when rtptime and seq could be determined.
1433  */
1434 gboolean
1435 gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream,
1436     guint * rtptime, guint * seq)
1437 {
1438   GstRTSPStreamPrivate *priv;
1439   GObjectClass *payobjclass;
1440
1441   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1442   g_return_val_if_fail (rtptime != NULL, FALSE);
1443   g_return_val_if_fail (seq != NULL, FALSE);
1444
1445   priv = stream->priv;
1446
1447   payobjclass = G_OBJECT_GET_CLASS (priv->payloader);
1448
1449   if (!g_object_class_find_property (payobjclass, "seqnum") ||
1450       !g_object_class_find_property (payobjclass, "timestamp"))
1451     return FALSE;
1452
1453   g_object_get (priv->payloader, "seqnum", seq, "timestamp", rtptime, NULL);
1454
1455   return TRUE;
1456 }
1457
1458 /**
1459  * gst_rtsp_stream_get_caps:
1460  * @stream: a #GstRTSPStream
1461  *
1462  * Retrieve the current caps of @stream.
1463  *
1464  * Returns: (transfer full): the #GstCaps of @stream. use gst_caps_unref()
1465  *    after usage.
1466  */
1467 GstCaps *
1468 gst_rtsp_stream_get_caps (GstRTSPStream * stream)
1469 {
1470   GstRTSPStreamPrivate *priv;
1471   GstCaps *result;
1472
1473   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1474
1475   priv = stream->priv;
1476
1477   g_mutex_lock (&priv->lock);
1478   if ((result = priv->caps))
1479     gst_caps_ref (result);
1480   g_mutex_unlock (&priv->lock);
1481
1482   return result;
1483 }
1484
1485 /**
1486  * gst_rtsp_stream_recv_rtp:
1487  * @stream: a #GstRTSPStream
1488  * @buffer: (transfer full): a #GstBuffer
1489  *
1490  * Handle an RTP buffer for the stream. This method is usually called when a
1491  * message has been received from a client using the TCP transport.
1492  *
1493  * This function takes ownership of @buffer.
1494  *
1495  * Returns: a GstFlowReturn.
1496  */
1497 GstFlowReturn
1498 gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer)
1499 {
1500   GstRTSPStreamPrivate *priv;
1501   GstFlowReturn ret;
1502   GstElement *element;
1503
1504   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
1505   priv = stream->priv;
1506   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1507   g_return_val_if_fail (priv->is_joined, FALSE);
1508
1509   g_mutex_lock (&priv->lock);
1510   element = gst_object_ref (priv->appsrc[0]);
1511   g_mutex_unlock (&priv->lock);
1512
1513   ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
1514
1515   gst_object_unref (element);
1516
1517   return ret;
1518 }
1519
1520 /**
1521  * gst_rtsp_stream_recv_rtcp:
1522  * @stream: a #GstRTSPStream
1523  * @buffer: (transfer full): a #GstBuffer
1524  *
1525  * Handle an RTCP buffer for the stream. This method is usually called when a
1526  * message has been received from a client using the TCP transport.
1527  *
1528  * This function takes ownership of @buffer.
1529  *
1530  * Returns: a GstFlowReturn.
1531  */
1532 GstFlowReturn
1533 gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
1534 {
1535   GstRTSPStreamPrivate *priv;
1536   GstFlowReturn ret;
1537   GstElement *element;
1538
1539   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
1540   priv = stream->priv;
1541   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1542   g_return_val_if_fail (priv->is_joined, FALSE);
1543
1544   g_mutex_lock (&priv->lock);
1545   element = gst_object_ref (priv->appsrc[1]);
1546   g_mutex_unlock (&priv->lock);
1547
1548   ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
1549
1550   gst_object_unref (element);
1551
1552   return ret;
1553 }
1554
1555 /* must be called with lock */
1556 static gboolean
1557 update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
1558     gboolean add)
1559 {
1560   GstRTSPStreamPrivate *priv = stream->priv;
1561   const GstRTSPTransport *tr;
1562
1563   tr = gst_rtsp_stream_transport_get_transport (trans);
1564
1565   switch (tr->lower_transport) {
1566     case GST_RTSP_LOWER_TRANS_UDP:
1567     case GST_RTSP_LOWER_TRANS_UDP_MCAST:
1568     {
1569       gchar *dest;
1570       gint min, max;
1571       guint ttl = 0;
1572
1573       dest = tr->destination;
1574       if (tr->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
1575         min = tr->port.min;
1576         max = tr->port.max;
1577         ttl = tr->ttl;
1578       } else {
1579         min = tr->client_port.min;
1580         max = tr->client_port.max;
1581       }
1582
1583       if (add) {
1584         GST_INFO ("adding %s:%d-%d", dest, min, max);
1585         g_signal_emit_by_name (priv->udpsink[0], "add", dest, min, NULL);
1586         g_signal_emit_by_name (priv->udpsink[1], "add", dest, max, NULL);
1587         if (ttl > 0) {
1588           GST_INFO ("setting ttl-mc %d", ttl);
1589           g_object_set (G_OBJECT (priv->udpsink[0]), "ttl-mc", ttl, NULL);
1590           g_object_set (G_OBJECT (priv->udpsink[1]), "ttl-mc", ttl, NULL);
1591         }
1592         priv->transports = g_list_prepend (priv->transports, trans);
1593       } else {
1594         GST_INFO ("removing %s:%d-%d", dest, min, max);
1595         g_signal_emit_by_name (priv->udpsink[0], "remove", dest, min, NULL);
1596         g_signal_emit_by_name (priv->udpsink[1], "remove", dest, max, NULL);
1597         priv->transports = g_list_remove (priv->transports, trans);
1598       }
1599       break;
1600     }
1601     case GST_RTSP_LOWER_TRANS_TCP:
1602       if (add) {
1603         GST_INFO ("adding TCP %s", tr->destination);
1604         priv->transports = g_list_prepend (priv->transports, trans);
1605       } else {
1606         GST_INFO ("removing TCP %s", tr->destination);
1607         priv->transports = g_list_remove (priv->transports, trans);
1608       }
1609       break;
1610     default:
1611       goto unknown_transport;
1612   }
1613   return TRUE;
1614
1615   /* ERRORS */
1616 unknown_transport:
1617   {
1618     GST_INFO ("Unknown transport %d", tr->lower_transport);
1619     return FALSE;
1620   }
1621 }
1622
1623
1624 /**
1625  * gst_rtsp_stream_add_transport:
1626  * @stream: a #GstRTSPStream
1627  * @trans: a #GstRTSPStreamTransport
1628  *
1629  * Add the transport in @trans to @stream. The media of @stream will
1630  * then also be send to the values configured in @trans.
1631  *
1632  * @stream must be joined to a bin.
1633  *
1634  * @trans must contain a valid #GstRTSPTransport.
1635  *
1636  * Returns: %TRUE if @trans was added
1637  */
1638 gboolean
1639 gst_rtsp_stream_add_transport (GstRTSPStream * stream,
1640     GstRTSPStreamTransport * trans)
1641 {
1642   GstRTSPStreamPrivate *priv;
1643   gboolean res;
1644
1645   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1646   priv = stream->priv;
1647   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
1648   g_return_val_if_fail (priv->is_joined, FALSE);
1649
1650   g_mutex_lock (&priv->lock);
1651   res = update_transport (stream, trans, TRUE);
1652   g_mutex_unlock (&priv->lock);
1653
1654   return res;
1655 }
1656
1657 /**
1658  * gst_rtsp_stream_remove_transport:
1659  * @stream: a #GstRTSPStream
1660  * @trans: a #GstRTSPStreamTransport
1661  *
1662  * Remove the transport in @trans from @stream. The media of @stream will
1663  * not be sent to the values configured in @trans.
1664  *
1665  * @stream must be joined to a bin.
1666  *
1667  * @trans must contain a valid #GstRTSPTransport.
1668  *
1669  * Returns: %TRUE if @trans was removed
1670  */
1671 gboolean
1672 gst_rtsp_stream_remove_transport (GstRTSPStream * stream,
1673     GstRTSPStreamTransport * trans)
1674 {
1675   GstRTSPStreamPrivate *priv;
1676   gboolean res;
1677
1678   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1679   priv = stream->priv;
1680   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
1681   g_return_val_if_fail (priv->is_joined, FALSE);
1682
1683   g_mutex_lock (&priv->lock);
1684   res = update_transport (stream, trans, FALSE);
1685   g_mutex_unlock (&priv->lock);
1686
1687   return res;
1688 }