stream: add methods and property to set control string
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-stream.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19
20 #include <string.h>
21 #include <stdlib.h>
22
23 #include <gio/gio.h>
24
25 #include <gst/app/gstappsrc.h>
26 #include <gst/app/gstappsink.h>
27
28 #include "rtsp-stream.h"
29
30 #define GST_RTSP_STREAM_GET_PRIVATE(obj)  \
31      (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTSP_STREAM, GstRTSPStreamPrivate))
32
33 struct _GstRTSPStreamPrivate
34 {
35   GMutex lock;
36   guint idx;
37   GstPad *srcpad;
38   GstElement *payloader;
39   guint buffer_size;
40   gboolean is_joined;
41   gchar *control;
42
43   /* pads on the rtpbin */
44   GstPad *send_rtp_sink;
45   GstPad *recv_sink[2];
46   GstPad *send_src[2];
47
48   /* the RTPSession object */
49   GObject *session;
50
51   /* sinks used for sending and receiving RTP and RTCP over ipv4, they share
52    * sockets */
53   GstElement *udpsrc_v4[2];
54
55   /* sinks used for sending and receiving RTP and RTCP over ipv6, they share
56    * sockets */
57   GstElement *udpsrc_v6[2];
58
59   GstElement *udpsink[2];
60
61   /* for TCP transport */
62   GstElement *appsrc[2];
63   GstElement *appqueue[2];
64   GstElement *appsink[2];
65
66   GstElement *tee[2];
67   GstElement *funnel[2];
68
69   /* server ports for sending/receiving over ipv4 */
70   GstRTSPRange server_port_v4;
71   GstRTSPAddress *server_addr_v4;
72   gboolean have_ipv4;
73
74   /* server ports for sending/receiving over ipv6 */
75   GstRTSPRange server_port_v6;
76   GstRTSPAddress *server_addr_v6;
77   gboolean have_ipv6;
78
79   /* multicast addresses */
80   GstRTSPAddressPool *pool;
81   GstRTSPAddress *addr_v4;
82   GstRTSPAddress *addr_v6;
83
84   /* the caps of the stream */
85   gulong caps_sig;
86   GstCaps *caps;
87
88   /* transports we stream to */
89   guint n_active;
90   GList *transports;
91
92   gint dscp_qos;
93 };
94
95 #define DEFAULT_CONTROL NULL
96
97 enum
98 {
99   PROP_0,
100   PROP_CONTROL,
101   PROP_LAST
102 };
103
104 GST_DEBUG_CATEGORY_STATIC (rtsp_stream_debug);
105 #define GST_CAT_DEFAULT rtsp_stream_debug
106
107 static GQuark ssrc_stream_map_key;
108
109 static void gst_rtsp_stream_get_property (GObject * object, guint propid,
110     GValue * value, GParamSpec * pspec);
111 static void gst_rtsp_stream_set_property (GObject * object, guint propid,
112     const GValue * value, GParamSpec * pspec);
113
114 static void gst_rtsp_stream_finalize (GObject * obj);
115
116 G_DEFINE_TYPE (GstRTSPStream, gst_rtsp_stream, G_TYPE_OBJECT);
117
118 static void
119 gst_rtsp_stream_class_init (GstRTSPStreamClass * klass)
120 {
121   GObjectClass *gobject_class;
122
123   g_type_class_add_private (klass, sizeof (GstRTSPStreamPrivate));
124
125   gobject_class = G_OBJECT_CLASS (klass);
126
127   gobject_class->get_property = gst_rtsp_stream_get_property;
128   gobject_class->set_property = gst_rtsp_stream_set_property;
129   gobject_class->finalize = gst_rtsp_stream_finalize;
130
131   g_object_class_install_property (gobject_class, PROP_CONTROL,
132       g_param_spec_string ("control", "Control",
133           "The control string for this stream", DEFAULT_CONTROL,
134           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
135
136   GST_DEBUG_CATEGORY_INIT (rtsp_stream_debug, "rtspstream", 0, "GstRTSPStream");
137
138   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
139 }
140
141 static void
142 gst_rtsp_stream_init (GstRTSPStream * stream)
143 {
144   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
145
146   GST_DEBUG ("new stream %p", stream);
147
148   stream->priv = priv;
149
150   priv->dscp_qos = -1;
151   priv->control = g_strdup (DEFAULT_CONTROL);
152
153   g_mutex_init (&priv->lock);
154 }
155
156 static void
157 gst_rtsp_stream_finalize (GObject * obj)
158 {
159   GstRTSPStream *stream;
160   GstRTSPStreamPrivate *priv;
161
162   stream = GST_RTSP_STREAM (obj);
163   priv = stream->priv;
164
165   GST_DEBUG ("finalize stream %p", stream);
166
167   /* we really need to be unjoined now */
168   g_return_if_fail (!priv->is_joined);
169
170   if (priv->addr_v4)
171     gst_rtsp_address_free (priv->addr_v4);
172   if (priv->addr_v6)
173     gst_rtsp_address_free (priv->addr_v6);
174   if (priv->server_addr_v4)
175     gst_rtsp_address_free (priv->server_addr_v4);
176   if (priv->server_addr_v6)
177     gst_rtsp_address_free (priv->server_addr_v6);
178   if (priv->pool)
179     g_object_unref (priv->pool);
180   gst_object_unref (priv->payloader);
181   gst_object_unref (priv->srcpad);
182   g_free (priv->control);
183   g_mutex_clear (&priv->lock);
184
185   G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
186 }
187
188 static void
189 gst_rtsp_stream_get_property (GObject * object, guint propid,
190     GValue * value, GParamSpec * pspec)
191 {
192   GstRTSPStream *stream = GST_RTSP_STREAM (object);
193
194   switch (propid) {
195     case PROP_CONTROL:
196       g_value_take_string (value, gst_rtsp_stream_get_control (stream));
197       break;
198     default:
199       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
200   }
201 }
202
203 static void
204 gst_rtsp_stream_set_property (GObject * object, guint propid,
205     const GValue * value, GParamSpec * pspec)
206 {
207   GstRTSPStream *stream = GST_RTSP_STREAM (object);
208
209   switch (propid) {
210     case PROP_CONTROL:
211       gst_rtsp_stream_set_control (stream, g_value_get_string (value));
212       break;
213     default:
214       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
215   }
216 }
217
218 /**
219  * gst_rtsp_stream_new:
220  * @idx: an index
221  * @srcpad: a #GstPad
222  * @payloader: a #GstElement
223  *
224  * Create a new media stream with index @idx that handles RTP data on
225  * @srcpad and has a payloader element @payloader.
226  *
227  * Returns: a new #GstRTSPStream
228  */
229 GstRTSPStream *
230 gst_rtsp_stream_new (guint idx, GstElement * payloader, GstPad * srcpad)
231 {
232   GstRTSPStreamPrivate *priv;
233   GstRTSPStream *stream;
234
235   g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL);
236   g_return_val_if_fail (GST_IS_PAD (srcpad), NULL);
237   g_return_val_if_fail (GST_PAD_IS_SRC (srcpad), NULL);
238
239   stream = g_object_new (GST_TYPE_RTSP_STREAM, NULL);
240   priv = stream->priv;
241   priv->idx = idx;
242   priv->payloader = gst_object_ref (payloader);
243   priv->srcpad = gst_object_ref (srcpad);
244
245   return stream;
246 }
247
248 /**
249  * gst_rtsp_stream_get_index:
250  * @stream: a #GstRTSPStream
251  *
252  * Get the stream index.
253  *
254  * Return: the stream index.
255  */
256 guint
257 gst_rtsp_stream_get_index (GstRTSPStream * stream)
258 {
259   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
260
261   return stream->priv->idx;
262 }
263
264 /**
265  * gst_rtsp_stream_get_srcpad:
266  * @stream: a #GstRTSPStream
267  *
268  * Get the srcpad associated with @stream.
269  *
270  * Return: the srcpad. Unref after usage.
271  */
272 GstPad *
273 gst_rtsp_stream_get_srcpad (GstRTSPStream * stream)
274 {
275   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
276
277   return gst_object_ref (stream->priv->srcpad);
278 }
279
280 /**
281  * gst_rtsp_stream_get_control:
282  * @stream: a #GstRTSPStream
283  *
284  * Get the control string to identify this stream.
285  *
286  * Return: the control string. free after usage.
287  */
288 gchar *
289 gst_rtsp_stream_get_control (GstRTSPStream * stream)
290 {
291   GstRTSPStreamPrivate *priv;
292   gchar *result;
293
294   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
295
296   priv = stream->priv;
297
298   g_mutex_lock (&priv->lock);
299   if ((result = g_strdup (priv->control)) == NULL)
300     result = g_strdup_printf ("stream=%d", priv->idx);
301   g_mutex_unlock (&priv->lock);
302
303   return result;
304 }
305
306 /**
307  * gst_rtsp_stream_set_control:
308  * @stream: a #GstRTSPStream
309  * @control: a control string
310  *
311  * Set the control string in @stream.
312  */
313 void
314 gst_rtsp_stream_set_control (GstRTSPStream * stream, const gchar * control)
315 {
316   GstRTSPStreamPrivate *priv;
317
318   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
319
320   priv = stream->priv;
321
322   g_mutex_lock (&priv->lock);
323   g_free (priv->control);
324   priv->control = g_strdup (control);
325   g_mutex_unlock (&priv->lock);
326 }
327
328 /**
329  * gst_rtsp_stream_set_mtu:
330  * @stream: a #GstRTSPStream
331  * @mtu: a new MTU
332  *
333  * Configure the mtu in the payloader of @stream to @mtu.
334  */
335 void
336 gst_rtsp_stream_set_mtu (GstRTSPStream * stream, guint mtu)
337 {
338   GstRTSPStreamPrivate *priv;
339
340   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
341
342   priv = stream->priv;
343
344   GST_LOG_OBJECT (stream, "set MTU %u", mtu);
345
346   g_object_set (G_OBJECT (priv->payloader), "mtu", mtu, NULL);
347 }
348
349 /**
350  * gst_rtsp_stream_get_mtu:
351  * @stream: a #GstRTSPStream
352  *
353  * Get the configured MTU in the payloader of @stream.
354  *
355  * Returns: the MTU of the payloader.
356  */
357 guint
358 gst_rtsp_stream_get_mtu (GstRTSPStream * stream)
359 {
360   GstRTSPStreamPrivate *priv;
361   guint mtu;
362
363   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
364
365   priv = stream->priv;
366
367   g_object_get (G_OBJECT (priv->payloader), "mtu", &mtu, NULL);
368
369   return mtu;
370 }
371
372 /* Update the dscp qos property on the udp sinks */
373 static void
374 update_dscp_qos (GstRTSPStream * stream)
375 {
376   GstRTSPStreamPrivate *priv;
377
378   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
379
380   priv = stream->priv;
381
382   if (priv->udpsink[0]) {
383     g_object_set (G_OBJECT (priv->udpsink[0]), "qos-dscp", priv->dscp_qos,
384         NULL);
385   }
386
387   if (priv->udpsink[1]) {
388     g_object_set (G_OBJECT (priv->udpsink[1]), "qos-dscp", priv->dscp_qos,
389         NULL);
390   }
391 }
392
393 /**
394  * gst_rtsp_stream_set_dscp_qos:
395  * @stream: a #GstRTSPStream
396  * @dscp_qos: a new dscp qos value (0-63, or -1 to disable)
397  *
398  * Configure the dscp qos of the outgoing sockets to @dscp_qos.
399  */
400 void
401 gst_rtsp_stream_set_dscp_qos (GstRTSPStream * stream, gint dscp_qos)
402 {
403   GstRTSPStreamPrivate *priv;
404
405   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
406
407   priv = stream->priv;
408
409   GST_LOG_OBJECT (stream, "set DSCP QoS %d", dscp_qos);
410
411   if (dscp_qos < -1 || dscp_qos > 63) {
412     GST_WARNING_OBJECT (stream, "trying to set illegal dscp qos %d", dscp_qos);
413     return;
414   }
415
416   priv->dscp_qos = dscp_qos;
417
418   update_dscp_qos (stream);
419 }
420
421 /**
422  * gst_rtsp_stream_get_dscp_qos:
423  * @stream: a #GstRTSPStream
424  *
425  * Get the configured DSCP QoS in of the outgoing sockets.
426  *
427  * Returns: the DSCP QoS value of the outgoing sockets, or -1 if disbled.
428  */
429 gint
430 gst_rtsp_stream_get_dscp_qos (GstRTSPStream * stream)
431 {
432   GstRTSPStreamPrivate *priv;
433
434   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
435
436   priv = stream->priv;
437
438   return priv->dscp_qos;
439 }
440
441
442 /**
443  * gst_rtsp_stream_set_address_pool:
444  * @stream: a #GstRTSPStream
445  * @pool: a #GstRTSPAddressPool
446  *
447  * configure @pool to be used as the address pool of @stream.
448  */
449 void
450 gst_rtsp_stream_set_address_pool (GstRTSPStream * stream,
451     GstRTSPAddressPool * pool)
452 {
453   GstRTSPStreamPrivate *priv;
454   GstRTSPAddressPool *old;
455
456   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
457
458   priv = stream->priv;
459
460   GST_LOG_OBJECT (stream, "set address pool %p", pool);
461
462   g_mutex_lock (&priv->lock);
463   if ((old = priv->pool) != pool)
464     priv->pool = pool ? g_object_ref (pool) : NULL;
465   else
466     old = NULL;
467   g_mutex_unlock (&priv->lock);
468
469   if (old)
470     g_object_unref (old);
471 }
472
473 /**
474  * gst_rtsp_stream_get_address_pool:
475  * @stream: a #GstRTSPStream
476  *
477  * Get the #GstRTSPAddressPool used as the address pool of @stream.
478  *
479  * Returns: (transfer full): the #GstRTSPAddressPool of @stream. g_object_unref() after
480  * usage.
481  */
482 GstRTSPAddressPool *
483 gst_rtsp_stream_get_address_pool (GstRTSPStream * stream)
484 {
485   GstRTSPStreamPrivate *priv;
486   GstRTSPAddressPool *result;
487
488   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
489
490   priv = stream->priv;
491
492   g_mutex_lock (&priv->lock);
493   if ((result = priv->pool))
494     g_object_ref (result);
495   g_mutex_unlock (&priv->lock);
496
497   return result;
498 }
499
500 /**
501  * gst_rtsp_stream_get_multicast_address:
502  * @stream: a #GstRTSPStream
503  * @family: the #GSocketFamily
504  *
505  * Get the multicast address of @stream for @family.
506  *
507  * Returns: the #GstRTSPAddress of @stream or %NULL when no address could be
508  * allocated. gst_rtsp_address_free() after usage.
509  */
510 GstRTSPAddress *
511 gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream,
512     GSocketFamily family)
513 {
514   GstRTSPStreamPrivate *priv;
515   GstRTSPAddress *result;
516   GstRTSPAddress **addrp;
517   GstRTSPAddressFlags flags;
518
519   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
520
521   priv = stream->priv;
522
523   if (family == G_SOCKET_FAMILY_IPV6) {
524     flags = GST_RTSP_ADDRESS_FLAG_IPV6;
525     addrp = &priv->addr_v4;
526   } else {
527     flags = GST_RTSP_ADDRESS_FLAG_IPV4;
528     addrp = &priv->addr_v6;
529   }
530
531   g_mutex_lock (&priv->lock);
532   if (*addrp == NULL) {
533     if (priv->pool == NULL)
534       goto no_pool;
535
536     flags |= GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_MULTICAST;
537
538     *addrp = gst_rtsp_address_pool_acquire_address (priv->pool, flags, 2);
539     if (*addrp == NULL)
540       goto no_address;
541   }
542   result = gst_rtsp_address_copy (*addrp);
543   g_mutex_unlock (&priv->lock);
544
545   return result;
546
547   /* ERRORS */
548 no_pool:
549   {
550     GST_ERROR_OBJECT (stream, "no address pool specified");
551     g_mutex_unlock (&priv->lock);
552     return NULL;
553   }
554 no_address:
555   {
556     GST_ERROR_OBJECT (stream, "failed to acquire address from pool");
557     g_mutex_unlock (&priv->lock);
558     return NULL;
559   }
560 }
561
562 /**
563  * gst_rtsp_stream_reserve_address:
564  * @stream: a #GstRTSPStream
565  * @address: an address
566  * @port: a port
567  * @n_ports: n_ports
568  * @ttl: a TTL
569  *
570  * Reserve @address and @port as the address and port of @stream.
571  *
572  * Returns: the #GstRTSPAddress of @stream or %NULL when the address could be
573  * reserved. gst_rtsp_address_free() after usage.
574  */
575 GstRTSPAddress *
576 gst_rtsp_stream_reserve_address (GstRTSPStream * stream,
577     const gchar * address, guint port, guint n_ports, guint ttl)
578 {
579   GstRTSPStreamPrivate *priv;
580   GstRTSPAddress *result;
581   GInetAddress *addr;
582   GSocketFamily family;
583   GstRTSPAddress **addrp;
584
585   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
586   g_return_val_if_fail (address != NULL, NULL);
587   g_return_val_if_fail (port > 0, NULL);
588   g_return_val_if_fail (n_ports > 0, NULL);
589   g_return_val_if_fail (ttl > 0, NULL);
590
591   priv = stream->priv;
592
593   addr = g_inet_address_new_from_string (address);
594   if (!addr) {
595     GST_ERROR ("failed to get inet addr from %s", address);
596     family = G_SOCKET_FAMILY_IPV4;
597   } else {
598     family = g_inet_address_get_family (addr);
599     g_object_unref (addr);
600   }
601
602   if (family == G_SOCKET_FAMILY_IPV6)
603     addrp = &priv->addr_v4;
604   else
605     addrp = &priv->addr_v6;
606
607   g_mutex_lock (&priv->lock);
608   if (*addrp == NULL) {
609     if (priv->pool == NULL)
610       goto no_pool;
611
612     *addrp = gst_rtsp_address_pool_reserve_address (priv->pool, address,
613         port, n_ports, ttl);
614     if (*addrp == NULL)
615       goto no_address;
616   } else {
617     if (strcmp ((*addrp)->address, address) ||
618         (*addrp)->port != port || (*addrp)->n_ports != n_ports ||
619         (*addrp)->ttl != ttl)
620       goto different_address;
621   }
622   result = gst_rtsp_address_copy (*addrp);
623   g_mutex_unlock (&priv->lock);
624
625   return result;
626
627   /* ERRORS */
628 no_pool:
629   {
630     GST_ERROR_OBJECT (stream, "no address pool specified");
631     g_mutex_unlock (&priv->lock);
632     return NULL;
633   }
634 no_address:
635   {
636     GST_ERROR_OBJECT (stream, "failed to acquire address %s from pool",
637         address);
638     g_mutex_unlock (&priv->lock);
639     return NULL;
640   }
641 different_address:
642   {
643     GST_ERROR_OBJECT (stream, "address %s is not the same that was already"
644         " reserved", address);
645     g_mutex_unlock (&priv->lock);
646     return NULL;
647   }
648 }
649
650 static gboolean
651 alloc_ports_one_family (GstRTSPAddressPool * pool, gint buffer_size,
652     GSocketFamily family, GstElement * udpsrc_out[2],
653     GstElement * udpsink_out[2], GstRTSPRange * server_port_out,
654     GstRTSPAddress ** server_addr_out)
655 {
656   GstStateChangeReturn ret;
657   GstElement *udpsrc0, *udpsrc1;
658   GstElement *udpsink0, *udpsink1;
659   GSocket *rtp_socket = NULL;
660   GSocket *rtcp_socket;
661   gint tmp_rtp, tmp_rtcp;
662   guint count;
663   gint rtpport, rtcpport;
664   GList *rejected_addresses = NULL;
665   GstRTSPAddress *addr = NULL;
666   GInetAddress *inetaddr = NULL;
667   GSocketAddress *rtp_sockaddr = NULL;
668   GSocketAddress *rtcp_sockaddr = NULL;
669   const gchar *multisink_socket;
670
671   if (family == G_SOCKET_FAMILY_IPV6)
672     multisink_socket = "socket-v6";
673   else
674     multisink_socket = "socket";
675
676   udpsrc0 = NULL;
677   udpsrc1 = NULL;
678   udpsink0 = NULL;
679   udpsink1 = NULL;
680   count = 0;
681
682   /* Start with random port */
683   tmp_rtp = 0;
684
685   rtcp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
686       G_SOCKET_PROTOCOL_UDP, NULL);
687   if (!rtcp_socket)
688     goto no_udp_protocol;
689
690   if (*server_addr_out)
691     gst_rtsp_address_free (*server_addr_out);
692
693   /* try to allocate 2 UDP ports, the RTP port should be an even
694    * number and the RTCP port should be the next (uneven) port */
695 again:
696
697   if (rtp_socket == NULL) {
698     rtp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
699         G_SOCKET_PROTOCOL_UDP, NULL);
700     if (!rtp_socket)
701       goto no_udp_protocol;
702   }
703
704   if (pool && gst_rtsp_address_pool_has_unicast_addresses (pool)) {
705     GstRTSPAddressFlags flags;
706
707     if (addr)
708       rejected_addresses = g_list_prepend (rejected_addresses, addr);
709
710     flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_UNICAST;
711     if (family == G_SOCKET_FAMILY_IPV6)
712       flags |= GST_RTSP_ADDRESS_FLAG_IPV6;
713     else
714       flags |= GST_RTSP_ADDRESS_FLAG_IPV4;
715
716     addr = gst_rtsp_address_pool_acquire_address (pool, flags, 2);
717
718     if (addr == NULL)
719       goto no_ports;
720
721     tmp_rtp = addr->port;
722
723     g_clear_object (&inetaddr);
724     inetaddr = g_inet_address_new_from_string (addr->address);
725   } else {
726     if (tmp_rtp != 0) {
727       tmp_rtp += 2;
728       if (++count > 20)
729         goto no_ports;
730     }
731
732     if (inetaddr == NULL)
733       inetaddr = g_inet_address_new_any (family);
734   }
735
736   rtp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtp);
737   if (!g_socket_bind (rtp_socket, rtp_sockaddr, FALSE, NULL)) {
738     g_object_unref (rtp_sockaddr);
739     goto again;
740   }
741   g_object_unref (rtp_sockaddr);
742
743   rtp_sockaddr = g_socket_get_local_address (rtp_socket, NULL);
744   if (rtp_sockaddr == NULL || !G_IS_INET_SOCKET_ADDRESS (rtp_sockaddr)) {
745     g_clear_object (&rtp_sockaddr);
746     goto socket_error;
747   }
748
749   tmp_rtp =
750       g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (rtp_sockaddr));
751   g_object_unref (rtp_sockaddr);
752
753   /* check if port is even */
754   if ((tmp_rtp & 1) != 0) {
755     /* port not even, close and allocate another */
756     tmp_rtp++;
757     g_clear_object (&rtp_socket);
758     goto again;
759   }
760
761   /* set port */
762   tmp_rtcp = tmp_rtp + 1;
763
764   rtcp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtcp);
765   if (!g_socket_bind (rtcp_socket, rtcp_sockaddr, FALSE, NULL)) {
766     g_object_unref (rtcp_sockaddr);
767     g_clear_object (&rtp_socket);
768     goto again;
769   }
770   g_object_unref (rtcp_sockaddr);
771
772   g_clear_object (&inetaddr);
773
774   udpsrc0 = gst_element_factory_make ("udpsrc", NULL);
775   udpsrc1 = gst_element_factory_make ("udpsrc", NULL);
776
777   if (udpsrc0 == NULL || udpsrc1 == NULL)
778     goto no_udp_protocol;
779
780   g_object_set (G_OBJECT (udpsrc0), "socket", rtp_socket, NULL);
781   g_object_set (G_OBJECT (udpsrc1), "socket", rtcp_socket, NULL);
782
783   ret = gst_element_set_state (udpsrc0, GST_STATE_PAUSED);
784   if (ret == GST_STATE_CHANGE_FAILURE)
785     goto element_error;
786   ret = gst_element_set_state (udpsrc1, GST_STATE_PAUSED);
787   if (ret == GST_STATE_CHANGE_FAILURE)
788     goto element_error;
789
790   /* all fine, do port check */
791   g_object_get (G_OBJECT (udpsrc0), "port", &rtpport, NULL);
792   g_object_get (G_OBJECT (udpsrc1), "port", &rtcpport, NULL);
793
794   /* this should not happen... */
795   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
796     goto port_error;
797
798   if (udpsink_out[0])
799     udpsink0 = udpsink_out[0];
800   else
801     udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
802
803   if (!udpsink0)
804     goto no_udp_protocol;
805
806   g_object_set (G_OBJECT (udpsink0), "close-socket", FALSE, NULL);
807   g_object_set (G_OBJECT (udpsink0), multisink_socket, rtp_socket, NULL);
808
809   if (udpsink_out[1])
810     udpsink1 = udpsink_out[1];
811   else
812     udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
813
814   if (!udpsink1)
815     goto no_udp_protocol;
816
817   g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
818   g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
819   g_object_set (G_OBJECT (udpsink0), "buffer-size", buffer_size, NULL);
820
821   g_object_set (G_OBJECT (udpsink1), "close-socket", FALSE, NULL);
822   g_object_set (G_OBJECT (udpsink1), multisink_socket, rtcp_socket, NULL);
823   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
824   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
825   g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL);
826   g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
827   g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL);
828   g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
829
830   /* we keep these elements, we will further configure them when the
831    * client told us to really use the UDP ports. */
832   udpsrc_out[0] = udpsrc0;
833   udpsrc_out[1] = udpsrc1;
834   udpsink_out[0] = udpsink0;
835   udpsink_out[1] = udpsink1;
836   server_port_out->min = rtpport;
837   server_port_out->max = rtcpport;
838
839   *server_addr_out = addr;
840   g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free);
841
842   g_object_unref (rtp_socket);
843   g_object_unref (rtcp_socket);
844
845   return TRUE;
846
847   /* ERRORS */
848 no_udp_protocol:
849   {
850     goto cleanup;
851   }
852 no_ports:
853   {
854     goto cleanup;
855   }
856 port_error:
857   {
858     goto cleanup;
859   }
860 socket_error:
861   {
862     goto cleanup;
863   }
864 element_error:
865   {
866     goto cleanup;
867   }
868 cleanup:
869   {
870     if (udpsrc0) {
871       gst_element_set_state (udpsrc0, GST_STATE_NULL);
872       gst_object_unref (udpsrc0);
873     }
874     if (udpsrc1) {
875       gst_element_set_state (udpsrc1, GST_STATE_NULL);
876       gst_object_unref (udpsrc1);
877     }
878     if (udpsink0) {
879       gst_element_set_state (udpsink0, GST_STATE_NULL);
880       gst_object_unref (udpsink0);
881     }
882     if (udpsink1) {
883       gst_element_set_state (udpsink1, GST_STATE_NULL);
884       gst_object_unref (udpsink1);
885     }
886     if (inetaddr)
887       g_object_unref (inetaddr);
888     g_list_free_full (rejected_addresses,
889         (GDestroyNotify) gst_rtsp_address_free);
890     if (addr)
891       gst_rtsp_address_free (addr);
892     if (rtp_socket)
893       g_object_unref (rtp_socket);
894     if (rtcp_socket)
895       g_object_unref (rtcp_socket);
896     return FALSE;
897   }
898 }
899
900 /* must be called with lock */
901 static gboolean
902 alloc_ports (GstRTSPStream * stream)
903 {
904   GstRTSPStreamPrivate *priv = stream->priv;
905
906   priv->have_ipv4 = alloc_ports_one_family (priv->pool, priv->buffer_size,
907       G_SOCKET_FAMILY_IPV4, priv->udpsrc_v4, priv->udpsink,
908       &priv->server_port_v4, &priv->server_addr_v4);
909
910   priv->have_ipv6 = alloc_ports_one_family (priv->pool, priv->buffer_size,
911       G_SOCKET_FAMILY_IPV6, priv->udpsrc_v6, priv->udpsink,
912       &priv->server_port_v6, &priv->server_addr_v6);
913
914   return priv->have_ipv4 || priv->have_ipv6;
915 }
916
917 /**
918  * gst_rtsp_stream_get_server_port:
919  * @stream: a #GstRTSPStream
920  * @server_port: (out): result server port
921  *
922  * Fill @server_port with the port pair used by the server. This function can
923  * only be called when @stream has been joined.
924  */
925 void
926 gst_rtsp_stream_get_server_port (GstRTSPStream * stream,
927     GstRTSPRange * server_port, GSocketFamily family)
928 {
929   GstRTSPStreamPrivate *priv;
930
931   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
932   priv = stream->priv;
933   g_return_if_fail (priv->is_joined);
934
935   g_mutex_lock (&priv->lock);
936   if (family == G_SOCKET_FAMILY_IPV4) {
937     if (server_port)
938       *server_port = priv->server_port_v4;
939   } else {
940     if (server_port)
941       *server_port = priv->server_port_v6;
942   }
943   g_mutex_unlock (&priv->lock);
944 }
945
946 /**
947  * gst_rtsp_stream_get_rtpsession:
948  * @stream: a #GstRTSPStream
949  *
950  * Get the RTP session of this stream.
951  *
952  * Returns: The RTP session of this stream. Unref after usage.
953  */
954 GObject *
955 gst_rtsp_stream_get_rtpsession (GstRTSPStream * stream)
956 {
957   GstRTSPStreamPrivate *priv;
958   GObject *session;
959
960   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
961
962   priv = stream->priv;
963
964   g_mutex_lock (&priv->lock);
965   if ((session = priv->session))
966     g_object_ref (session);
967   g_mutex_unlock (&priv->lock);
968
969   return session;
970 }
971
972 /**
973  * gst_rtsp_stream_get_ssrc:
974  * @stream: a #GstRTSPStream
975  * @ssrc: (out): result ssrc
976  *
977  * Get the SSRC used by the RTP session of this stream. This function can only
978  * be called when @stream has been joined.
979  */
980 void
981 gst_rtsp_stream_get_ssrc (GstRTSPStream * stream, guint * ssrc)
982 {
983   GstRTSPStreamPrivate *priv;
984
985   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
986   priv = stream->priv;
987   g_return_if_fail (priv->is_joined);
988
989   g_mutex_lock (&priv->lock);
990   if (ssrc && priv->session)
991     g_object_get (priv->session, "internal-ssrc", ssrc, NULL);
992   g_mutex_unlock (&priv->lock);
993 }
994
995 /* executed from streaming thread */
996 static void
997 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream)
998 {
999   GstRTSPStreamPrivate *priv = stream->priv;
1000   GstCaps *newcaps, *oldcaps;
1001
1002   newcaps = gst_pad_get_current_caps (pad);
1003
1004   GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps,
1005       newcaps);
1006
1007   g_mutex_lock (&priv->lock);
1008   oldcaps = priv->caps;
1009   priv->caps = newcaps;
1010   g_mutex_unlock (&priv->lock);
1011
1012   if (oldcaps)
1013     gst_caps_unref (oldcaps);
1014 }
1015
1016 static void
1017 dump_structure (const GstStructure * s)
1018 {
1019   gchar *sstr;
1020
1021   sstr = gst_structure_to_string (s);
1022   GST_INFO ("structure: %s", sstr);
1023   g_free (sstr);
1024 }
1025
1026 static GstRTSPStreamTransport *
1027 find_transport (GstRTSPStream * stream, const gchar * rtcp_from)
1028 {
1029   GstRTSPStreamPrivate *priv = stream->priv;
1030   GList *walk;
1031   GstRTSPStreamTransport *result = NULL;
1032   const gchar *tmp;
1033   gchar *dest;
1034   guint port;
1035
1036   if (rtcp_from == NULL)
1037     return NULL;
1038
1039   tmp = g_strrstr (rtcp_from, ":");
1040   if (tmp == NULL)
1041     return NULL;
1042
1043   port = atoi (tmp + 1);
1044   dest = g_strndup (rtcp_from, tmp - rtcp_from);
1045
1046   g_mutex_lock (&priv->lock);
1047   GST_INFO ("finding %s:%d in %d transports", dest, port,
1048       g_list_length (priv->transports));
1049
1050   for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1051     GstRTSPStreamTransport *trans = walk->data;
1052     const GstRTSPTransport *tr;
1053     gint min, max;
1054
1055     tr = gst_rtsp_stream_transport_get_transport (trans);
1056
1057     min = tr->client_port.min;
1058     max = tr->client_port.max;
1059
1060     if ((strcmp (tr->destination, dest) == 0) && (min == port || max == port)) {
1061       result = trans;
1062       break;
1063     }
1064   }
1065   if (result)
1066     g_object_ref (result);
1067   g_mutex_unlock (&priv->lock);
1068
1069   g_free (dest);
1070
1071   return result;
1072 }
1073
1074 static GstRTSPStreamTransport *
1075 check_transport (GObject * source, GstRTSPStream * stream)
1076 {
1077   GstStructure *stats;
1078   GstRTSPStreamTransport *trans;
1079
1080   /* see if we have a stream to match with the origin of the RTCP packet */
1081   trans = g_object_get_qdata (source, ssrc_stream_map_key);
1082   if (trans == NULL) {
1083     g_object_get (source, "stats", &stats, NULL);
1084     if (stats) {
1085       const gchar *rtcp_from;
1086
1087       dump_structure (stats);
1088
1089       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
1090       if ((trans = find_transport (stream, rtcp_from))) {
1091         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
1092             source);
1093         g_object_set_qdata_full (source, ssrc_stream_map_key, trans,
1094             g_object_unref);
1095       }
1096       gst_structure_free (stats);
1097     }
1098   }
1099   return trans;
1100 }
1101
1102
1103 static void
1104 on_new_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1105 {
1106   GstRTSPStreamTransport *trans;
1107
1108   GST_INFO ("%p: new source %p", stream, source);
1109
1110   trans = check_transport (source, stream);
1111
1112   if (trans)
1113     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
1114 }
1115
1116 static void
1117 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPStream * stream)
1118 {
1119   GST_INFO ("%p: new SDES %p", stream, source);
1120 }
1121
1122 static void
1123 on_ssrc_active (GObject * session, GObject * source, GstRTSPStream * stream)
1124 {
1125   GstRTSPStreamTransport *trans;
1126
1127   trans = check_transport (source, stream);
1128
1129   if (trans) {
1130     GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
1131     gst_rtsp_stream_transport_keep_alive (trans);
1132   }
1133 #ifdef DUMP_STATS
1134   {
1135     GstStructure *stats;
1136     g_object_get (source, "stats", &stats, NULL);
1137     if (stats) {
1138       dump_structure (stats);
1139       gst_structure_free (stats);
1140     }
1141   }
1142 #endif
1143 }
1144
1145 static void
1146 on_bye_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1147 {
1148   GST_INFO ("%p: source %p bye", stream, source);
1149 }
1150
1151 static void
1152 on_bye_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1153 {
1154   GstRTSPStreamTransport *trans;
1155
1156   GST_INFO ("%p: source %p bye timeout", stream, source);
1157
1158   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1159     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1160     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1161   }
1162 }
1163
1164 static void
1165 on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1166 {
1167   GstRTSPStreamTransport *trans;
1168
1169   GST_INFO ("%p: source %p timeout", stream, source);
1170
1171   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1172     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1173     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1174   }
1175 }
1176
1177 static GstFlowReturn
1178 handle_new_sample (GstAppSink * sink, gpointer user_data)
1179 {
1180   GstRTSPStreamPrivate *priv;
1181   GList *walk;
1182   GstSample *sample;
1183   GstBuffer *buffer;
1184   GstRTSPStream *stream;
1185
1186   sample = gst_app_sink_pull_sample (sink);
1187   if (!sample)
1188     return GST_FLOW_OK;
1189
1190   stream = (GstRTSPStream *) user_data;
1191   priv = stream->priv;
1192   buffer = gst_sample_get_buffer (sample);
1193
1194   g_mutex_lock (&priv->lock);
1195   for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1196     GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1197
1198     if (GST_ELEMENT_CAST (sink) == priv->appsink[0]) {
1199       gst_rtsp_stream_transport_send_rtp (tr, buffer);
1200     } else {
1201       gst_rtsp_stream_transport_send_rtcp (tr, buffer);
1202     }
1203   }
1204   g_mutex_unlock (&priv->lock);
1205
1206   gst_sample_unref (sample);
1207
1208   return GST_FLOW_OK;
1209 }
1210
1211 static GstAppSinkCallbacks sink_cb = {
1212   NULL,                         /* not interested in EOS */
1213   NULL,                         /* not interested in preroll samples */
1214   handle_new_sample,
1215 };
1216
1217 /**
1218  * gst_rtsp_stream_join_bin:
1219  * @stream: a #GstRTSPStream
1220  * @bin: a #GstBin to join
1221  * @rtpbin: a rtpbin element in @bin
1222  * @state: the target state of the new elements
1223  *
1224  * Join the #Gstbin @bin that contains the element @rtpbin.
1225  *
1226  * @stream will link to @rtpbin, which must be inside @bin. The elements
1227  * added to @bin will be set to the state given in @state.
1228  *
1229  * Returns: %TRUE on success.
1230  */
1231 gboolean
1232 gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
1233     GstElement * rtpbin, GstState state)
1234 {
1235   GstRTSPStreamPrivate *priv;
1236   gint i, idx;
1237   gchar *name;
1238   GstPad *pad, *teepad, *queuepad, *selpad;
1239   GstPadLinkReturn ret;
1240
1241   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1242   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
1243   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
1244
1245   priv = stream->priv;
1246
1247   g_mutex_lock (&priv->lock);
1248   if (priv->is_joined)
1249     goto was_joined;
1250
1251   /* create a session with the same index as the stream */
1252   idx = priv->idx;
1253
1254   GST_INFO ("stream %p joining bin as session %d", stream, idx);
1255
1256   if (!alloc_ports (stream))
1257     goto no_ports;
1258
1259   /* update the dscp qos field in the sinks */
1260   update_dscp_qos (stream);
1261
1262   /* get a pad for sending RTP */
1263   name = g_strdup_printf ("send_rtp_sink_%u", idx);
1264   priv->send_rtp_sink = gst_element_get_request_pad (rtpbin, name);
1265   g_free (name);
1266   /* link the RTP pad to the session manager, it should not really fail unless
1267    * this is not really an RTP pad */
1268   ret = gst_pad_link (priv->srcpad, priv->send_rtp_sink);
1269   if (ret != GST_PAD_LINK_OK)
1270     goto link_failed;
1271
1272   /* get pads from the RTP session element for sending and receiving
1273    * RTP/RTCP*/
1274   name = g_strdup_printf ("send_rtp_src_%u", idx);
1275   priv->send_src[0] = gst_element_get_static_pad (rtpbin, name);
1276   g_free (name);
1277   name = g_strdup_printf ("send_rtcp_src_%u", idx);
1278   priv->send_src[1] = gst_element_get_request_pad (rtpbin, name);
1279   g_free (name);
1280   name = g_strdup_printf ("recv_rtp_sink_%u", idx);
1281   priv->recv_sink[0] = gst_element_get_request_pad (rtpbin, name);
1282   g_free (name);
1283   name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
1284   priv->recv_sink[1] = gst_element_get_request_pad (rtpbin, name);
1285   g_free (name);
1286
1287   /* get the session */
1288   g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &priv->session);
1289
1290   g_signal_connect (priv->session, "on-new-ssrc", (GCallback) on_new_ssrc,
1291       stream);
1292   g_signal_connect (priv->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
1293       stream);
1294   g_signal_connect (priv->session, "on-ssrc-active",
1295       (GCallback) on_ssrc_active, stream);
1296   g_signal_connect (priv->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
1297       stream);
1298   g_signal_connect (priv->session, "on-bye-timeout",
1299       (GCallback) on_bye_timeout, stream);
1300   g_signal_connect (priv->session, "on-timeout", (GCallback) on_timeout,
1301       stream);
1302
1303   for (i = 0; i < 2; i++) {
1304     /* For the sender we create this bit of pipeline for both
1305      * RTP and RTCP. Sync and preroll are enabled on udpsink so
1306      * we need to add a queue before appsink to make the pipeline
1307      * not block. For the TCP case, we want to pump data to the
1308      * client as fast as possible anyway.
1309      *
1310      * .--------.      .-----.    .---------.
1311      * | rtpbin |      | tee |    | udpsink |
1312      * |       send->sink   src->sink       |
1313      * '--------'      |     |    '---------'
1314      *                 |     |    .---------.    .---------.
1315      *                 |     |    |  queue  |    | appsink |
1316      *                 |    src->sink      src->sink       |
1317      *                 '-----'    '---------'    '---------'
1318      */
1319     /* make tee for RTP/RTCP */
1320     priv->tee[i] = gst_element_factory_make ("tee", NULL);
1321     gst_bin_add (bin, priv->tee[i]);
1322
1323     /* and link to rtpbin send pad */
1324     pad = gst_element_get_static_pad (priv->tee[i], "sink");
1325     gst_pad_link (priv->send_src[i], pad);
1326     gst_object_unref (pad);
1327
1328     /* add udpsink */
1329     gst_bin_add (bin, priv->udpsink[i]);
1330
1331     /* link tee to udpsink */
1332     teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
1333     pad = gst_element_get_static_pad (priv->udpsink[i], "sink");
1334     gst_pad_link (teepad, pad);
1335     gst_object_unref (pad);
1336     gst_object_unref (teepad);
1337
1338     /* make queue */
1339     priv->appqueue[i] = gst_element_factory_make ("queue", NULL);
1340     gst_bin_add (bin, priv->appqueue[i]);
1341     /* and link to tee */
1342     teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
1343     pad = gst_element_get_static_pad (priv->appqueue[i], "sink");
1344     gst_pad_link (teepad, pad);
1345     gst_object_unref (pad);
1346     gst_object_unref (teepad);
1347
1348     /* make appsink */
1349     priv->appsink[i] = gst_element_factory_make ("appsink", NULL);
1350     g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
1351     g_object_set (priv->appsink[i], "emit-signals", FALSE, NULL);
1352     gst_bin_add (bin, priv->appsink[i]);
1353     gst_app_sink_set_callbacks (GST_APP_SINK_CAST (priv->appsink[i]),
1354         &sink_cb, stream, NULL);
1355     /* and link to queue */
1356     queuepad = gst_element_get_static_pad (priv->appqueue[i], "src");
1357     pad = gst_element_get_static_pad (priv->appsink[i], "sink");
1358     gst_pad_link (queuepad, pad);
1359     gst_object_unref (pad);
1360     gst_object_unref (queuepad);
1361
1362     /* For the receiver we create this bit of pipeline for both
1363      * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc
1364      * and it is all funneled into the rtpbin receive pad.
1365      *
1366      * .--------.     .--------.    .--------.
1367      * | udpsrc |     | funnel |    | rtpbin |
1368      * |       src->sink      src->sink      |
1369      * '--------'     |        |    '--------'
1370      * .--------.     |        |
1371      * | appsrc |     |        |
1372      * |       src->sink       |
1373      * '--------'     '--------'
1374      */
1375     /* make funnel for the RTP/RTCP receivers */
1376     priv->funnel[i] = gst_element_factory_make ("funnel", NULL);
1377     gst_bin_add (bin, priv->funnel[i]);
1378
1379     pad = gst_element_get_static_pad (priv->funnel[i], "src");
1380     gst_pad_link (pad, priv->recv_sink[i]);
1381     gst_object_unref (pad);
1382
1383     if (priv->udpsrc_v4[i]) {
1384       /* we set and keep these to playing so that they don't cause NO_PREROLL return
1385        * values */
1386       gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_PLAYING);
1387       gst_element_set_locked_state (priv->udpsrc_v4[i], TRUE);
1388       /* add udpsrc */
1389       gst_bin_add (bin, priv->udpsrc_v4[i]);
1390
1391       /* and link to the funnel v4 */
1392       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1393       pad = gst_element_get_static_pad (priv->udpsrc_v4[i], "src");
1394       gst_pad_link (pad, selpad);
1395       gst_object_unref (pad);
1396       gst_object_unref (selpad);
1397     }
1398
1399     if (priv->udpsrc_v6[i]) {
1400       gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_PLAYING);
1401       gst_element_set_locked_state (priv->udpsrc_v6[i], TRUE);
1402       gst_bin_add (bin, priv->udpsrc_v6[i]);
1403
1404       /* and link to the funnel v6 */
1405       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1406       pad = gst_element_get_static_pad (priv->udpsrc_v6[i], "src");
1407       gst_pad_link (pad, selpad);
1408       gst_object_unref (pad);
1409       gst_object_unref (selpad);
1410     }
1411
1412     /* make and add appsrc */
1413     priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
1414     gst_bin_add (bin, priv->appsrc[i]);
1415     /* and link to the funnel */
1416     selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1417     pad = gst_element_get_static_pad (priv->appsrc[i], "src");
1418     gst_pad_link (pad, selpad);
1419     gst_object_unref (pad);
1420     gst_object_unref (selpad);
1421
1422     /* check if we need to set to a special state */
1423     if (state != GST_STATE_NULL) {
1424       gst_element_set_state (priv->udpsink[i], state);
1425       gst_element_set_state (priv->appsink[i], state);
1426       gst_element_set_state (priv->appqueue[i], state);
1427       gst_element_set_state (priv->tee[i], state);
1428       gst_element_set_state (priv->funnel[i], state);
1429       gst_element_set_state (priv->appsrc[i], state);
1430     }
1431   }
1432
1433   /* be notified of caps changes */
1434   priv->caps_sig = g_signal_connect (priv->send_rtp_sink, "notify::caps",
1435       (GCallback) caps_notify, stream);
1436
1437   priv->is_joined = TRUE;
1438   g_mutex_unlock (&priv->lock);
1439
1440   return TRUE;
1441
1442   /* ERRORS */
1443 was_joined:
1444   {
1445     g_mutex_unlock (&priv->lock);
1446     return TRUE;
1447   }
1448 no_ports:
1449   {
1450     g_mutex_unlock (&priv->lock);
1451     GST_WARNING ("failed to allocate ports %d", idx);
1452     return FALSE;
1453   }
1454 link_failed:
1455   {
1456     GST_WARNING ("failed to link stream %d", idx);
1457     gst_object_unref (priv->send_rtp_sink);
1458     priv->send_rtp_sink = NULL;
1459     g_mutex_unlock (&priv->lock);
1460     return FALSE;
1461   }
1462 }
1463
1464 /**
1465  * gst_rtsp_stream_leave_bin:
1466  * @stream: a #GstRTSPStream
1467  * @bin: a #GstBin
1468  * @rtpbin: a rtpbin #GstElement
1469  *
1470  * Remove the elements of @stream from @bin.
1471  *
1472  * Return: %TRUE on success.
1473  */
1474 gboolean
1475 gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
1476     GstElement * rtpbin)
1477 {
1478   GstRTSPStreamPrivate *priv;
1479   gint i;
1480
1481   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1482   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
1483   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
1484
1485   priv = stream->priv;
1486
1487   g_mutex_lock (&priv->lock);
1488   if (!priv->is_joined)
1489     goto was_not_joined;
1490
1491   /* all transports must be removed by now */
1492   g_return_val_if_fail (priv->transports == NULL, FALSE);
1493
1494   GST_INFO ("stream %p leaving bin", stream);
1495
1496   gst_pad_unlink (priv->srcpad, priv->send_rtp_sink);
1497   g_signal_handler_disconnect (priv->send_rtp_sink, priv->caps_sig);
1498   gst_element_release_request_pad (rtpbin, priv->send_rtp_sink);
1499   gst_object_unref (priv->send_rtp_sink);
1500   priv->send_rtp_sink = NULL;
1501
1502   for (i = 0; i < 2; i++) {
1503     gst_element_set_state (priv->udpsink[i], GST_STATE_NULL);
1504     gst_element_set_state (priv->appsink[i], GST_STATE_NULL);
1505     gst_element_set_state (priv->appqueue[i], GST_STATE_NULL);
1506     gst_element_set_state (priv->tee[i], GST_STATE_NULL);
1507     gst_element_set_state (priv->funnel[i], GST_STATE_NULL);
1508     gst_element_set_state (priv->appsrc[i], GST_STATE_NULL);
1509     if (priv->udpsrc_v4[i]) {
1510       /* and set udpsrc to NULL now before removing */
1511       gst_element_set_locked_state (priv->udpsrc_v4[i], FALSE);
1512       gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_NULL);
1513       /* removing them should also nicely release the request
1514        * pads when they finalize */
1515       gst_bin_remove (bin, priv->udpsrc_v4[i]);
1516     }
1517     if (priv->udpsrc_v6[i]) {
1518       gst_element_set_locked_state (priv->udpsrc_v6[i], FALSE);
1519       gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_NULL);
1520       gst_bin_remove (bin, priv->udpsrc_v6[i]);
1521     }
1522     gst_bin_remove (bin, priv->udpsink[i]);
1523     gst_bin_remove (bin, priv->appsrc[i]);
1524     gst_bin_remove (bin, priv->appsink[i]);
1525     gst_bin_remove (bin, priv->appqueue[i]);
1526     gst_bin_remove (bin, priv->tee[i]);
1527     gst_bin_remove (bin, priv->funnel[i]);
1528
1529     gst_element_release_request_pad (rtpbin, priv->recv_sink[i]);
1530     gst_object_unref (priv->recv_sink[i]);
1531     priv->recv_sink[i] = NULL;
1532
1533     priv->udpsrc_v4[i] = NULL;
1534     priv->udpsrc_v6[i] = NULL;
1535     priv->udpsink[i] = NULL;
1536     priv->appsrc[i] = NULL;
1537     priv->appsink[i] = NULL;
1538     priv->appqueue[i] = NULL;
1539     priv->tee[i] = NULL;
1540     priv->funnel[i] = NULL;
1541   }
1542   gst_object_unref (priv->send_src[0]);
1543   priv->send_src[0] = NULL;
1544
1545   gst_element_release_request_pad (rtpbin, priv->send_src[1]);
1546   gst_object_unref (priv->send_src[1]);
1547   priv->send_src[1] = NULL;
1548
1549   g_object_unref (priv->session);
1550   priv->session = NULL;
1551   if (priv->caps)
1552     gst_caps_unref (priv->caps);
1553   priv->caps = NULL;
1554
1555   priv->is_joined = FALSE;
1556   g_mutex_unlock (&priv->lock);
1557
1558   return TRUE;
1559
1560 was_not_joined:
1561   {
1562     return TRUE;
1563   }
1564 }
1565
1566 /**
1567  * gst_rtsp_stream_get_rtpinfo:
1568  * @stream: a #GstRTSPStream
1569  * @rtptime: result RTP timestamp
1570  * @seq: result RTP seqnum
1571  *
1572  * Retrieve the current rtptime and seq. This is used to
1573  * construct a RTPInfo reply header.
1574  *
1575  * Returns: %TRUE when rtptime and seq could be determined.
1576  */
1577 gboolean
1578 gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream,
1579     guint * rtptime, guint * seq)
1580 {
1581   GstRTSPStreamPrivate *priv;
1582   GObjectClass *payobjclass;
1583
1584   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1585   g_return_val_if_fail (rtptime != NULL, FALSE);
1586   g_return_val_if_fail (seq != NULL, FALSE);
1587
1588   priv = stream->priv;
1589
1590   payobjclass = G_OBJECT_GET_CLASS (priv->payloader);
1591
1592   if (!g_object_class_find_property (payobjclass, "seqnum") ||
1593       !g_object_class_find_property (payobjclass, "timestamp"))
1594     return FALSE;
1595
1596   g_object_get (priv->payloader, "seqnum", seq, "timestamp", rtptime, NULL);
1597
1598   return TRUE;
1599 }
1600
1601 /**
1602  * gst_rtsp_stream_get_caps:
1603  * @stream: a #GstRTSPStream
1604  *
1605  * Retrieve the current caps of @stream.
1606  *
1607  * Returns: (transfer full): the #GstCaps of @stream. use gst_caps_unref()
1608  *    after usage.
1609  */
1610 GstCaps *
1611 gst_rtsp_stream_get_caps (GstRTSPStream * stream)
1612 {
1613   GstRTSPStreamPrivate *priv;
1614   GstCaps *result;
1615
1616   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1617
1618   priv = stream->priv;
1619
1620   g_mutex_lock (&priv->lock);
1621   if ((result = priv->caps))
1622     gst_caps_ref (result);
1623   g_mutex_unlock (&priv->lock);
1624
1625   return result;
1626 }
1627
1628 /**
1629  * gst_rtsp_stream_recv_rtp:
1630  * @stream: a #GstRTSPStream
1631  * @buffer: (transfer full): a #GstBuffer
1632  *
1633  * Handle an RTP buffer for the stream. This method is usually called when a
1634  * message has been received from a client using the TCP transport.
1635  *
1636  * This function takes ownership of @buffer.
1637  *
1638  * Returns: a GstFlowReturn.
1639  */
1640 GstFlowReturn
1641 gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer)
1642 {
1643   GstRTSPStreamPrivate *priv;
1644   GstFlowReturn ret;
1645   GstElement *element;
1646
1647   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
1648   priv = stream->priv;
1649   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1650   g_return_val_if_fail (priv->is_joined, FALSE);
1651
1652   g_mutex_lock (&priv->lock);
1653   element = gst_object_ref (priv->appsrc[0]);
1654   g_mutex_unlock (&priv->lock);
1655
1656   ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
1657
1658   gst_object_unref (element);
1659
1660   return ret;
1661 }
1662
1663 /**
1664  * gst_rtsp_stream_recv_rtcp:
1665  * @stream: a #GstRTSPStream
1666  * @buffer: (transfer full): a #GstBuffer
1667  *
1668  * Handle an RTCP buffer for the stream. This method is usually called when a
1669  * message has been received from a client using the TCP transport.
1670  *
1671  * This function takes ownership of @buffer.
1672  *
1673  * Returns: a GstFlowReturn.
1674  */
1675 GstFlowReturn
1676 gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
1677 {
1678   GstRTSPStreamPrivate *priv;
1679   GstFlowReturn ret;
1680   GstElement *element;
1681
1682   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
1683   priv = stream->priv;
1684   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1685   g_return_val_if_fail (priv->is_joined, FALSE);
1686
1687   g_mutex_lock (&priv->lock);
1688   element = gst_object_ref (priv->appsrc[1]);
1689   g_mutex_unlock (&priv->lock);
1690
1691   ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
1692
1693   gst_object_unref (element);
1694
1695   return ret;
1696 }
1697
1698 /* must be called with lock */
1699 static gboolean
1700 update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
1701     gboolean add)
1702 {
1703   GstRTSPStreamPrivate *priv = stream->priv;
1704   const GstRTSPTransport *tr;
1705
1706   tr = gst_rtsp_stream_transport_get_transport (trans);
1707
1708   switch (tr->lower_transport) {
1709     case GST_RTSP_LOWER_TRANS_UDP:
1710     case GST_RTSP_LOWER_TRANS_UDP_MCAST:
1711     {
1712       gchar *dest;
1713       gint min, max;
1714       guint ttl = 0;
1715
1716       dest = tr->destination;
1717       if (tr->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
1718         min = tr->port.min;
1719         max = tr->port.max;
1720         ttl = tr->ttl;
1721       } else {
1722         min = tr->client_port.min;
1723         max = tr->client_port.max;
1724       }
1725
1726       if (add) {
1727         GST_INFO ("adding %s:%d-%d", dest, min, max);
1728         g_signal_emit_by_name (priv->udpsink[0], "add", dest, min, NULL);
1729         g_signal_emit_by_name (priv->udpsink[1], "add", dest, max, NULL);
1730         if (ttl > 0) {
1731           GST_INFO ("setting ttl-mc %d", ttl);
1732           g_object_set (G_OBJECT (priv->udpsink[0]), "ttl-mc", ttl, NULL);
1733           g_object_set (G_OBJECT (priv->udpsink[1]), "ttl-mc", ttl, NULL);
1734         }
1735         priv->transports = g_list_prepend (priv->transports, trans);
1736       } else {
1737         GST_INFO ("removing %s:%d-%d", dest, min, max);
1738         g_signal_emit_by_name (priv->udpsink[0], "remove", dest, min, NULL);
1739         g_signal_emit_by_name (priv->udpsink[1], "remove", dest, max, NULL);
1740         priv->transports = g_list_remove (priv->transports, trans);
1741       }
1742       break;
1743     }
1744     case GST_RTSP_LOWER_TRANS_TCP:
1745       if (add) {
1746         GST_INFO ("adding TCP %s", tr->destination);
1747         priv->transports = g_list_prepend (priv->transports, trans);
1748       } else {
1749         GST_INFO ("removing TCP %s", tr->destination);
1750         priv->transports = g_list_remove (priv->transports, trans);
1751       }
1752       break;
1753     default:
1754       goto unknown_transport;
1755   }
1756   return TRUE;
1757
1758   /* ERRORS */
1759 unknown_transport:
1760   {
1761     GST_INFO ("Unknown transport %d", tr->lower_transport);
1762     return FALSE;
1763   }
1764 }
1765
1766
1767 /**
1768  * gst_rtsp_stream_add_transport:
1769  * @stream: a #GstRTSPStream
1770  * @trans: a #GstRTSPStreamTransport
1771  *
1772  * Add the transport in @trans to @stream. The media of @stream will
1773  * then also be send to the values configured in @trans.
1774  *
1775  * @stream must be joined to a bin.
1776  *
1777  * @trans must contain a valid #GstRTSPTransport.
1778  *
1779  * Returns: %TRUE if @trans was added
1780  */
1781 gboolean
1782 gst_rtsp_stream_add_transport (GstRTSPStream * stream,
1783     GstRTSPStreamTransport * trans)
1784 {
1785   GstRTSPStreamPrivate *priv;
1786   gboolean res;
1787
1788   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1789   priv = stream->priv;
1790   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
1791   g_return_val_if_fail (priv->is_joined, FALSE);
1792
1793   g_mutex_lock (&priv->lock);
1794   res = update_transport (stream, trans, TRUE);
1795   g_mutex_unlock (&priv->lock);
1796
1797   return res;
1798 }
1799
1800 /**
1801  * gst_rtsp_stream_remove_transport:
1802  * @stream: a #GstRTSPStream
1803  * @trans: a #GstRTSPStreamTransport
1804  *
1805  * Remove the transport in @trans from @stream. The media of @stream will
1806  * not be sent to the values configured in @trans.
1807  *
1808  * @stream must be joined to a bin.
1809  *
1810  * @trans must contain a valid #GstRTSPTransport.
1811  *
1812  * Returns: %TRUE if @trans was removed
1813  */
1814 gboolean
1815 gst_rtsp_stream_remove_transport (GstRTSPStream * stream,
1816     GstRTSPStreamTransport * trans)
1817 {
1818   GstRTSPStreamPrivate *priv;
1819   gboolean res;
1820
1821   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1822   priv = stream->priv;
1823   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
1824   g_return_val_if_fail (priv->is_joined, FALSE);
1825
1826   g_mutex_lock (&priv->lock);
1827   res = update_transport (stream, trans, FALSE);
1828   g_mutex_unlock (&priv->lock);
1829
1830   return res;
1831 }