dscp qos support in gst-rtsp-stream
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-stream.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19
20 #include <string.h>
21 #include <stdlib.h>
22
23 #include <gio/gio.h>
24
25 #include <gst/app/gstappsrc.h>
26 #include <gst/app/gstappsink.h>
27
28 #include "rtsp-stream.h"
29
30 #define GST_RTSP_STREAM_GET_PRIVATE(obj)  \
31      (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTSP_STREAM, GstRTSPStreamPrivate))
32
33 struct _GstRTSPStreamPrivate
34 {
35   GMutex lock;
36   guint idx;
37   GstPad *srcpad;
38   GstElement *payloader;
39   guint buffer_size;
40   gboolean is_joined;
41
42   /* pads on the rtpbin */
43   GstPad *send_rtp_sink;
44   GstPad *recv_sink[2];
45   GstPad *send_src[2];
46
47   /* the RTPSession object */
48   GObject *session;
49
50   /* sinks used for sending and receiving RTP and RTCP over ipv4, they share
51    * sockets */
52   GstElement *udpsrc_v4[2];
53
54   /* sinks used for sending and receiving RTP and RTCP over ipv6, they share
55    * sockets */
56   GstElement *udpsrc_v6[2];
57
58   GstElement *udpsink[2];
59
60   /* for TCP transport */
61   GstElement *appsrc[2];
62   GstElement *appqueue[2];
63   GstElement *appsink[2];
64
65   GstElement *tee[2];
66   GstElement *funnel[2];
67
68   /* server ports for sending/receiving over ipv4 */
69   GstRTSPRange server_port_v4;
70   GstRTSPAddress *server_addr_v4;
71
72   /* server ports for sending/receiving over ipv6 */
73   GstRTSPRange server_port_v6;
74   GstRTSPAddress *server_addr_v6;
75
76   /* multicast addresses */
77   GstRTSPAddressPool *pool;
78   GstRTSPAddress *addr;
79
80   /* the caps of the stream */
81   gulong caps_sig;
82   GstCaps *caps;
83
84   /* transports we stream to */
85   guint n_active;
86   GList *transports;
87
88   gint dscp_qos;
89 };
90
91
92 enum
93 {
94   PROP_0,
95   PROP_LAST
96 };
97
98 GST_DEBUG_CATEGORY_STATIC (rtsp_stream_debug);
99 #define GST_CAT_DEFAULT rtsp_stream_debug
100
101 static GQuark ssrc_stream_map_key;
102
103 static void gst_rtsp_stream_finalize (GObject * obj);
104
105 G_DEFINE_TYPE (GstRTSPStream, gst_rtsp_stream, G_TYPE_OBJECT);
106
107 static void
108 gst_rtsp_stream_class_init (GstRTSPStreamClass * klass)
109 {
110   GObjectClass *gobject_class;
111
112   g_type_class_add_private (klass, sizeof (GstRTSPStreamPrivate));
113
114   gobject_class = G_OBJECT_CLASS (klass);
115
116   gobject_class->finalize = gst_rtsp_stream_finalize;
117
118   GST_DEBUG_CATEGORY_INIT (rtsp_stream_debug, "rtspstream", 0, "GstRTSPStream");
119
120   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
121 }
122
123 static void
124 gst_rtsp_stream_init (GstRTSPStream * stream)
125 {
126   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
127
128   GST_DEBUG ("new stream %p", stream);
129
130   stream->priv = priv;
131
132   stream->priv->dscp_qos = -1;
133
134   g_mutex_init (&priv->lock);
135 }
136
137 static void
138 gst_rtsp_stream_finalize (GObject * obj)
139 {
140   GstRTSPStream *stream;
141   GstRTSPStreamPrivate *priv;
142
143   stream = GST_RTSP_STREAM (obj);
144   priv = stream->priv;
145
146   GST_DEBUG ("finalize stream %p", stream);
147
148   /* we really need to be unjoined now */
149   g_return_if_fail (!priv->is_joined);
150
151   if (priv->addr)
152     gst_rtsp_address_free (priv->addr);
153   if (priv->server_addr_v4)
154     gst_rtsp_address_free (priv->server_addr_v4);
155   if (priv->server_addr_v6)
156     gst_rtsp_address_free (priv->server_addr_v6);
157   if (priv->pool)
158     g_object_unref (priv->pool);
159   gst_object_unref (priv->payloader);
160   gst_object_unref (priv->srcpad);
161   g_mutex_clear (&priv->lock);
162
163   G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
164 }
165
166 /**
167  * gst_rtsp_stream_new:
168  * @idx: an index
169  * @srcpad: a #GstPad
170  * @payloader: a #GstElement
171  *
172  * Create a new media stream with index @idx that handles RTP data on
173  * @srcpad and has a payloader element @payloader.
174  *
175  * Returns: a new #GstRTSPStream
176  */
177 GstRTSPStream *
178 gst_rtsp_stream_new (guint idx, GstElement * payloader, GstPad * srcpad)
179 {
180   GstRTSPStreamPrivate *priv;
181   GstRTSPStream *stream;
182
183   g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL);
184   g_return_val_if_fail (GST_IS_PAD (srcpad), NULL);
185   g_return_val_if_fail (GST_PAD_IS_SRC (srcpad), NULL);
186
187   stream = g_object_new (GST_TYPE_RTSP_STREAM, NULL);
188   priv = stream->priv;
189   priv->idx = idx;
190   priv->payloader = gst_object_ref (payloader);
191   priv->srcpad = gst_object_ref (srcpad);
192
193   return stream;
194 }
195
196 /**
197  * gst_rtsp_stream_get_index:
198  * @stream: a #GstRTSPStream
199  *
200  * Get the stream index.
201  *
202  * Return: the stream index.
203  */
204 guint
205 gst_rtsp_stream_get_index (GstRTSPStream * stream)
206 {
207   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
208
209   return stream->priv->idx;
210 }
211
212  /**
213  * gst_rtsp_stream_get_srcpad:
214  * @stream: a #GstRTSPStream
215  *
216  * Get the srcpad associated with @stream.
217  *
218  * Return: the srcpad. Unref after usage.
219  */
220 GstPad *
221 gst_rtsp_stream_get_srcpad (GstRTSPStream * stream)
222 {
223   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
224
225   return gst_object_ref (stream->priv->srcpad);
226 }
227
228 /**
229  * gst_rtsp_stream_set_mtu:
230  * @stream: a #GstRTSPStream
231  * @mtu: a new MTU
232  *
233  * Configure the mtu in the payloader of @stream to @mtu.
234  */
235 void
236 gst_rtsp_stream_set_mtu (GstRTSPStream * stream, guint mtu)
237 {
238   GstRTSPStreamPrivate *priv;
239
240   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
241
242   priv = stream->priv;
243
244   GST_LOG_OBJECT (stream, "set MTU %u", mtu);
245
246   g_object_set (G_OBJECT (priv->payloader), "mtu", mtu, NULL);
247 }
248
249 /**
250  * gst_rtsp_stream_get_mtu:
251  * @stream: a #GstRTSPStream
252  *
253  * Get the configured MTU in the payloader of @stream.
254  *
255  * Returns: the MTU of the payloader.
256  */
257 guint
258 gst_rtsp_stream_get_mtu (GstRTSPStream * stream)
259 {
260   GstRTSPStreamPrivate *priv;
261   guint mtu;
262
263   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
264
265   priv = stream->priv;
266
267   g_object_get (G_OBJECT (priv->payloader), "mtu", &mtu, NULL);
268
269   return mtu;
270 }
271
272 /* Update the dscp qos property on the udp sinks */
273 static void
274 update_dscp_qos (GstRTSPStream *stream)
275 {
276   GstRTSPStreamPrivate *priv;
277
278   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
279
280   priv = stream->priv;
281
282   if (priv->udpsink[0]) {
283     g_object_set (G_OBJECT (priv->udpsink[0]), "qos-dscp", priv->dscp_qos,
284         NULL);
285   }
286
287   if (priv->udpsink[1]) {
288     g_object_set (G_OBJECT (priv->udpsink[1]), "qos-dscp", priv->dscp_qos,
289         NULL);
290   }
291 }
292
293 /**
294  * gst_rtsp_stream_set_dscp_qos:
295  * @stream: a #GstRTSPStream
296  * @dscp_qos: a new dscp qos value (0-63, or -1 to disable)
297  *
298  * Configure the dscp qos of the outgoing sockets to @dscp_qos.
299  */
300 void
301 gst_rtsp_stream_set_dscp_qos (GstRTSPStream *stream, gint dscp_qos)
302 {
303   GstRTSPStreamPrivate *priv;
304
305   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
306
307   priv = stream->priv;
308
309   GST_LOG_OBJECT (stream, "set DSCP QoS %d", dscp_qos);
310
311   if (dscp_qos < -1 || dscp_qos > 63) {
312     GST_WARNING_OBJECT (stream, "trying to set illegal dscp qos %d", dscp_qos);
313     return;
314   }
315
316   priv->dscp_qos = dscp_qos;
317
318   update_dscp_qos (stream);
319 }
320
321 /**
322  * gst_rtsp_stream_get_dscp_qos:
323  * @stream: a #GstRTSPStream
324  *
325  * Get the configured DSCP QoS in of the outgoing sockets.
326  *
327  * Returns: the DSCP QoS value of the outgoing sockets, or -1 if disbled.
328  */
329 gint
330 gst_rtsp_stream_get_dscp_qos (GstRTSPStream *stream)
331 {
332   GstRTSPStreamPrivate *priv;
333
334   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
335
336   priv = stream->priv;
337
338   return priv->dscp_qos;
339 }
340
341
342 /**
343  * gst_rtsp_stream_set_address_pool:
344  * @stream: a #GstRTSPStream
345  * @pool: a #GstRTSPAddressPool
346  *
347  * configure @pool to be used as the address pool of @stream.
348  */
349 void
350 gst_rtsp_stream_set_address_pool (GstRTSPStream * stream,
351     GstRTSPAddressPool * pool)
352 {
353   GstRTSPStreamPrivate *priv;
354   GstRTSPAddressPool *old;
355
356   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
357
358   priv = stream->priv;
359
360   GST_LOG_OBJECT (stream, "set address pool %p", pool);
361
362   g_mutex_lock (&priv->lock);
363   if ((old = priv->pool) != pool)
364     priv->pool = pool ? g_object_ref (pool) : NULL;
365   else
366     old = NULL;
367   g_mutex_unlock (&priv->lock);
368
369   if (old)
370     g_object_unref (old);
371 }
372
373 /**
374  * gst_rtsp_stream_get_address_pool:
375  * @stream: a #GstRTSPStream
376  *
377  * Get the #GstRTSPAddressPool used as the address pool of @stream.
378  *
379  * Returns: (transfer full): the #GstRTSPAddressPool of @stream. g_object_unref() after
380  * usage.
381  */
382 GstRTSPAddressPool *
383 gst_rtsp_stream_get_address_pool (GstRTSPStream * stream)
384 {
385   GstRTSPStreamPrivate *priv;
386   GstRTSPAddressPool *result;
387
388   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
389
390   priv = stream->priv;
391
392   g_mutex_lock (&priv->lock);
393   if ((result = priv->pool))
394     g_object_ref (result);
395   g_mutex_unlock (&priv->lock);
396
397   return result;
398 }
399
400 /**
401  * gst_rtsp_stream_get_address:
402  * @stream: a #GstRTSPStream
403  *
404  * Get the multicast address of @stream.
405  *
406  * Returns: the #GstRTSPAddress of @stream or %NULL when no address could be
407  * allocated. gst_rtsp_address_free() after usage.
408  */
409 GstRTSPAddress *
410 gst_rtsp_stream_get_address (GstRTSPStream * stream)
411 {
412   GstRTSPStreamPrivate *priv;
413   GstRTSPAddress *result;
414
415   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
416
417   priv = stream->priv;
418
419   g_mutex_lock (&priv->lock);
420   if (priv->addr == NULL) {
421     if (priv->pool == NULL)
422       goto no_pool;
423
424     priv->addr = gst_rtsp_address_pool_acquire_address (priv->pool,
425         GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_MULTICAST, 2);
426     if (priv->addr == NULL)
427       goto no_address;
428   }
429   result = gst_rtsp_address_copy (priv->addr);
430   g_mutex_unlock (&priv->lock);
431
432   return result;
433
434   /* ERRORS */
435 no_pool:
436   {
437     GST_ERROR_OBJECT (stream, "no address pool specified");
438     g_mutex_unlock (&priv->lock);
439     return NULL;
440   }
441 no_address:
442   {
443     GST_ERROR_OBJECT (stream, "failed to acquire address from pool");
444     g_mutex_unlock (&priv->lock);
445     return NULL;
446   }
447 }
448
449 /**
450  * gst_rtsp_stream_reserve_address:
451  * @stream: a #GstRTSPStream
452  *
453  * Get a specific multicast address of @stream.
454  *
455  * Returns: the #GstRTSPAddress of @stream or %NULL when no address could be
456  * allocated. gst_rtsp_address_free() after usage.
457  */
458 GstRTSPAddress *
459 gst_rtsp_stream_reserve_address (GstRTSPStream * stream,
460     const gchar * address, guint port, guint n_ports, guint ttl)
461 {
462   GstRTSPStreamPrivate *priv;
463   GstRTSPAddress *result;
464
465   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
466   g_return_val_if_fail (address != NULL, NULL);
467   g_return_val_if_fail (port > 0, NULL);
468   g_return_val_if_fail (n_ports > 0, NULL);
469   g_return_val_if_fail (ttl > 0, NULL);
470
471   priv = stream->priv;
472
473   g_mutex_lock (&priv->lock);
474   if (priv->addr == NULL) {
475     if (priv->pool == NULL)
476       goto no_pool;
477
478     priv->addr = gst_rtsp_address_pool_reserve_address (priv->pool, address,
479         port, n_ports, ttl);
480     if (priv->addr == NULL)
481       goto no_address;
482   } else {
483     if (strcmp (priv->addr->address, address) ||
484         priv->addr->port != port || priv->addr->n_ports != n_ports ||
485         priv->addr->ttl != ttl)
486       goto different_address;
487   }
488   result = gst_rtsp_address_copy (priv->addr);
489   g_mutex_unlock (&priv->lock);
490
491   return result;
492
493   /* ERRORS */
494 no_pool:
495   {
496     GST_ERROR_OBJECT (stream, "no address pool specified");
497     g_mutex_unlock (&priv->lock);
498     return NULL;
499   }
500 no_address:
501   {
502     GST_ERROR_OBJECT (stream, "failed to acquire address %s from pool",
503         address);
504     g_mutex_unlock (&priv->lock);
505     return NULL;
506   }
507 different_address:
508   {
509     GST_ERROR_OBJECT (stream, "address %s is not the same that was already"
510         " reserved", address);
511     g_mutex_unlock (&priv->lock);
512     return NULL;
513   }
514 }
515
516 static gboolean
517 alloc_ports_one_family (GstRTSPAddressPool * pool, gint buffer_size,
518     GSocketFamily family, GstElement * udpsrc_out[2],
519     GstElement * udpsink_out[2], GstRTSPRange * server_port_out,
520     GstRTSPAddress ** server_addr_out)
521 {
522   GstStateChangeReturn ret;
523   GstElement *udpsrc0, *udpsrc1;
524   GstElement *udpsink0, *udpsink1;
525   GSocket *rtp_socket = NULL;
526   GSocket *rtcp_socket;
527   gint tmp_rtp, tmp_rtcp;
528   guint count;
529   gint rtpport, rtcpport;
530   GList *rejected_addresses = NULL;
531   GstRTSPAddress *addr = NULL;
532   GInetAddress *inetaddr = NULL;
533   GSocketAddress *rtp_sockaddr = NULL;
534   GSocketAddress *rtcp_sockaddr = NULL;
535   const gchar *multisink_socket = "socket";
536
537   if (family == G_SOCKET_FAMILY_IPV6) {
538     multisink_socket = "socket-v6";
539   }
540
541   udpsrc0 = NULL;
542   udpsrc1 = NULL;
543   udpsink0 = NULL;
544   udpsink1 = NULL;
545   count = 0;
546
547   /* Start with random port */
548   tmp_rtp = 0;
549
550   rtcp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
551       G_SOCKET_PROTOCOL_UDP, NULL);
552   if (!rtcp_socket)
553     goto no_udp_protocol;
554
555   if (*server_addr_out)
556     gst_rtsp_address_free (*server_addr_out);
557
558   /* try to allocate 2 UDP ports, the RTP port should be an even
559    * number and the RTCP port should be the next (uneven) port */
560 again:
561
562   if (rtp_socket == NULL) {
563     rtp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
564         G_SOCKET_PROTOCOL_UDP, NULL);
565     if (!rtp_socket)
566       goto no_udp_protocol;
567   }
568
569   if (pool && gst_rtsp_address_pool_has_unicast_addresses (pool)) {
570     GstRTSPAddressFlags flags;
571
572     if (addr)
573       rejected_addresses = g_list_prepend (rejected_addresses, addr);
574
575     flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_UNICAST;
576     if (family == G_SOCKET_FAMILY_IPV6)
577       flags |= GST_RTSP_ADDRESS_FLAG_IPV6;
578     else
579       flags |= GST_RTSP_ADDRESS_FLAG_IPV4;
580
581     addr = gst_rtsp_address_pool_acquire_address (pool, flags, 2);
582
583     if (addr == NULL)
584       goto no_ports;
585
586     tmp_rtp = addr->port;
587
588     g_clear_object (&inetaddr);
589     inetaddr = g_inet_address_new_from_string (addr->address);
590   } else {
591     if (tmp_rtp != 0) {
592       tmp_rtp += 2;
593       if (++count > 20)
594         goto no_ports;
595     }
596
597     if (inetaddr == NULL)
598       inetaddr = g_inet_address_new_any (family);
599   }
600
601   rtp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtp);
602   if (!g_socket_bind (rtp_socket, rtp_sockaddr, FALSE, NULL)) {
603     g_object_unref (rtp_sockaddr);
604     goto again;
605   }
606   g_object_unref (rtp_sockaddr);
607
608   rtp_sockaddr = g_socket_get_local_address (rtp_socket, NULL);
609   if (rtp_sockaddr == NULL || !G_IS_INET_SOCKET_ADDRESS (rtp_sockaddr)) {
610     g_clear_object (&rtp_sockaddr);
611     goto socket_error;
612   }
613
614   tmp_rtp =
615       g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (rtp_sockaddr));
616   g_object_unref (rtp_sockaddr);
617
618   /* check if port is even */
619   if ((tmp_rtp & 1) != 0) {
620     /* port not even, close and allocate another */
621     tmp_rtp++;
622     g_clear_object (&rtp_socket);
623     goto again;
624   }
625
626   /* set port */
627   tmp_rtcp = tmp_rtp + 1;
628
629   rtcp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtcp);
630   if (!g_socket_bind (rtcp_socket, rtcp_sockaddr, FALSE, NULL)) {
631     g_object_unref (rtcp_sockaddr);
632     g_clear_object (&rtp_socket);
633     goto again;
634   }
635   g_object_unref (rtcp_sockaddr);
636
637   g_clear_object (&inetaddr);
638
639   udpsrc0 = gst_element_factory_make ("udpsrc", NULL);
640   udpsrc1 = gst_element_factory_make ("udpsrc", NULL);
641
642   if (udpsrc0 == NULL || udpsrc1 == NULL)
643     goto no_udp_protocol;
644
645   g_object_set (G_OBJECT (udpsrc0), "socket", rtp_socket, NULL);
646   g_object_set (G_OBJECT (udpsrc1), "socket", rtcp_socket, NULL);
647
648   ret = gst_element_set_state (udpsrc0, GST_STATE_PAUSED);
649   if (ret == GST_STATE_CHANGE_FAILURE)
650     goto element_error;
651   ret = gst_element_set_state (udpsrc1, GST_STATE_PAUSED);
652   if (ret == GST_STATE_CHANGE_FAILURE)
653     goto element_error;
654
655   /* all fine, do port check */
656   g_object_get (G_OBJECT (udpsrc0), "port", &rtpport, NULL);
657   g_object_get (G_OBJECT (udpsrc1), "port", &rtcpport, NULL);
658
659   /* this should not happen... */
660   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
661     goto port_error;
662
663   if (udpsink_out[0])
664     udpsink0 = udpsink_out[0];
665   else
666     udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
667
668   if (!udpsink0)
669     goto no_udp_protocol;
670
671   g_object_set (G_OBJECT (udpsink0), "close-socket", FALSE, NULL);
672   g_object_set (G_OBJECT (udpsink0), multisink_socket, rtp_socket, NULL);
673
674   if (udpsink_out[1])
675     udpsink1 = udpsink_out[1];
676   else
677     udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
678
679   if (!udpsink1)
680     goto no_udp_protocol;
681
682   g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
683   g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
684   g_object_set (G_OBJECT (udpsink0), "buffer-size", buffer_size, NULL);
685
686   g_object_set (G_OBJECT (udpsink1), "close-socket", FALSE, NULL);
687   g_object_set (G_OBJECT (udpsink1), multisink_socket, rtcp_socket, NULL);
688   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
689   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
690   g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL);
691   g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
692   g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL);
693   g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
694
695   /* we keep these elements, we will further configure them when the
696    * client told us to really use the UDP ports. */
697   udpsrc_out[0] = udpsrc0;
698   udpsrc_out[1] = udpsrc1;
699   udpsink_out[0] = udpsink0;
700   udpsink_out[1] = udpsink1;
701   server_port_out->min = rtpport;
702   server_port_out->max = rtcpport;
703
704   *server_addr_out = addr;
705   g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free);
706
707   g_object_unref (rtp_socket);
708   g_object_unref (rtcp_socket);
709
710   return TRUE;
711
712   /* ERRORS */
713 no_udp_protocol:
714   {
715     goto cleanup;
716   }
717 no_ports:
718   {
719     goto cleanup;
720   }
721 port_error:
722   {
723     goto cleanup;
724   }
725 socket_error:
726   {
727     goto cleanup;
728   }
729 element_error:
730   {
731     goto cleanup;
732   }
733 cleanup:
734   {
735     if (udpsrc0) {
736       gst_element_set_state (udpsrc0, GST_STATE_NULL);
737       gst_object_unref (udpsrc0);
738     }
739     if (udpsrc1) {
740       gst_element_set_state (udpsrc1, GST_STATE_NULL);
741       gst_object_unref (udpsrc1);
742     }
743     if (udpsink0) {
744       gst_element_set_state (udpsink0, GST_STATE_NULL);
745       gst_object_unref (udpsink0);
746     }
747     if (udpsink1) {
748       gst_element_set_state (udpsink1, GST_STATE_NULL);
749       gst_object_unref (udpsink1);
750     }
751     if (inetaddr)
752       g_object_unref (inetaddr);
753     g_list_free_full (rejected_addresses,
754         (GDestroyNotify) gst_rtsp_address_free);
755     if (addr)
756       gst_rtsp_address_free (addr);
757     if (rtp_socket)
758       g_object_unref (rtp_socket);
759     if (rtcp_socket)
760       g_object_unref (rtcp_socket);
761     return FALSE;
762   }
763 }
764
765 /* must be called with lock */
766 static gboolean
767 alloc_ports (GstRTSPStream * stream)
768 {
769   GstRTSPStreamPrivate *priv = stream->priv;
770
771   return alloc_ports_one_family (priv->pool, priv->buffer_size,
772       G_SOCKET_FAMILY_IPV4, priv->udpsrc_v4, priv->udpsink,
773       &priv->server_port_v4, &priv->server_addr_v4) &&
774       alloc_ports_one_family (priv->pool, priv->buffer_size,
775       G_SOCKET_FAMILY_IPV6, priv->udpsrc_v6, priv->udpsink,
776       &priv->server_port_v6, &priv->server_addr_v6);
777 }
778
779 /**
780  * gst_rtsp_stream_get_server_port:
781  * @stream: a #GstRTSPStream
782  * @server_port: (out): result server port
783  *
784  * Fill @server_port with the port pair used by the server. This function can
785  * only be called when @stream has been joined.
786  */
787 void
788 gst_rtsp_stream_get_server_port (GstRTSPStream * stream,
789     GstRTSPRange * server_port, GSocketFamily family)
790 {
791   GstRTSPStreamPrivate *priv;
792
793   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
794   priv = stream->priv;
795   g_return_if_fail (priv->is_joined);
796
797   g_mutex_lock (&priv->lock);
798   if (family == G_SOCKET_FAMILY_IPV4) {
799     if (server_port)
800       *server_port = priv->server_port_v4;
801   } else {
802     if (server_port)
803       *server_port = priv->server_port_v6;
804   }
805   g_mutex_unlock (&priv->lock);
806 }
807
808 /**
809  * gst_rtsp_stream_get_ssrc:
810  * @stream: a #GstRTSPStream
811  * @ssrc: (out): result ssrc
812  *
813  * Get the SSRC used by the RTP session of this stream. This function can only
814  * be called when @stream has been joined.
815  */
816 void
817 gst_rtsp_stream_get_ssrc (GstRTSPStream * stream, guint * ssrc)
818 {
819   GstRTSPStreamPrivate *priv;
820
821   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
822   priv = stream->priv;
823   g_return_if_fail (priv->is_joined);
824
825   g_mutex_lock (&priv->lock);
826   if (ssrc && priv->session)
827     g_object_get (priv->session, "internal-ssrc", ssrc, NULL);
828   g_mutex_unlock (&priv->lock);
829 }
830
831 /* executed from streaming thread */
832 static void
833 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream)
834 {
835   GstRTSPStreamPrivate *priv = stream->priv;
836   GstCaps *newcaps, *oldcaps;
837
838   newcaps = gst_pad_get_current_caps (pad);
839
840   GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps,
841       newcaps);
842
843   g_mutex_lock (&priv->lock);
844   oldcaps = priv->caps;
845   priv->caps = newcaps;
846   g_mutex_unlock (&priv->lock);
847
848   if (oldcaps)
849     gst_caps_unref (oldcaps);
850 }
851
852 static void
853 dump_structure (const GstStructure * s)
854 {
855   gchar *sstr;
856
857   sstr = gst_structure_to_string (s);
858   GST_INFO ("structure: %s", sstr);
859   g_free (sstr);
860 }
861
862 static GstRTSPStreamTransport *
863 find_transport (GstRTSPStream * stream, const gchar * rtcp_from)
864 {
865   GstRTSPStreamPrivate *priv = stream->priv;
866   GList *walk;
867   GstRTSPStreamTransport *result = NULL;
868   const gchar *tmp;
869   gchar *dest;
870   guint port;
871
872   if (rtcp_from == NULL)
873     return NULL;
874
875   tmp = g_strrstr (rtcp_from, ":");
876   if (tmp == NULL)
877     return NULL;
878
879   port = atoi (tmp + 1);
880   dest = g_strndup (rtcp_from, tmp - rtcp_from);
881
882   g_mutex_lock (&priv->lock);
883   GST_INFO ("finding %s:%d in %d transports", dest, port,
884       g_list_length (priv->transports));
885
886   for (walk = priv->transports; walk; walk = g_list_next (walk)) {
887     GstRTSPStreamTransport *trans = walk->data;
888     const GstRTSPTransport *tr;
889     gint min, max;
890
891     tr = gst_rtsp_stream_transport_get_transport (trans);
892
893     min = tr->client_port.min;
894     max = tr->client_port.max;
895
896     if ((strcmp (tr->destination, dest) == 0) && (min == port || max == port)) {
897       result = trans;
898       break;
899     }
900   }
901   if (result)
902     g_object_ref (result);
903   g_mutex_unlock (&priv->lock);
904
905   g_free (dest);
906
907   return result;
908 }
909
910 static GstRTSPStreamTransport *
911 check_transport (GObject * source, GstRTSPStream * stream)
912 {
913   GstStructure *stats;
914   GstRTSPStreamTransport *trans;
915
916   /* see if we have a stream to match with the origin of the RTCP packet */
917   trans = g_object_get_qdata (source, ssrc_stream_map_key);
918   if (trans == NULL) {
919     g_object_get (source, "stats", &stats, NULL);
920     if (stats) {
921       const gchar *rtcp_from;
922
923       dump_structure (stats);
924
925       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
926       if ((trans = find_transport (stream, rtcp_from))) {
927         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
928             source);
929         g_object_set_qdata_full (source, ssrc_stream_map_key, trans,
930             g_object_unref);
931       }
932       gst_structure_free (stats);
933     }
934   }
935   return trans;
936 }
937
938
939 static void
940 on_new_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
941 {
942   GstRTSPStreamTransport *trans;
943
944   GST_INFO ("%p: new source %p", stream, source);
945
946   trans = check_transport (source, stream);
947
948   if (trans)
949     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
950 }
951
952 static void
953 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPStream * stream)
954 {
955   GST_INFO ("%p: new SDES %p", stream, source);
956 }
957
958 static void
959 on_ssrc_active (GObject * session, GObject * source, GstRTSPStream * stream)
960 {
961   GstRTSPStreamTransport *trans;
962
963   trans = check_transport (source, stream);
964
965   if (trans) {
966     GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
967     gst_rtsp_stream_transport_keep_alive (trans);
968   }
969 #ifdef DUMP_STATS
970   {
971     GstStructure *stats;
972     g_object_get (source, "stats", &stats, NULL);
973     if (stats) {
974       dump_structure (stats);
975       gst_structure_free (stats);
976     }
977   }
978 #endif
979 }
980
981 static void
982 on_bye_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
983 {
984   GST_INFO ("%p: source %p bye", stream, source);
985 }
986
987 static void
988 on_bye_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
989 {
990   GstRTSPStreamTransport *trans;
991
992   GST_INFO ("%p: source %p bye timeout", stream, source);
993
994   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
995     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
996     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
997   }
998 }
999
1000 static void
1001 on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1002 {
1003   GstRTSPStreamTransport *trans;
1004
1005   GST_INFO ("%p: source %p timeout", stream, source);
1006
1007   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1008     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1009     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1010   }
1011 }
1012
1013 static GstFlowReturn
1014 handle_new_sample (GstAppSink * sink, gpointer user_data)
1015 {
1016   GstRTSPStreamPrivate *priv;
1017   GList *walk;
1018   GstSample *sample;
1019   GstBuffer *buffer;
1020   GstRTSPStream *stream;
1021
1022   sample = gst_app_sink_pull_sample (sink);
1023   if (!sample)
1024     return GST_FLOW_OK;
1025
1026   stream = (GstRTSPStream *) user_data;
1027   priv = stream->priv;
1028   buffer = gst_sample_get_buffer (sample);
1029
1030   g_mutex_lock (&priv->lock);
1031   for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1032     GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1033
1034     if (GST_ELEMENT_CAST (sink) == priv->appsink[0]) {
1035       gst_rtsp_stream_transport_send_rtp (tr, buffer);
1036     } else {
1037       gst_rtsp_stream_transport_send_rtcp (tr, buffer);
1038     }
1039   }
1040   g_mutex_unlock (&priv->lock);
1041
1042   gst_sample_unref (sample);
1043
1044   return GST_FLOW_OK;
1045 }
1046
1047 static GstAppSinkCallbacks sink_cb = {
1048   NULL,                         /* not interested in EOS */
1049   NULL,                         /* not interested in preroll samples */
1050   handle_new_sample,
1051 };
1052
1053 /**
1054  * gst_rtsp_stream_join_bin:
1055  * @stream: a #GstRTSPStream
1056  * @bin: a #GstBin to join
1057  * @rtpbin: a rtpbin element in @bin
1058  * @state: the target state of the new elements
1059  *
1060  * Join the #Gstbin @bin that contains the element @rtpbin.
1061  *
1062  * @stream will link to @rtpbin, which must be inside @bin. The elements
1063  * added to @bin will be set to the state given in @state.
1064  *
1065  * Returns: %TRUE on success.
1066  */
1067 gboolean
1068 gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
1069     GstElement * rtpbin, GstState state)
1070 {
1071   GstRTSPStreamPrivate *priv;
1072   gint i, idx;
1073   gchar *name;
1074   GstPad *pad, *teepad, *queuepad, *selpad;
1075   GstPadLinkReturn ret;
1076
1077   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1078   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
1079   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
1080
1081   priv = stream->priv;
1082
1083   g_mutex_lock (&priv->lock);
1084   if (priv->is_joined)
1085     goto was_joined;
1086
1087   /* create a session with the same index as the stream */
1088   idx = priv->idx;
1089
1090   GST_INFO ("stream %p joining bin as session %d", stream, idx);
1091
1092   if (!alloc_ports (stream))
1093     goto no_ports;
1094
1095   /* update the dscp qos field in the sinks */
1096   update_dscp_qos (stream);
1097
1098   /* get a pad for sending RTP */
1099   name = g_strdup_printf ("send_rtp_sink_%u", idx);
1100   priv->send_rtp_sink = gst_element_get_request_pad (rtpbin, name);
1101   g_free (name);
1102   /* link the RTP pad to the session manager, it should not really fail unless
1103    * this is not really an RTP pad */
1104   ret = gst_pad_link (priv->srcpad, priv->send_rtp_sink);
1105   if (ret != GST_PAD_LINK_OK)
1106     goto link_failed;
1107
1108   /* get pads from the RTP session element for sending and receiving
1109    * RTP/RTCP*/
1110   name = g_strdup_printf ("send_rtp_src_%u", idx);
1111   priv->send_src[0] = gst_element_get_static_pad (rtpbin, name);
1112   g_free (name);
1113   name = g_strdup_printf ("send_rtcp_src_%u", idx);
1114   priv->send_src[1] = gst_element_get_request_pad (rtpbin, name);
1115   g_free (name);
1116   name = g_strdup_printf ("recv_rtp_sink_%u", idx);
1117   priv->recv_sink[0] = gst_element_get_request_pad (rtpbin, name);
1118   g_free (name);
1119   name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
1120   priv->recv_sink[1] = gst_element_get_request_pad (rtpbin, name);
1121   g_free (name);
1122
1123   /* get the session */
1124   g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &priv->session);
1125
1126   g_signal_connect (priv->session, "on-new-ssrc", (GCallback) on_new_ssrc,
1127       stream);
1128   g_signal_connect (priv->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
1129       stream);
1130   g_signal_connect (priv->session, "on-ssrc-active",
1131       (GCallback) on_ssrc_active, stream);
1132   g_signal_connect (priv->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
1133       stream);
1134   g_signal_connect (priv->session, "on-bye-timeout",
1135       (GCallback) on_bye_timeout, stream);
1136   g_signal_connect (priv->session, "on-timeout", (GCallback) on_timeout,
1137       stream);
1138
1139   for (i = 0; i < 2; i++) {
1140     /* For the sender we create this bit of pipeline for both
1141      * RTP and RTCP. Sync and preroll are enabled on udpsink so
1142      * we need to add a queue before appsink to make the pipeline
1143      * not block. For the TCP case, we want to pump data to the
1144      * client as fast as possible anyway.
1145      *
1146      * .--------.      .-----.    .---------.
1147      * | rtpbin |      | tee |    | udpsink |
1148      * |       send->sink   src->sink       |
1149      * '--------'      |     |    '---------'
1150      *                 |     |    .---------.    .---------.
1151      *                 |     |    |  queue  |    | appsink |
1152      *                 |    src->sink      src->sink       |
1153      *                 '-----'    '---------'    '---------'
1154      */
1155     /* make tee for RTP/RTCP */
1156     priv->tee[i] = gst_element_factory_make ("tee", NULL);
1157     gst_bin_add (bin, priv->tee[i]);
1158
1159     /* and link to rtpbin send pad */
1160     pad = gst_element_get_static_pad (priv->tee[i], "sink");
1161     gst_pad_link (priv->send_src[i], pad);
1162     gst_object_unref (pad);
1163
1164     /* add udpsink */
1165     gst_bin_add (bin, priv->udpsink[i]);
1166
1167     /* link tee to udpsink */
1168     teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
1169     pad = gst_element_get_static_pad (priv->udpsink[i], "sink");
1170     gst_pad_link (teepad, pad);
1171     gst_object_unref (pad);
1172     gst_object_unref (teepad);
1173
1174     /* make queue */
1175     priv->appqueue[i] = gst_element_factory_make ("queue", NULL);
1176     gst_bin_add (bin, priv->appqueue[i]);
1177     /* and link to tee */
1178     teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
1179     pad = gst_element_get_static_pad (priv->appqueue[i], "sink");
1180     gst_pad_link (teepad, pad);
1181     gst_object_unref (pad);
1182     gst_object_unref (teepad);
1183
1184     /* make appsink */
1185     priv->appsink[i] = gst_element_factory_make ("appsink", NULL);
1186     g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
1187     g_object_set (priv->appsink[i], "emit-signals", FALSE, NULL);
1188     gst_bin_add (bin, priv->appsink[i]);
1189     gst_app_sink_set_callbacks (GST_APP_SINK_CAST (priv->appsink[i]),
1190         &sink_cb, stream, NULL);
1191     /* and link to queue */
1192     queuepad = gst_element_get_static_pad (priv->appqueue[i], "src");
1193     pad = gst_element_get_static_pad (priv->appsink[i], "sink");
1194     gst_pad_link (queuepad, pad);
1195     gst_object_unref (pad);
1196     gst_object_unref (queuepad);
1197
1198     /* For the receiver we create this bit of pipeline for both
1199      * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc
1200      * and it is all funneled into the rtpbin receive pad.
1201      *
1202      * .--------.     .--------.    .--------.
1203      * | udpsrc |     | funnel |    | rtpbin |
1204      * |       src->sink      src->sink      |
1205      * '--------'     |        |    '--------'
1206      * .--------.     |        |
1207      * | appsrc |     |        |
1208      * |       src->sink       |
1209      * '--------'     '--------'
1210      */
1211     /* make funnel for the RTP/RTCP receivers */
1212     priv->funnel[i] = gst_element_factory_make ("funnel", NULL);
1213     gst_bin_add (bin, priv->funnel[i]);
1214
1215     pad = gst_element_get_static_pad (priv->funnel[i], "src");
1216     gst_pad_link (pad, priv->recv_sink[i]);
1217     gst_object_unref (pad);
1218
1219     /* we set and keep these to playing so that they don't cause NO_PREROLL return
1220      * values */
1221     gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_PLAYING);
1222     gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_PLAYING);
1223     gst_element_set_locked_state (priv->udpsrc_v4[i], TRUE);
1224     gst_element_set_locked_state (priv->udpsrc_v6[i], TRUE);
1225     /* add udpsrc */
1226     gst_bin_add (bin, priv->udpsrc_v4[i]);
1227     gst_bin_add (bin, priv->udpsrc_v6[i]);
1228     /* and link to the funnel v4 */
1229     selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1230     pad = gst_element_get_static_pad (priv->udpsrc_v4[i], "src");
1231     gst_pad_link (pad, selpad);
1232     gst_object_unref (pad);
1233     gst_object_unref (selpad);
1234
1235     /* and link to the funnel v6 */
1236     selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1237     pad = gst_element_get_static_pad (priv->udpsrc_v6[i], "src");
1238     gst_pad_link (pad, selpad);
1239     gst_object_unref (pad);
1240     gst_object_unref (selpad);
1241
1242     /* make and add appsrc */
1243     priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
1244     gst_bin_add (bin, priv->appsrc[i]);
1245     /* and link to the funnel */
1246     selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1247     pad = gst_element_get_static_pad (priv->appsrc[i], "src");
1248     gst_pad_link (pad, selpad);
1249     gst_object_unref (pad);
1250     gst_object_unref (selpad);
1251
1252     /* check if we need to set to a special state */
1253     if (state != GST_STATE_NULL) {
1254       gst_element_set_state (priv->udpsink[i], state);
1255       gst_element_set_state (priv->appsink[i], state);
1256       gst_element_set_state (priv->appqueue[i], state);
1257       gst_element_set_state (priv->tee[i], state);
1258       gst_element_set_state (priv->funnel[i], state);
1259       gst_element_set_state (priv->appsrc[i], state);
1260     }
1261   }
1262
1263   /* be notified of caps changes */
1264   priv->caps_sig = g_signal_connect (priv->send_rtp_sink, "notify::caps",
1265       (GCallback) caps_notify, stream);
1266
1267   priv->is_joined = TRUE;
1268   g_mutex_unlock (&priv->lock);
1269
1270   return TRUE;
1271
1272   /* ERRORS */
1273 was_joined:
1274   {
1275     g_mutex_unlock (&priv->lock);
1276     return TRUE;
1277   }
1278 no_ports:
1279   {
1280     g_mutex_unlock (&priv->lock);
1281     GST_WARNING ("failed to allocate ports %d", idx);
1282     return FALSE;
1283   }
1284 link_failed:
1285   {
1286     GST_WARNING ("failed to link stream %d", idx);
1287     gst_object_unref (priv->send_rtp_sink);
1288     priv->send_rtp_sink = NULL;
1289     g_mutex_unlock (&priv->lock);
1290     return FALSE;
1291   }
1292 }
1293
1294 /**
1295  * gst_rtsp_stream_leave_bin:
1296  * @stream: a #GstRTSPStream
1297  * @bin: a #GstBin
1298  * @rtpbin: a rtpbin #GstElement
1299  *
1300  * Remove the elements of @stream from @bin.
1301  *
1302  * Return: %TRUE on success.
1303  */
1304 gboolean
1305 gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
1306     GstElement * rtpbin)
1307 {
1308   GstRTSPStreamPrivate *priv;
1309   gint i;
1310
1311   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1312   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
1313   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
1314
1315   priv = stream->priv;
1316
1317   g_mutex_lock (&priv->lock);
1318   if (!priv->is_joined)
1319     goto was_not_joined;
1320
1321   /* all transports must be removed by now */
1322   g_return_val_if_fail (priv->transports == NULL, FALSE);
1323
1324   GST_INFO ("stream %p leaving bin", stream);
1325
1326   gst_pad_unlink (priv->srcpad, priv->send_rtp_sink);
1327   g_signal_handler_disconnect (priv->send_rtp_sink, priv->caps_sig);
1328   gst_element_release_request_pad (rtpbin, priv->send_rtp_sink);
1329   gst_object_unref (priv->send_rtp_sink);
1330   priv->send_rtp_sink = NULL;
1331
1332   for (i = 0; i < 2; i++) {
1333     gst_element_set_state (priv->udpsink[i], GST_STATE_NULL);
1334     gst_element_set_state (priv->appsink[i], GST_STATE_NULL);
1335     gst_element_set_state (priv->appqueue[i], GST_STATE_NULL);
1336     gst_element_set_state (priv->tee[i], GST_STATE_NULL);
1337     gst_element_set_state (priv->funnel[i], GST_STATE_NULL);
1338     gst_element_set_state (priv->appsrc[i], GST_STATE_NULL);
1339     /* and set udpsrc to NULL now before removing */
1340     gst_element_set_locked_state (priv->udpsrc_v4[i], FALSE);
1341     gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_NULL);
1342     gst_element_set_locked_state (priv->udpsrc_v6[i], FALSE);
1343     gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_NULL);
1344
1345     /* removing them should also nicely release the request
1346      * pads when they finalize */
1347     gst_bin_remove (bin, priv->udpsrc_v4[i]);
1348     gst_bin_remove (bin, priv->udpsrc_v6[i]);
1349     gst_bin_remove (bin, priv->udpsink[i]);
1350     gst_bin_remove (bin, priv->appsrc[i]);
1351     gst_bin_remove (bin, priv->appsink[i]);
1352     gst_bin_remove (bin, priv->appqueue[i]);
1353     gst_bin_remove (bin, priv->tee[i]);
1354     gst_bin_remove (bin, priv->funnel[i]);
1355
1356     gst_element_release_request_pad (rtpbin, priv->recv_sink[i]);
1357     gst_object_unref (priv->recv_sink[i]);
1358     priv->recv_sink[i] = NULL;
1359
1360     priv->udpsrc_v4[i] = NULL;
1361     priv->udpsrc_v6[i] = NULL;
1362     priv->udpsink[i] = NULL;
1363     priv->appsrc[i] = NULL;
1364     priv->appsink[i] = NULL;
1365     priv->appqueue[i] = NULL;
1366     priv->tee[i] = NULL;
1367     priv->funnel[i] = NULL;
1368   }
1369   gst_object_unref (priv->send_src[0]);
1370   priv->send_src[0] = NULL;
1371
1372   gst_element_release_request_pad (rtpbin, priv->send_src[1]);
1373   gst_object_unref (priv->send_src[1]);
1374   priv->send_src[1] = NULL;
1375
1376   g_object_unref (priv->session);
1377   priv->session = NULL;
1378   if (priv->caps)
1379     gst_caps_unref (priv->caps);
1380   priv->caps = NULL;
1381
1382   priv->is_joined = FALSE;
1383   g_mutex_unlock (&priv->lock);
1384
1385   return TRUE;
1386
1387 was_not_joined:
1388   {
1389     return TRUE;
1390   }
1391 }
1392
1393 /**
1394  * gst_rtsp_stream_get_rtpinfo:
1395  * @stream: a #GstRTSPStream
1396  * @rtptime: result RTP timestamp
1397  * @seq: result RTP seqnum
1398  *
1399  * Retrieve the current rtptime and seq. This is used to
1400  * construct a RTPInfo reply header.
1401  *
1402  * Returns: %TRUE when rtptime and seq could be determined.
1403  */
1404 gboolean
1405 gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream,
1406     guint * rtptime, guint * seq)
1407 {
1408   GstRTSPStreamPrivate *priv;
1409   GObjectClass *payobjclass;
1410
1411   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1412   g_return_val_if_fail (rtptime != NULL, FALSE);
1413   g_return_val_if_fail (seq != NULL, FALSE);
1414
1415   priv = stream->priv;
1416
1417   payobjclass = G_OBJECT_GET_CLASS (priv->payloader);
1418
1419   if (!g_object_class_find_property (payobjclass, "seqnum") ||
1420       !g_object_class_find_property (payobjclass, "timestamp"))
1421     return FALSE;
1422
1423   g_object_get (priv->payloader, "seqnum", seq, "timestamp", rtptime, NULL);
1424
1425   return TRUE;
1426 }
1427
1428 /**
1429  * gst_rtsp_stream_get_caps:
1430  * @stream: a #GstRTSPStream
1431  *
1432  * Retrieve the current caps of @stream.
1433  *
1434  * Returns: (transfer full): the #GstCaps of @stream. use gst_caps_unref()
1435  *    after usage.
1436  */
1437 GstCaps *
1438 gst_rtsp_stream_get_caps (GstRTSPStream * stream)
1439 {
1440   GstRTSPStreamPrivate *priv;
1441   GstCaps *result;
1442
1443   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1444
1445   priv = stream->priv;
1446
1447   g_mutex_lock (&priv->lock);
1448   if ((result = priv->caps))
1449     gst_caps_ref (result);
1450   g_mutex_unlock (&priv->lock);
1451
1452   return result;
1453 }
1454
1455 /**
1456  * gst_rtsp_stream_recv_rtp:
1457  * @stream: a #GstRTSPStream
1458  * @buffer: (transfer full): a #GstBuffer
1459  *
1460  * Handle an RTP buffer for the stream. This method is usually called when a
1461  * message has been received from a client using the TCP transport.
1462  *
1463  * This function takes ownership of @buffer.
1464  *
1465  * Returns: a GstFlowReturn.
1466  */
1467 GstFlowReturn
1468 gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer)
1469 {
1470   GstRTSPStreamPrivate *priv;
1471   GstFlowReturn ret;
1472   GstElement *element;
1473
1474   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
1475   priv = stream->priv;
1476   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1477   g_return_val_if_fail (priv->is_joined, FALSE);
1478
1479   g_mutex_lock (&priv->lock);
1480   element = gst_object_ref (priv->appsrc[0]);
1481   g_mutex_unlock (&priv->lock);
1482
1483   ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
1484
1485   gst_object_unref (element);
1486
1487   return ret;
1488 }
1489
1490 /**
1491  * gst_rtsp_stream_recv_rtcp:
1492  * @stream: a #GstRTSPStream
1493  * @buffer: (transfer full): a #GstBuffer
1494  *
1495  * Handle an RTCP buffer for the stream. This method is usually called when a
1496  * message has been received from a client using the TCP transport.
1497  *
1498  * This function takes ownership of @buffer.
1499  *
1500  * Returns: a GstFlowReturn.
1501  */
1502 GstFlowReturn
1503 gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
1504 {
1505   GstRTSPStreamPrivate *priv;
1506   GstFlowReturn ret;
1507   GstElement *element;
1508
1509   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
1510   priv = stream->priv;
1511   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1512   g_return_val_if_fail (priv->is_joined, FALSE);
1513
1514   g_mutex_lock (&priv->lock);
1515   element = gst_object_ref (priv->appsrc[1]);
1516   g_mutex_unlock (&priv->lock);
1517
1518   ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
1519
1520   gst_object_unref (element);
1521
1522   return ret;
1523 }
1524
1525 /* must be called with lock */
1526 static gboolean
1527 update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
1528     gboolean add)
1529 {
1530   GstRTSPStreamPrivate *priv = stream->priv;
1531   const GstRTSPTransport *tr;
1532
1533   tr = gst_rtsp_stream_transport_get_transport (trans);
1534
1535   switch (tr->lower_transport) {
1536     case GST_RTSP_LOWER_TRANS_UDP:
1537     case GST_RTSP_LOWER_TRANS_UDP_MCAST:
1538     {
1539       gchar *dest;
1540       gint min, max;
1541       guint ttl = 0;
1542
1543       dest = tr->destination;
1544       if (tr->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
1545         min = tr->port.min;
1546         max = tr->port.max;
1547         ttl = tr->ttl;
1548       } else {
1549         min = tr->client_port.min;
1550         max = tr->client_port.max;
1551       }
1552
1553       if (add) {
1554         GST_INFO ("adding %s:%d-%d", dest, min, max);
1555         g_signal_emit_by_name (priv->udpsink[0], "add", dest, min, NULL);
1556         g_signal_emit_by_name (priv->udpsink[1], "add", dest, max, NULL);
1557         if (ttl > 0) {
1558           GST_INFO ("setting ttl-mc %d", ttl);
1559           g_object_set (G_OBJECT (priv->udpsink[0]), "ttl-mc", ttl, NULL);
1560           g_object_set (G_OBJECT (priv->udpsink[1]), "ttl-mc", ttl, NULL);
1561         }
1562         priv->transports = g_list_prepend (priv->transports, trans);
1563       } else {
1564         GST_INFO ("removing %s:%d-%d", dest, min, max);
1565         g_signal_emit_by_name (priv->udpsink[0], "remove", dest, min, NULL);
1566         g_signal_emit_by_name (priv->udpsink[1], "remove", dest, max, NULL);
1567         priv->transports = g_list_remove (priv->transports, trans);
1568       }
1569       break;
1570     }
1571     case GST_RTSP_LOWER_TRANS_TCP:
1572       if (add) {
1573         GST_INFO ("adding TCP %s", tr->destination);
1574         priv->transports = g_list_prepend (priv->transports, trans);
1575       } else {
1576         GST_INFO ("removing TCP %s", tr->destination);
1577         priv->transports = g_list_remove (priv->transports, trans);
1578       }
1579       break;
1580     default:
1581       goto unknown_transport;
1582   }
1583   return TRUE;
1584
1585   /* ERRORS */
1586 unknown_transport:
1587   {
1588     GST_INFO ("Unknown transport %d", tr->lower_transport);
1589     return FALSE;
1590   }
1591 }
1592
1593
1594 /**
1595  * gst_rtsp_stream_add_transport:
1596  * @stream: a #GstRTSPStream
1597  * @trans: a #GstRTSPStreamTransport
1598  *
1599  * Add the transport in @trans to @stream. The media of @stream will
1600  * then also be send to the values configured in @trans.
1601  *
1602  * @stream must be joined to a bin.
1603  *
1604  * @trans must contain a valid #GstRTSPTransport.
1605  *
1606  * Returns: %TRUE if @trans was added
1607  */
1608 gboolean
1609 gst_rtsp_stream_add_transport (GstRTSPStream * stream,
1610     GstRTSPStreamTransport * trans)
1611 {
1612   GstRTSPStreamPrivate *priv;
1613   gboolean res;
1614
1615   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1616   priv = stream->priv;
1617   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
1618   g_return_val_if_fail (priv->is_joined, FALSE);
1619
1620   g_mutex_lock (&priv->lock);
1621   res = update_transport (stream, trans, TRUE);
1622   g_mutex_unlock (&priv->lock);
1623
1624   return res;
1625 }
1626
1627 /**
1628  * gst_rtsp_stream_remove_transport:
1629  * @stream: a #GstRTSPStream
1630  * @trans: a #GstRTSPStreamTransport
1631  *
1632  * Remove the transport in @trans from @stream. The media of @stream will
1633  * not be sent to the values configured in @trans.
1634  *
1635  * @stream must be joined to a bin.
1636  *
1637  * @trans must contain a valid #GstRTSPTransport.
1638  *
1639  * Returns: %TRUE if @trans was removed
1640  */
1641 gboolean
1642 gst_rtsp_stream_remove_transport (GstRTSPStream * stream,
1643     GstRTSPStreamTransport * trans)
1644 {
1645   GstRTSPStreamPrivate *priv;
1646   gboolean res;
1647
1648   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1649   priv = stream->priv;
1650   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
1651   g_return_val_if_fail (priv->is_joined, FALSE);
1652
1653   g_mutex_lock (&priv->lock);
1654   res = update_transport (stream, trans, FALSE);
1655   g_mutex_unlock (&priv->lock);
1656
1657   return res;
1658 }