docs: improve docs
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-stream.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19 /**
20  * SECTION:rtsp-stream
21  * @short_description: A media stream
22  * @see_also: #GstRTSPMedia
23  *
24  * The #GstRTSPStream object manages the data transport for one stream. It
25  * is created from a payloader element and a source pad that produce the RTP
26  * packets for the stream.
27  *
28  * With gst_rtsp_stream_join_bin() the streaming elements are added to the bin
29  * and rtpbin. gst_rtsp_stream_leave_bin() removes the elements again.
30  *
31  * The #GstRTSPStream will use the configured addresspool, as set with
32  * gst_rtsp_stream_set_address_pool(), to allocate multicast addresses for the
33  * stream. With gst_rtsp_stream_get_multicast_address() you can get the
34  * configured address.
35  *
36  * With gst_rtsp_stream_get_server_port () you can get the port that the server
37  * will use to receive RTCP. This is the part that the clients will use to send
38  * RTCP to.
39  *
40  * With gst_rtsp_stream_add_transport() destinations can be added where the
41  * stream should be sent to. Use gst_rtsp_stream_remove_transport() to remove
42  * the destination again.
43  *
44  * Last reviewed on 2013-07-16 (1.0.0)
45  */
46
47 #include <stdlib.h>
48 #include <stdio.h>
49 #include <string.h>
50
51 #include <gio/gio.h>
52
53 #include <gst/app/gstappsrc.h>
54 #include <gst/app/gstappsink.h>
55
56 #include "rtsp-stream.h"
57
58 #define GST_RTSP_STREAM_GET_PRIVATE(obj)  \
59      (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTSP_STREAM, GstRTSPStreamPrivate))
60
61 struct _GstRTSPStreamPrivate
62 {
63   GMutex lock;
64   guint idx;
65   GstPad *srcpad;
66   GstElement *payloader;
67   guint buffer_size;
68   gboolean is_joined;
69   gchar *control;
70
71   /* pads on the rtpbin */
72   GstPad *send_rtp_sink;
73   GstPad *recv_sink[2];
74   GstPad *send_src[2];
75
76   /* the RTPSession object */
77   GObject *session;
78
79   /* sinks used for sending and receiving RTP and RTCP over ipv4, they share
80    * sockets */
81   GstElement *udpsrc_v4[2];
82
83   /* sinks used for sending and receiving RTP and RTCP over ipv6, they share
84    * sockets */
85   GstElement *udpsrc_v6[2];
86
87   GstElement *udpsink[2];
88
89   /* for TCP transport */
90   GstElement *appsrc[2];
91   GstElement *appqueue[2];
92   GstElement *appsink[2];
93
94   GstElement *tee[2];
95   GstElement *funnel[2];
96
97   /* server ports for sending/receiving over ipv4 */
98   GstRTSPRange server_port_v4;
99   GstRTSPAddress *server_addr_v4;
100   gboolean have_ipv4;
101
102   /* server ports for sending/receiving over ipv6 */
103   GstRTSPRange server_port_v6;
104   GstRTSPAddress *server_addr_v6;
105   gboolean have_ipv6;
106
107   /* multicast addresses */
108   GstRTSPAddressPool *pool;
109   GstRTSPAddress *addr_v4;
110   GstRTSPAddress *addr_v6;
111
112   /* the caps of the stream */
113   gulong caps_sig;
114   GstCaps *caps;
115
116   /* transports we stream to */
117   guint n_active;
118   GList *transports;
119
120   gint dscp_qos;
121 };
122
123 #define DEFAULT_CONTROL NULL
124
125 enum
126 {
127   PROP_0,
128   PROP_CONTROL,
129   PROP_LAST
130 };
131
132 GST_DEBUG_CATEGORY_STATIC (rtsp_stream_debug);
133 #define GST_CAT_DEFAULT rtsp_stream_debug
134
135 static GQuark ssrc_stream_map_key;
136
137 static void gst_rtsp_stream_get_property (GObject * object, guint propid,
138     GValue * value, GParamSpec * pspec);
139 static void gst_rtsp_stream_set_property (GObject * object, guint propid,
140     const GValue * value, GParamSpec * pspec);
141
142 static void gst_rtsp_stream_finalize (GObject * obj);
143
144 G_DEFINE_TYPE (GstRTSPStream, gst_rtsp_stream, G_TYPE_OBJECT);
145
146 static void
147 gst_rtsp_stream_class_init (GstRTSPStreamClass * klass)
148 {
149   GObjectClass *gobject_class;
150
151   g_type_class_add_private (klass, sizeof (GstRTSPStreamPrivate));
152
153   gobject_class = G_OBJECT_CLASS (klass);
154
155   gobject_class->get_property = gst_rtsp_stream_get_property;
156   gobject_class->set_property = gst_rtsp_stream_set_property;
157   gobject_class->finalize = gst_rtsp_stream_finalize;
158
159   g_object_class_install_property (gobject_class, PROP_CONTROL,
160       g_param_spec_string ("control", "Control",
161           "The control string for this stream", DEFAULT_CONTROL,
162           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
163
164   GST_DEBUG_CATEGORY_INIT (rtsp_stream_debug, "rtspstream", 0, "GstRTSPStream");
165
166   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
167 }
168
169 static void
170 gst_rtsp_stream_init (GstRTSPStream * stream)
171 {
172   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
173
174   GST_DEBUG ("new stream %p", stream);
175
176   stream->priv = priv;
177
178   priv->dscp_qos = -1;
179   priv->control = g_strdup (DEFAULT_CONTROL);
180
181   g_mutex_init (&priv->lock);
182 }
183
184 static void
185 gst_rtsp_stream_finalize (GObject * obj)
186 {
187   GstRTSPStream *stream;
188   GstRTSPStreamPrivate *priv;
189
190   stream = GST_RTSP_STREAM (obj);
191   priv = stream->priv;
192
193   GST_DEBUG ("finalize stream %p", stream);
194
195   /* we really need to be unjoined now */
196   g_return_if_fail (!priv->is_joined);
197
198   if (priv->addr_v4)
199     gst_rtsp_address_free (priv->addr_v4);
200   if (priv->addr_v6)
201     gst_rtsp_address_free (priv->addr_v6);
202   if (priv->server_addr_v4)
203     gst_rtsp_address_free (priv->server_addr_v4);
204   if (priv->server_addr_v6)
205     gst_rtsp_address_free (priv->server_addr_v6);
206   if (priv->pool)
207     g_object_unref (priv->pool);
208   gst_object_unref (priv->payloader);
209   gst_object_unref (priv->srcpad);
210   g_free (priv->control);
211   g_mutex_clear (&priv->lock);
212
213   G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
214 }
215
216 static void
217 gst_rtsp_stream_get_property (GObject * object, guint propid,
218     GValue * value, GParamSpec * pspec)
219 {
220   GstRTSPStream *stream = GST_RTSP_STREAM (object);
221
222   switch (propid) {
223     case PROP_CONTROL:
224       g_value_take_string (value, gst_rtsp_stream_get_control (stream));
225       break;
226     default:
227       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
228   }
229 }
230
231 static void
232 gst_rtsp_stream_set_property (GObject * object, guint propid,
233     const GValue * value, GParamSpec * pspec)
234 {
235   GstRTSPStream *stream = GST_RTSP_STREAM (object);
236
237   switch (propid) {
238     case PROP_CONTROL:
239       gst_rtsp_stream_set_control (stream, g_value_get_string (value));
240       break;
241     default:
242       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
243   }
244 }
245
246 /**
247  * gst_rtsp_stream_new:
248  * @idx: an index
249  * @srcpad: a #GstPad
250  * @payloader: a #GstElement
251  *
252  * Create a new media stream with index @idx that handles RTP data on
253  * @srcpad and has a payloader element @payloader.
254  *
255  * Returns: a new #GstRTSPStream
256  */
257 GstRTSPStream *
258 gst_rtsp_stream_new (guint idx, GstElement * payloader, GstPad * srcpad)
259 {
260   GstRTSPStreamPrivate *priv;
261   GstRTSPStream *stream;
262
263   g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL);
264   g_return_val_if_fail (GST_IS_PAD (srcpad), NULL);
265   g_return_val_if_fail (GST_PAD_IS_SRC (srcpad), NULL);
266
267   stream = g_object_new (GST_TYPE_RTSP_STREAM, NULL);
268   priv = stream->priv;
269   priv->idx = idx;
270   priv->payloader = gst_object_ref (payloader);
271   priv->srcpad = gst_object_ref (srcpad);
272
273   return stream;
274 }
275
276 /**
277  * gst_rtsp_stream_get_index:
278  * @stream: a #GstRTSPStream
279  *
280  * Get the stream index.
281  *
282  * Return: the stream index.
283  */
284 guint
285 gst_rtsp_stream_get_index (GstRTSPStream * stream)
286 {
287   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
288
289   return stream->priv->idx;
290 }
291
292 /**
293  * gst_rtsp_stream_get_srcpad:
294  * @stream: a #GstRTSPStream
295  *
296  * Get the srcpad associated with @stream.
297  *
298  * Return: the srcpad. Unref after usage.
299  */
300 GstPad *
301 gst_rtsp_stream_get_srcpad (GstRTSPStream * stream)
302 {
303   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
304
305   return gst_object_ref (stream->priv->srcpad);
306 }
307
308 /**
309  * gst_rtsp_stream_get_control:
310  * @stream: a #GstRTSPStream
311  *
312  * Get the control string to identify this stream.
313  *
314  * Return: the control string. free after usage.
315  */
316 gchar *
317 gst_rtsp_stream_get_control (GstRTSPStream * stream)
318 {
319   GstRTSPStreamPrivate *priv;
320   gchar *result;
321
322   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
323
324   priv = stream->priv;
325
326   g_mutex_lock (&priv->lock);
327   if ((result = g_strdup (priv->control)) == NULL)
328     result = g_strdup_printf ("stream=%u", priv->idx);
329   g_mutex_unlock (&priv->lock);
330
331   return result;
332 }
333
334 /**
335  * gst_rtsp_stream_set_control:
336  * @stream: a #GstRTSPStream
337  * @control: a control string
338  *
339  * Set the control string in @stream.
340  */
341 void
342 gst_rtsp_stream_set_control (GstRTSPStream * stream, const gchar * control)
343 {
344   GstRTSPStreamPrivate *priv;
345
346   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
347
348   priv = stream->priv;
349
350   g_mutex_lock (&priv->lock);
351   g_free (priv->control);
352   priv->control = g_strdup (control);
353   g_mutex_unlock (&priv->lock);
354 }
355
356 /**
357  * gst_rtsp_stream_has_control:
358  * @stream: a #GstRTSPStream
359  * @control: a control string
360  *
361  * Check if @stream has the control string @control.
362  *
363  * Returns: %TRUE is @stream has @control as the control string
364  */
365 gboolean
366 gst_rtsp_stream_has_control (GstRTSPStream * stream, const gchar * control)
367 {
368   GstRTSPStreamPrivate *priv;
369   gboolean res;
370
371   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
372
373   priv = stream->priv;
374
375   g_mutex_lock (&priv->lock);
376   if (priv->control)
377     res = g_strcmp0 (priv->control, control);
378   else {
379     guint streamid;
380     sscanf (control, "stream=%u", &streamid);
381     res = (streamid == priv->idx);
382   }
383   g_mutex_unlock (&priv->lock);
384
385   return res;
386 }
387
388 /**
389  * gst_rtsp_stream_set_mtu:
390  * @stream: a #GstRTSPStream
391  * @mtu: a new MTU
392  *
393  * Configure the mtu in the payloader of @stream to @mtu.
394  */
395 void
396 gst_rtsp_stream_set_mtu (GstRTSPStream * stream, guint mtu)
397 {
398   GstRTSPStreamPrivate *priv;
399
400   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
401
402   priv = stream->priv;
403
404   GST_LOG_OBJECT (stream, "set MTU %u", mtu);
405
406   g_object_set (G_OBJECT (priv->payloader), "mtu", mtu, NULL);
407 }
408
409 /**
410  * gst_rtsp_stream_get_mtu:
411  * @stream: a #GstRTSPStream
412  *
413  * Get the configured MTU in the payloader of @stream.
414  *
415  * Returns: the MTU of the payloader.
416  */
417 guint
418 gst_rtsp_stream_get_mtu (GstRTSPStream * stream)
419 {
420   GstRTSPStreamPrivate *priv;
421   guint mtu;
422
423   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
424
425   priv = stream->priv;
426
427   g_object_get (G_OBJECT (priv->payloader), "mtu", &mtu, NULL);
428
429   return mtu;
430 }
431
432 /* Update the dscp qos property on the udp sinks */
433 static void
434 update_dscp_qos (GstRTSPStream * stream)
435 {
436   GstRTSPStreamPrivate *priv;
437
438   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
439
440   priv = stream->priv;
441
442   if (priv->udpsink[0]) {
443     g_object_set (G_OBJECT (priv->udpsink[0]), "qos-dscp", priv->dscp_qos,
444         NULL);
445   }
446
447   if (priv->udpsink[1]) {
448     g_object_set (G_OBJECT (priv->udpsink[1]), "qos-dscp", priv->dscp_qos,
449         NULL);
450   }
451 }
452
453 /**
454  * gst_rtsp_stream_set_dscp_qos:
455  * @stream: a #GstRTSPStream
456  * @dscp_qos: a new dscp qos value (0-63, or -1 to disable)
457  *
458  * Configure the dscp qos of the outgoing sockets to @dscp_qos.
459  */
460 void
461 gst_rtsp_stream_set_dscp_qos (GstRTSPStream * stream, gint dscp_qos)
462 {
463   GstRTSPStreamPrivate *priv;
464
465   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
466
467   priv = stream->priv;
468
469   GST_LOG_OBJECT (stream, "set DSCP QoS %d", dscp_qos);
470
471   if (dscp_qos < -1 || dscp_qos > 63) {
472     GST_WARNING_OBJECT (stream, "trying to set illegal dscp qos %d", dscp_qos);
473     return;
474   }
475
476   priv->dscp_qos = dscp_qos;
477
478   update_dscp_qos (stream);
479 }
480
481 /**
482  * gst_rtsp_stream_get_dscp_qos:
483  * @stream: a #GstRTSPStream
484  *
485  * Get the configured DSCP QoS in of the outgoing sockets.
486  *
487  * Returns: the DSCP QoS value of the outgoing sockets, or -1 if disbled.
488  */
489 gint
490 gst_rtsp_stream_get_dscp_qos (GstRTSPStream * stream)
491 {
492   GstRTSPStreamPrivate *priv;
493
494   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
495
496   priv = stream->priv;
497
498   return priv->dscp_qos;
499 }
500
501
502 /**
503  * gst_rtsp_stream_set_address_pool:
504  * @stream: a #GstRTSPStream
505  * @pool: a #GstRTSPAddressPool
506  *
507  * configure @pool to be used as the address pool of @stream.
508  */
509 void
510 gst_rtsp_stream_set_address_pool (GstRTSPStream * stream,
511     GstRTSPAddressPool * pool)
512 {
513   GstRTSPStreamPrivate *priv;
514   GstRTSPAddressPool *old;
515
516   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
517
518   priv = stream->priv;
519
520   GST_LOG_OBJECT (stream, "set address pool %p", pool);
521
522   g_mutex_lock (&priv->lock);
523   if ((old = priv->pool) != pool)
524     priv->pool = pool ? g_object_ref (pool) : NULL;
525   else
526     old = NULL;
527   g_mutex_unlock (&priv->lock);
528
529   if (old)
530     g_object_unref (old);
531 }
532
533 /**
534  * gst_rtsp_stream_get_address_pool:
535  * @stream: a #GstRTSPStream
536  *
537  * Get the #GstRTSPAddressPool used as the address pool of @stream.
538  *
539  * Returns: (transfer full): the #GstRTSPAddressPool of @stream. g_object_unref() after
540  * usage.
541  */
542 GstRTSPAddressPool *
543 gst_rtsp_stream_get_address_pool (GstRTSPStream * stream)
544 {
545   GstRTSPStreamPrivate *priv;
546   GstRTSPAddressPool *result;
547
548   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
549
550   priv = stream->priv;
551
552   g_mutex_lock (&priv->lock);
553   if ((result = priv->pool))
554     g_object_ref (result);
555   g_mutex_unlock (&priv->lock);
556
557   return result;
558 }
559
560 /**
561  * gst_rtsp_stream_get_multicast_address:
562  * @stream: a #GstRTSPStream
563  * @family: the #GSocketFamily
564  *
565  * Get the multicast address of @stream for @family.
566  *
567  * Returns: the #GstRTSPAddress of @stream or %NULL when no address could be
568  * allocated. gst_rtsp_address_free() after usage.
569  */
570 GstRTSPAddress *
571 gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream,
572     GSocketFamily family)
573 {
574   GstRTSPStreamPrivate *priv;
575   GstRTSPAddress *result;
576   GstRTSPAddress **addrp;
577   GstRTSPAddressFlags flags;
578
579   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
580
581   priv = stream->priv;
582
583   if (family == G_SOCKET_FAMILY_IPV6) {
584     flags = GST_RTSP_ADDRESS_FLAG_IPV6;
585     addrp = &priv->addr_v4;
586   } else {
587     flags = GST_RTSP_ADDRESS_FLAG_IPV4;
588     addrp = &priv->addr_v6;
589   }
590
591   g_mutex_lock (&priv->lock);
592   if (*addrp == NULL) {
593     if (priv->pool == NULL)
594       goto no_pool;
595
596     flags |= GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_MULTICAST;
597
598     *addrp = gst_rtsp_address_pool_acquire_address (priv->pool, flags, 2);
599     if (*addrp == NULL)
600       goto no_address;
601   }
602   result = gst_rtsp_address_copy (*addrp);
603   g_mutex_unlock (&priv->lock);
604
605   return result;
606
607   /* ERRORS */
608 no_pool:
609   {
610     GST_ERROR_OBJECT (stream, "no address pool specified");
611     g_mutex_unlock (&priv->lock);
612     return NULL;
613   }
614 no_address:
615   {
616     GST_ERROR_OBJECT (stream, "failed to acquire address from pool");
617     g_mutex_unlock (&priv->lock);
618     return NULL;
619   }
620 }
621
622 /**
623  * gst_rtsp_stream_reserve_address:
624  * @stream: a #GstRTSPStream
625  * @address: an address
626  * @port: a port
627  * @n_ports: n_ports
628  * @ttl: a TTL
629  *
630  * Reserve @address and @port as the address and port of @stream.
631  *
632  * Returns: the #GstRTSPAddress of @stream or %NULL when the address could be
633  * reserved. gst_rtsp_address_free() after usage.
634  */
635 GstRTSPAddress *
636 gst_rtsp_stream_reserve_address (GstRTSPStream * stream,
637     const gchar * address, guint port, guint n_ports, guint ttl)
638 {
639   GstRTSPStreamPrivate *priv;
640   GstRTSPAddress *result;
641   GInetAddress *addr;
642   GSocketFamily family;
643   GstRTSPAddress **addrp;
644
645   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
646   g_return_val_if_fail (address != NULL, NULL);
647   g_return_val_if_fail (port > 0, NULL);
648   g_return_val_if_fail (n_ports > 0, NULL);
649   g_return_val_if_fail (ttl > 0, NULL);
650
651   priv = stream->priv;
652
653   addr = g_inet_address_new_from_string (address);
654   if (!addr) {
655     GST_ERROR ("failed to get inet addr from %s", address);
656     family = G_SOCKET_FAMILY_IPV4;
657   } else {
658     family = g_inet_address_get_family (addr);
659     g_object_unref (addr);
660   }
661
662   if (family == G_SOCKET_FAMILY_IPV6)
663     addrp = &priv->addr_v4;
664   else
665     addrp = &priv->addr_v6;
666
667   g_mutex_lock (&priv->lock);
668   if (*addrp == NULL) {
669     if (priv->pool == NULL)
670       goto no_pool;
671
672     *addrp = gst_rtsp_address_pool_reserve_address (priv->pool, address,
673         port, n_ports, ttl);
674     if (*addrp == NULL)
675       goto no_address;
676   } else {
677     if (strcmp ((*addrp)->address, address) ||
678         (*addrp)->port != port || (*addrp)->n_ports != n_ports ||
679         (*addrp)->ttl != ttl)
680       goto different_address;
681   }
682   result = gst_rtsp_address_copy (*addrp);
683   g_mutex_unlock (&priv->lock);
684
685   return result;
686
687   /* ERRORS */
688 no_pool:
689   {
690     GST_ERROR_OBJECT (stream, "no address pool specified");
691     g_mutex_unlock (&priv->lock);
692     return NULL;
693   }
694 no_address:
695   {
696     GST_ERROR_OBJECT (stream, "failed to acquire address %s from pool",
697         address);
698     g_mutex_unlock (&priv->lock);
699     return NULL;
700   }
701 different_address:
702   {
703     GST_ERROR_OBJECT (stream, "address %s is not the same that was already"
704         " reserved", address);
705     g_mutex_unlock (&priv->lock);
706     return NULL;
707   }
708 }
709
710 static gboolean
711 alloc_ports_one_family (GstRTSPAddressPool * pool, gint buffer_size,
712     GSocketFamily family, GstElement * udpsrc_out[2],
713     GstElement * udpsink_out[2], GstRTSPRange * server_port_out,
714     GstRTSPAddress ** server_addr_out)
715 {
716   GstStateChangeReturn ret;
717   GstElement *udpsrc0, *udpsrc1;
718   GstElement *udpsink0, *udpsink1;
719   GSocket *rtp_socket = NULL;
720   GSocket *rtcp_socket;
721   gint tmp_rtp, tmp_rtcp;
722   guint count;
723   gint rtpport, rtcpport;
724   GList *rejected_addresses = NULL;
725   GstRTSPAddress *addr = NULL;
726   GInetAddress *inetaddr = NULL;
727   GSocketAddress *rtp_sockaddr = NULL;
728   GSocketAddress *rtcp_sockaddr = NULL;
729   const gchar *multisink_socket;
730
731   if (family == G_SOCKET_FAMILY_IPV6)
732     multisink_socket = "socket-v6";
733   else
734     multisink_socket = "socket";
735
736   udpsrc0 = NULL;
737   udpsrc1 = NULL;
738   udpsink0 = NULL;
739   udpsink1 = NULL;
740   count = 0;
741
742   /* Start with random port */
743   tmp_rtp = 0;
744
745   rtcp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
746       G_SOCKET_PROTOCOL_UDP, NULL);
747   if (!rtcp_socket)
748     goto no_udp_protocol;
749
750   if (*server_addr_out)
751     gst_rtsp_address_free (*server_addr_out);
752
753   /* try to allocate 2 UDP ports, the RTP port should be an even
754    * number and the RTCP port should be the next (uneven) port */
755 again:
756
757   if (rtp_socket == NULL) {
758     rtp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
759         G_SOCKET_PROTOCOL_UDP, NULL);
760     if (!rtp_socket)
761       goto no_udp_protocol;
762   }
763
764   if (pool && gst_rtsp_address_pool_has_unicast_addresses (pool)) {
765     GstRTSPAddressFlags flags;
766
767     if (addr)
768       rejected_addresses = g_list_prepend (rejected_addresses, addr);
769
770     flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_UNICAST;
771     if (family == G_SOCKET_FAMILY_IPV6)
772       flags |= GST_RTSP_ADDRESS_FLAG_IPV6;
773     else
774       flags |= GST_RTSP_ADDRESS_FLAG_IPV4;
775
776     addr = gst_rtsp_address_pool_acquire_address (pool, flags, 2);
777
778     if (addr == NULL)
779       goto no_ports;
780
781     tmp_rtp = addr->port;
782
783     g_clear_object (&inetaddr);
784     inetaddr = g_inet_address_new_from_string (addr->address);
785   } else {
786     if (tmp_rtp != 0) {
787       tmp_rtp += 2;
788       if (++count > 20)
789         goto no_ports;
790     }
791
792     if (inetaddr == NULL)
793       inetaddr = g_inet_address_new_any (family);
794   }
795
796   rtp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtp);
797   if (!g_socket_bind (rtp_socket, rtp_sockaddr, FALSE, NULL)) {
798     g_object_unref (rtp_sockaddr);
799     goto again;
800   }
801   g_object_unref (rtp_sockaddr);
802
803   rtp_sockaddr = g_socket_get_local_address (rtp_socket, NULL);
804   if (rtp_sockaddr == NULL || !G_IS_INET_SOCKET_ADDRESS (rtp_sockaddr)) {
805     g_clear_object (&rtp_sockaddr);
806     goto socket_error;
807   }
808
809   tmp_rtp =
810       g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (rtp_sockaddr));
811   g_object_unref (rtp_sockaddr);
812
813   /* check if port is even */
814   if ((tmp_rtp & 1) != 0) {
815     /* port not even, close and allocate another */
816     tmp_rtp++;
817     g_clear_object (&rtp_socket);
818     goto again;
819   }
820
821   /* set port */
822   tmp_rtcp = tmp_rtp + 1;
823
824   rtcp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtcp);
825   if (!g_socket_bind (rtcp_socket, rtcp_sockaddr, FALSE, NULL)) {
826     g_object_unref (rtcp_sockaddr);
827     g_clear_object (&rtp_socket);
828     goto again;
829   }
830   g_object_unref (rtcp_sockaddr);
831
832   g_clear_object (&inetaddr);
833
834   udpsrc0 = gst_element_factory_make ("udpsrc", NULL);
835   udpsrc1 = gst_element_factory_make ("udpsrc", NULL);
836
837   if (udpsrc0 == NULL || udpsrc1 == NULL)
838     goto no_udp_protocol;
839
840   g_object_set (G_OBJECT (udpsrc0), "socket", rtp_socket, NULL);
841   g_object_set (G_OBJECT (udpsrc1), "socket", rtcp_socket, NULL);
842
843   ret = gst_element_set_state (udpsrc0, GST_STATE_PAUSED);
844   if (ret == GST_STATE_CHANGE_FAILURE)
845     goto element_error;
846   ret = gst_element_set_state (udpsrc1, GST_STATE_PAUSED);
847   if (ret == GST_STATE_CHANGE_FAILURE)
848     goto element_error;
849
850   /* all fine, do port check */
851   g_object_get (G_OBJECT (udpsrc0), "port", &rtpport, NULL);
852   g_object_get (G_OBJECT (udpsrc1), "port", &rtcpport, NULL);
853
854   /* this should not happen... */
855   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
856     goto port_error;
857
858   if (udpsink_out[0])
859     udpsink0 = udpsink_out[0];
860   else
861     udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
862
863   if (!udpsink0)
864     goto no_udp_protocol;
865
866   g_object_set (G_OBJECT (udpsink0), "close-socket", FALSE, NULL);
867   g_object_set (G_OBJECT (udpsink0), multisink_socket, rtp_socket, NULL);
868
869   if (udpsink_out[1])
870     udpsink1 = udpsink_out[1];
871   else
872     udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
873
874   if (!udpsink1)
875     goto no_udp_protocol;
876
877   g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
878   g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
879   g_object_set (G_OBJECT (udpsink0), "buffer-size", buffer_size, NULL);
880
881   g_object_set (G_OBJECT (udpsink1), "close-socket", FALSE, NULL);
882   g_object_set (G_OBJECT (udpsink1), multisink_socket, rtcp_socket, NULL);
883   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
884   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
885   g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL);
886   g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
887   g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL);
888   g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
889
890   /* we keep these elements, we will further configure them when the
891    * client told us to really use the UDP ports. */
892   udpsrc_out[0] = udpsrc0;
893   udpsrc_out[1] = udpsrc1;
894   udpsink_out[0] = udpsink0;
895   udpsink_out[1] = udpsink1;
896   server_port_out->min = rtpport;
897   server_port_out->max = rtcpport;
898
899   *server_addr_out = addr;
900   g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free);
901
902   g_object_unref (rtp_socket);
903   g_object_unref (rtcp_socket);
904
905   return TRUE;
906
907   /* ERRORS */
908 no_udp_protocol:
909   {
910     goto cleanup;
911   }
912 no_ports:
913   {
914     goto cleanup;
915   }
916 port_error:
917   {
918     goto cleanup;
919   }
920 socket_error:
921   {
922     goto cleanup;
923   }
924 element_error:
925   {
926     goto cleanup;
927   }
928 cleanup:
929   {
930     if (udpsrc0) {
931       gst_element_set_state (udpsrc0, GST_STATE_NULL);
932       gst_object_unref (udpsrc0);
933     }
934     if (udpsrc1) {
935       gst_element_set_state (udpsrc1, GST_STATE_NULL);
936       gst_object_unref (udpsrc1);
937     }
938     if (udpsink0) {
939       gst_element_set_state (udpsink0, GST_STATE_NULL);
940       gst_object_unref (udpsink0);
941     }
942     if (udpsink1) {
943       gst_element_set_state (udpsink1, GST_STATE_NULL);
944       gst_object_unref (udpsink1);
945     }
946     if (inetaddr)
947       g_object_unref (inetaddr);
948     g_list_free_full (rejected_addresses,
949         (GDestroyNotify) gst_rtsp_address_free);
950     if (addr)
951       gst_rtsp_address_free (addr);
952     if (rtp_socket)
953       g_object_unref (rtp_socket);
954     if (rtcp_socket)
955       g_object_unref (rtcp_socket);
956     return FALSE;
957   }
958 }
959
960 /* must be called with lock */
961 static gboolean
962 alloc_ports (GstRTSPStream * stream)
963 {
964   GstRTSPStreamPrivate *priv = stream->priv;
965
966   priv->have_ipv4 = alloc_ports_one_family (priv->pool, priv->buffer_size,
967       G_SOCKET_FAMILY_IPV4, priv->udpsrc_v4, priv->udpsink,
968       &priv->server_port_v4, &priv->server_addr_v4);
969
970   priv->have_ipv6 = alloc_ports_one_family (priv->pool, priv->buffer_size,
971       G_SOCKET_FAMILY_IPV6, priv->udpsrc_v6, priv->udpsink,
972       &priv->server_port_v6, &priv->server_addr_v6);
973
974   return priv->have_ipv4 || priv->have_ipv6;
975 }
976
977 /**
978  * gst_rtsp_stream_get_server_port:
979  * @stream: a #GstRTSPStream
980  * @server_port: (out): result server port
981  * @family: the port family to get
982  *
983  * Fill @server_port with the port pair used by the server. This function can
984  * only be called when @stream has been joined.
985  */
986 void
987 gst_rtsp_stream_get_server_port (GstRTSPStream * stream,
988     GstRTSPRange * server_port, GSocketFamily family)
989 {
990   GstRTSPStreamPrivate *priv;
991
992   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
993   priv = stream->priv;
994   g_return_if_fail (priv->is_joined);
995
996   g_mutex_lock (&priv->lock);
997   if (family == G_SOCKET_FAMILY_IPV4) {
998     if (server_port)
999       *server_port = priv->server_port_v4;
1000   } else {
1001     if (server_port)
1002       *server_port = priv->server_port_v6;
1003   }
1004   g_mutex_unlock (&priv->lock);
1005 }
1006
1007 /**
1008  * gst_rtsp_stream_get_rtpsession:
1009  * @stream: a #GstRTSPStream
1010  *
1011  * Get the RTP session of this stream.
1012  *
1013  * Returns: The RTP session of this stream. Unref after usage.
1014  */
1015 GObject *
1016 gst_rtsp_stream_get_rtpsession (GstRTSPStream * stream)
1017 {
1018   GstRTSPStreamPrivate *priv;
1019   GObject *session;
1020
1021   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1022
1023   priv = stream->priv;
1024
1025   g_mutex_lock (&priv->lock);
1026   if ((session = priv->session))
1027     g_object_ref (session);
1028   g_mutex_unlock (&priv->lock);
1029
1030   return session;
1031 }
1032
1033 /**
1034  * gst_rtsp_stream_get_ssrc:
1035  * @stream: a #GstRTSPStream
1036  * @ssrc: (out): result ssrc
1037  *
1038  * Get the SSRC used by the RTP session of this stream. This function can only
1039  * be called when @stream has been joined.
1040  */
1041 void
1042 gst_rtsp_stream_get_ssrc (GstRTSPStream * stream, guint * ssrc)
1043 {
1044   GstRTSPStreamPrivate *priv;
1045
1046   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1047   priv = stream->priv;
1048   g_return_if_fail (priv->is_joined);
1049
1050   g_mutex_lock (&priv->lock);
1051   if (ssrc && priv->session)
1052     g_object_get (priv->session, "internal-ssrc", ssrc, NULL);
1053   g_mutex_unlock (&priv->lock);
1054 }
1055
1056 /* executed from streaming thread */
1057 static void
1058 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream)
1059 {
1060   GstRTSPStreamPrivate *priv = stream->priv;
1061   GstCaps *newcaps, *oldcaps;
1062
1063   newcaps = gst_pad_get_current_caps (pad);
1064
1065   GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps,
1066       newcaps);
1067
1068   g_mutex_lock (&priv->lock);
1069   oldcaps = priv->caps;
1070   priv->caps = newcaps;
1071   g_mutex_unlock (&priv->lock);
1072
1073   if (oldcaps)
1074     gst_caps_unref (oldcaps);
1075 }
1076
1077 static void
1078 dump_structure (const GstStructure * s)
1079 {
1080   gchar *sstr;
1081
1082   sstr = gst_structure_to_string (s);
1083   GST_INFO ("structure: %s", sstr);
1084   g_free (sstr);
1085 }
1086
1087 static GstRTSPStreamTransport *
1088 find_transport (GstRTSPStream * stream, const gchar * rtcp_from)
1089 {
1090   GstRTSPStreamPrivate *priv = stream->priv;
1091   GList *walk;
1092   GstRTSPStreamTransport *result = NULL;
1093   const gchar *tmp;
1094   gchar *dest;
1095   guint port;
1096
1097   if (rtcp_from == NULL)
1098     return NULL;
1099
1100   tmp = g_strrstr (rtcp_from, ":");
1101   if (tmp == NULL)
1102     return NULL;
1103
1104   port = atoi (tmp + 1);
1105   dest = g_strndup (rtcp_from, tmp - rtcp_from);
1106
1107   g_mutex_lock (&priv->lock);
1108   GST_INFO ("finding %s:%d in %d transports", dest, port,
1109       g_list_length (priv->transports));
1110
1111   for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1112     GstRTSPStreamTransport *trans = walk->data;
1113     const GstRTSPTransport *tr;
1114     gint min, max;
1115
1116     tr = gst_rtsp_stream_transport_get_transport (trans);
1117
1118     min = tr->client_port.min;
1119     max = tr->client_port.max;
1120
1121     if ((strcmp (tr->destination, dest) == 0) && (min == port || max == port)) {
1122       result = trans;
1123       break;
1124     }
1125   }
1126   if (result)
1127     g_object_ref (result);
1128   g_mutex_unlock (&priv->lock);
1129
1130   g_free (dest);
1131
1132   return result;
1133 }
1134
1135 static GstRTSPStreamTransport *
1136 check_transport (GObject * source, GstRTSPStream * stream)
1137 {
1138   GstStructure *stats;
1139   GstRTSPStreamTransport *trans;
1140
1141   /* see if we have a stream to match with the origin of the RTCP packet */
1142   trans = g_object_get_qdata (source, ssrc_stream_map_key);
1143   if (trans == NULL) {
1144     g_object_get (source, "stats", &stats, NULL);
1145     if (stats) {
1146       const gchar *rtcp_from;
1147
1148       dump_structure (stats);
1149
1150       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
1151       if ((trans = find_transport (stream, rtcp_from))) {
1152         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
1153             source);
1154         g_object_set_qdata_full (source, ssrc_stream_map_key, trans,
1155             g_object_unref);
1156       }
1157       gst_structure_free (stats);
1158     }
1159   }
1160   return trans;
1161 }
1162
1163
1164 static void
1165 on_new_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1166 {
1167   GstRTSPStreamTransport *trans;
1168
1169   GST_INFO ("%p: new source %p", stream, source);
1170
1171   trans = check_transport (source, stream);
1172
1173   if (trans)
1174     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
1175 }
1176
1177 static void
1178 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPStream * stream)
1179 {
1180   GST_INFO ("%p: new SDES %p", stream, source);
1181 }
1182
1183 static void
1184 on_ssrc_active (GObject * session, GObject * source, GstRTSPStream * stream)
1185 {
1186   GstRTSPStreamTransport *trans;
1187
1188   trans = check_transport (source, stream);
1189
1190   if (trans) {
1191     GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
1192     gst_rtsp_stream_transport_keep_alive (trans);
1193   }
1194 #ifdef DUMP_STATS
1195   {
1196     GstStructure *stats;
1197     g_object_get (source, "stats", &stats, NULL);
1198     if (stats) {
1199       dump_structure (stats);
1200       gst_structure_free (stats);
1201     }
1202   }
1203 #endif
1204 }
1205
1206 static void
1207 on_bye_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1208 {
1209   GST_INFO ("%p: source %p bye", stream, source);
1210 }
1211
1212 static void
1213 on_bye_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1214 {
1215   GstRTSPStreamTransport *trans;
1216
1217   GST_INFO ("%p: source %p bye timeout", stream, source);
1218
1219   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1220     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1221     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1222   }
1223 }
1224
1225 static void
1226 on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1227 {
1228   GstRTSPStreamTransport *trans;
1229
1230   GST_INFO ("%p: source %p timeout", stream, source);
1231
1232   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1233     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1234     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1235   }
1236 }
1237
1238 static GstFlowReturn
1239 handle_new_sample (GstAppSink * sink, gpointer user_data)
1240 {
1241   GstRTSPStreamPrivate *priv;
1242   GList *walk;
1243   GstSample *sample;
1244   GstBuffer *buffer;
1245   GstRTSPStream *stream;
1246
1247   sample = gst_app_sink_pull_sample (sink);
1248   if (!sample)
1249     return GST_FLOW_OK;
1250
1251   stream = (GstRTSPStream *) user_data;
1252   priv = stream->priv;
1253   buffer = gst_sample_get_buffer (sample);
1254
1255   g_mutex_lock (&priv->lock);
1256   for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1257     GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1258
1259     if (GST_ELEMENT_CAST (sink) == priv->appsink[0]) {
1260       gst_rtsp_stream_transport_send_rtp (tr, buffer);
1261     } else {
1262       gst_rtsp_stream_transport_send_rtcp (tr, buffer);
1263     }
1264   }
1265   g_mutex_unlock (&priv->lock);
1266
1267   gst_sample_unref (sample);
1268
1269   return GST_FLOW_OK;
1270 }
1271
1272 static GstAppSinkCallbacks sink_cb = {
1273   NULL,                         /* not interested in EOS */
1274   NULL,                         /* not interested in preroll samples */
1275   handle_new_sample,
1276 };
1277
1278 /**
1279  * gst_rtsp_stream_join_bin:
1280  * @stream: a #GstRTSPStream
1281  * @bin: a #GstBin to join
1282  * @rtpbin: a rtpbin element in @bin
1283  * @state: the target state of the new elements
1284  *
1285  * Join the #Gstbin @bin that contains the element @rtpbin.
1286  *
1287  * @stream will link to @rtpbin, which must be inside @bin. The elements
1288  * added to @bin will be set to the state given in @state.
1289  *
1290  * Returns: %TRUE on success.
1291  */
1292 gboolean
1293 gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
1294     GstElement * rtpbin, GstState state)
1295 {
1296   GstRTSPStreamPrivate *priv;
1297   gint i;
1298   guint idx;
1299   gchar *name;
1300   GstPad *pad, *teepad, *queuepad, *selpad;
1301   GstPadLinkReturn ret;
1302
1303   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1304   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
1305   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
1306
1307   priv = stream->priv;
1308
1309   g_mutex_lock (&priv->lock);
1310   if (priv->is_joined)
1311     goto was_joined;
1312
1313   /* create a session with the same index as the stream */
1314   idx = priv->idx;
1315
1316   GST_INFO ("stream %p joining bin as session %u", stream, idx);
1317
1318   if (!alloc_ports (stream))
1319     goto no_ports;
1320
1321   /* update the dscp qos field in the sinks */
1322   update_dscp_qos (stream);
1323
1324   /* get a pad for sending RTP */
1325   name = g_strdup_printf ("send_rtp_sink_%u", idx);
1326   priv->send_rtp_sink = gst_element_get_request_pad (rtpbin, name);
1327   g_free (name);
1328   /* link the RTP pad to the session manager, it should not really fail unless
1329    * this is not really an RTP pad */
1330   ret = gst_pad_link (priv->srcpad, priv->send_rtp_sink);
1331   if (ret != GST_PAD_LINK_OK)
1332     goto link_failed;
1333
1334   /* get pads from the RTP session element for sending and receiving
1335    * RTP/RTCP*/
1336   name = g_strdup_printf ("send_rtp_src_%u", idx);
1337   priv->send_src[0] = gst_element_get_static_pad (rtpbin, name);
1338   g_free (name);
1339   name = g_strdup_printf ("send_rtcp_src_%u", idx);
1340   priv->send_src[1] = gst_element_get_request_pad (rtpbin, name);
1341   g_free (name);
1342   name = g_strdup_printf ("recv_rtp_sink_%u", idx);
1343   priv->recv_sink[0] = gst_element_get_request_pad (rtpbin, name);
1344   g_free (name);
1345   name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
1346   priv->recv_sink[1] = gst_element_get_request_pad (rtpbin, name);
1347   g_free (name);
1348
1349   /* get the session */
1350   g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &priv->session);
1351
1352   g_signal_connect (priv->session, "on-new-ssrc", (GCallback) on_new_ssrc,
1353       stream);
1354   g_signal_connect (priv->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
1355       stream);
1356   g_signal_connect (priv->session, "on-ssrc-active",
1357       (GCallback) on_ssrc_active, stream);
1358   g_signal_connect (priv->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
1359       stream);
1360   g_signal_connect (priv->session, "on-bye-timeout",
1361       (GCallback) on_bye_timeout, stream);
1362   g_signal_connect (priv->session, "on-timeout", (GCallback) on_timeout,
1363       stream);
1364
1365   for (i = 0; i < 2; i++) {
1366     /* For the sender we create this bit of pipeline for both
1367      * RTP and RTCP. Sync and preroll are enabled on udpsink so
1368      * we need to add a queue before appsink to make the pipeline
1369      * not block. For the TCP case, we want to pump data to the
1370      * client as fast as possible anyway.
1371      *
1372      * .--------.      .-----.    .---------.
1373      * | rtpbin |      | tee |    | udpsink |
1374      * |       send->sink   src->sink       |
1375      * '--------'      |     |    '---------'
1376      *                 |     |    .---------.    .---------.
1377      *                 |     |    |  queue  |    | appsink |
1378      *                 |    src->sink      src->sink       |
1379      *                 '-----'    '---------'    '---------'
1380      */
1381     /* make tee for RTP/RTCP */
1382     priv->tee[i] = gst_element_factory_make ("tee", NULL);
1383     gst_bin_add (bin, priv->tee[i]);
1384
1385     /* and link to rtpbin send pad */
1386     pad = gst_element_get_static_pad (priv->tee[i], "sink");
1387     gst_pad_link (priv->send_src[i], pad);
1388     gst_object_unref (pad);
1389
1390     /* add udpsink */
1391     gst_bin_add (bin, priv->udpsink[i]);
1392
1393     /* link tee to udpsink */
1394     teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
1395     pad = gst_element_get_static_pad (priv->udpsink[i], "sink");
1396     gst_pad_link (teepad, pad);
1397     gst_object_unref (pad);
1398     gst_object_unref (teepad);
1399
1400     /* make queue */
1401     priv->appqueue[i] = gst_element_factory_make ("queue", NULL);
1402     gst_bin_add (bin, priv->appqueue[i]);
1403     /* and link to tee */
1404     teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
1405     pad = gst_element_get_static_pad (priv->appqueue[i], "sink");
1406     gst_pad_link (teepad, pad);
1407     gst_object_unref (pad);
1408     gst_object_unref (teepad);
1409
1410     /* make appsink */
1411     priv->appsink[i] = gst_element_factory_make ("appsink", NULL);
1412     g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
1413     g_object_set (priv->appsink[i], "emit-signals", FALSE, NULL);
1414     gst_bin_add (bin, priv->appsink[i]);
1415     gst_app_sink_set_callbacks (GST_APP_SINK_CAST (priv->appsink[i]),
1416         &sink_cb, stream, NULL);
1417     /* and link to queue */
1418     queuepad = gst_element_get_static_pad (priv->appqueue[i], "src");
1419     pad = gst_element_get_static_pad (priv->appsink[i], "sink");
1420     gst_pad_link (queuepad, pad);
1421     gst_object_unref (pad);
1422     gst_object_unref (queuepad);
1423
1424     /* For the receiver we create this bit of pipeline for both
1425      * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc
1426      * and it is all funneled into the rtpbin receive pad.
1427      *
1428      * .--------.     .--------.    .--------.
1429      * | udpsrc |     | funnel |    | rtpbin |
1430      * |       src->sink      src->sink      |
1431      * '--------'     |        |    '--------'
1432      * .--------.     |        |
1433      * | appsrc |     |        |
1434      * |       src->sink       |
1435      * '--------'     '--------'
1436      */
1437     /* make funnel for the RTP/RTCP receivers */
1438     priv->funnel[i] = gst_element_factory_make ("funnel", NULL);
1439     gst_bin_add (bin, priv->funnel[i]);
1440
1441     pad = gst_element_get_static_pad (priv->funnel[i], "src");
1442     gst_pad_link (pad, priv->recv_sink[i]);
1443     gst_object_unref (pad);
1444
1445     if (priv->udpsrc_v4[i]) {
1446       /* we set and keep these to playing so that they don't cause NO_PREROLL return
1447        * values */
1448       gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_PLAYING);
1449       gst_element_set_locked_state (priv->udpsrc_v4[i], TRUE);
1450       /* add udpsrc */
1451       gst_bin_add (bin, priv->udpsrc_v4[i]);
1452
1453       /* and link to the funnel v4 */
1454       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1455       pad = gst_element_get_static_pad (priv->udpsrc_v4[i], "src");
1456       gst_pad_link (pad, selpad);
1457       gst_object_unref (pad);
1458       gst_object_unref (selpad);
1459     }
1460
1461     if (priv->udpsrc_v6[i]) {
1462       gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_PLAYING);
1463       gst_element_set_locked_state (priv->udpsrc_v6[i], TRUE);
1464       gst_bin_add (bin, priv->udpsrc_v6[i]);
1465
1466       /* and link to the funnel v6 */
1467       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1468       pad = gst_element_get_static_pad (priv->udpsrc_v6[i], "src");
1469       gst_pad_link (pad, selpad);
1470       gst_object_unref (pad);
1471       gst_object_unref (selpad);
1472     }
1473
1474     /* make and add appsrc */
1475     priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
1476     gst_bin_add (bin, priv->appsrc[i]);
1477     /* and link to the funnel */
1478     selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1479     pad = gst_element_get_static_pad (priv->appsrc[i], "src");
1480     gst_pad_link (pad, selpad);
1481     gst_object_unref (pad);
1482     gst_object_unref (selpad);
1483
1484     /* check if we need to set to a special state */
1485     if (state != GST_STATE_NULL) {
1486       gst_element_set_state (priv->udpsink[i], state);
1487       gst_element_set_state (priv->appsink[i], state);
1488       gst_element_set_state (priv->appqueue[i], state);
1489       gst_element_set_state (priv->tee[i], state);
1490       gst_element_set_state (priv->funnel[i], state);
1491       gst_element_set_state (priv->appsrc[i], state);
1492     }
1493   }
1494
1495   /* be notified of caps changes */
1496   priv->caps_sig = g_signal_connect (priv->send_rtp_sink, "notify::caps",
1497       (GCallback) caps_notify, stream);
1498
1499   priv->is_joined = TRUE;
1500   g_mutex_unlock (&priv->lock);
1501
1502   return TRUE;
1503
1504   /* ERRORS */
1505 was_joined:
1506   {
1507     g_mutex_unlock (&priv->lock);
1508     return TRUE;
1509   }
1510 no_ports:
1511   {
1512     g_mutex_unlock (&priv->lock);
1513     GST_WARNING ("failed to allocate ports %u", idx);
1514     return FALSE;
1515   }
1516 link_failed:
1517   {
1518     GST_WARNING ("failed to link stream %u", idx);
1519     gst_object_unref (priv->send_rtp_sink);
1520     priv->send_rtp_sink = NULL;
1521     g_mutex_unlock (&priv->lock);
1522     return FALSE;
1523   }
1524 }
1525
1526 /**
1527  * gst_rtsp_stream_leave_bin:
1528  * @stream: a #GstRTSPStream
1529  * @bin: a #GstBin
1530  * @rtpbin: a rtpbin #GstElement
1531  *
1532  * Remove the elements of @stream from @bin.
1533  *
1534  * Return: %TRUE on success.
1535  */
1536 gboolean
1537 gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
1538     GstElement * rtpbin)
1539 {
1540   GstRTSPStreamPrivate *priv;
1541   gint i;
1542
1543   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1544   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
1545   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
1546
1547   priv = stream->priv;
1548
1549   g_mutex_lock (&priv->lock);
1550   if (!priv->is_joined)
1551     goto was_not_joined;
1552
1553   /* all transports must be removed by now */
1554   g_return_val_if_fail (priv->transports == NULL, FALSE);
1555
1556   GST_INFO ("stream %p leaving bin", stream);
1557
1558   gst_pad_unlink (priv->srcpad, priv->send_rtp_sink);
1559   g_signal_handler_disconnect (priv->send_rtp_sink, priv->caps_sig);
1560   gst_element_release_request_pad (rtpbin, priv->send_rtp_sink);
1561   gst_object_unref (priv->send_rtp_sink);
1562   priv->send_rtp_sink = NULL;
1563
1564   for (i = 0; i < 2; i++) {
1565     gst_element_set_state (priv->udpsink[i], GST_STATE_NULL);
1566     gst_element_set_state (priv->appsink[i], GST_STATE_NULL);
1567     gst_element_set_state (priv->appqueue[i], GST_STATE_NULL);
1568     gst_element_set_state (priv->tee[i], GST_STATE_NULL);
1569     gst_element_set_state (priv->funnel[i], GST_STATE_NULL);
1570     gst_element_set_state (priv->appsrc[i], GST_STATE_NULL);
1571     if (priv->udpsrc_v4[i]) {
1572       /* and set udpsrc to NULL now before removing */
1573       gst_element_set_locked_state (priv->udpsrc_v4[i], FALSE);
1574       gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_NULL);
1575       /* removing them should also nicely release the request
1576        * pads when they finalize */
1577       gst_bin_remove (bin, priv->udpsrc_v4[i]);
1578     }
1579     if (priv->udpsrc_v6[i]) {
1580       gst_element_set_locked_state (priv->udpsrc_v6[i], FALSE);
1581       gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_NULL);
1582       gst_bin_remove (bin, priv->udpsrc_v6[i]);
1583     }
1584     gst_bin_remove (bin, priv->udpsink[i]);
1585     gst_bin_remove (bin, priv->appsrc[i]);
1586     gst_bin_remove (bin, priv->appsink[i]);
1587     gst_bin_remove (bin, priv->appqueue[i]);
1588     gst_bin_remove (bin, priv->tee[i]);
1589     gst_bin_remove (bin, priv->funnel[i]);
1590
1591     gst_element_release_request_pad (rtpbin, priv->recv_sink[i]);
1592     gst_object_unref (priv->recv_sink[i]);
1593     priv->recv_sink[i] = NULL;
1594
1595     priv->udpsrc_v4[i] = NULL;
1596     priv->udpsrc_v6[i] = NULL;
1597     priv->udpsink[i] = NULL;
1598     priv->appsrc[i] = NULL;
1599     priv->appsink[i] = NULL;
1600     priv->appqueue[i] = NULL;
1601     priv->tee[i] = NULL;
1602     priv->funnel[i] = NULL;
1603   }
1604   gst_object_unref (priv->send_src[0]);
1605   priv->send_src[0] = NULL;
1606
1607   gst_element_release_request_pad (rtpbin, priv->send_src[1]);
1608   gst_object_unref (priv->send_src[1]);
1609   priv->send_src[1] = NULL;
1610
1611   g_object_unref (priv->session);
1612   priv->session = NULL;
1613   if (priv->caps)
1614     gst_caps_unref (priv->caps);
1615   priv->caps = NULL;
1616
1617   priv->is_joined = FALSE;
1618   g_mutex_unlock (&priv->lock);
1619
1620   return TRUE;
1621
1622 was_not_joined:
1623   {
1624     return TRUE;
1625   }
1626 }
1627
1628 /**
1629  * gst_rtsp_stream_get_rtpinfo:
1630  * @stream: a #GstRTSPStream
1631  * @rtptime: result RTP timestamp
1632  * @seq: result RTP seqnum
1633  *
1634  * Retrieve the current rtptime and seq. This is used to
1635  * construct a RTPInfo reply header.
1636  *
1637  * Returns: %TRUE when rtptime and seq could be determined.
1638  */
1639 gboolean
1640 gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream,
1641     guint * rtptime, guint * seq)
1642 {
1643   GstRTSPStreamPrivate *priv;
1644   GObjectClass *payobjclass;
1645
1646   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1647   g_return_val_if_fail (rtptime != NULL, FALSE);
1648   g_return_val_if_fail (seq != NULL, FALSE);
1649
1650   priv = stream->priv;
1651
1652   payobjclass = G_OBJECT_GET_CLASS (priv->payloader);
1653
1654   if (!g_object_class_find_property (payobjclass, "seqnum") ||
1655       !g_object_class_find_property (payobjclass, "timestamp"))
1656     return FALSE;
1657
1658   g_object_get (priv->payloader, "seqnum", seq, "timestamp", rtptime, NULL);
1659
1660   return TRUE;
1661 }
1662
1663 /**
1664  * gst_rtsp_stream_get_caps:
1665  * @stream: a #GstRTSPStream
1666  *
1667  * Retrieve the current caps of @stream.
1668  *
1669  * Returns: (transfer full): the #GstCaps of @stream. use gst_caps_unref()
1670  *    after usage.
1671  */
1672 GstCaps *
1673 gst_rtsp_stream_get_caps (GstRTSPStream * stream)
1674 {
1675   GstRTSPStreamPrivate *priv;
1676   GstCaps *result;
1677
1678   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1679
1680   priv = stream->priv;
1681
1682   g_mutex_lock (&priv->lock);
1683   if ((result = priv->caps))
1684     gst_caps_ref (result);
1685   g_mutex_unlock (&priv->lock);
1686
1687   return result;
1688 }
1689
1690 /**
1691  * gst_rtsp_stream_recv_rtp:
1692  * @stream: a #GstRTSPStream
1693  * @buffer: (transfer full): a #GstBuffer
1694  *
1695  * Handle an RTP buffer for the stream. This method is usually called when a
1696  * message has been received from a client using the TCP transport.
1697  *
1698  * This function takes ownership of @buffer.
1699  *
1700  * Returns: a GstFlowReturn.
1701  */
1702 GstFlowReturn
1703 gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer)
1704 {
1705   GstRTSPStreamPrivate *priv;
1706   GstFlowReturn ret;
1707   GstElement *element;
1708
1709   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
1710   priv = stream->priv;
1711   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1712   g_return_val_if_fail (priv->is_joined, FALSE);
1713
1714   g_mutex_lock (&priv->lock);
1715   element = gst_object_ref (priv->appsrc[0]);
1716   g_mutex_unlock (&priv->lock);
1717
1718   ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
1719
1720   gst_object_unref (element);
1721
1722   return ret;
1723 }
1724
1725 /**
1726  * gst_rtsp_stream_recv_rtcp:
1727  * @stream: a #GstRTSPStream
1728  * @buffer: (transfer full): a #GstBuffer
1729  *
1730  * Handle an RTCP buffer for the stream. This method is usually called when a
1731  * message has been received from a client using the TCP transport.
1732  *
1733  * This function takes ownership of @buffer.
1734  *
1735  * Returns: a GstFlowReturn.
1736  */
1737 GstFlowReturn
1738 gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
1739 {
1740   GstRTSPStreamPrivate *priv;
1741   GstFlowReturn ret;
1742   GstElement *element;
1743
1744   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
1745   priv = stream->priv;
1746   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1747   g_return_val_if_fail (priv->is_joined, FALSE);
1748
1749   g_mutex_lock (&priv->lock);
1750   element = gst_object_ref (priv->appsrc[1]);
1751   g_mutex_unlock (&priv->lock);
1752
1753   ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
1754
1755   gst_object_unref (element);
1756
1757   return ret;
1758 }
1759
1760 /* must be called with lock */
1761 static gboolean
1762 update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
1763     gboolean add)
1764 {
1765   GstRTSPStreamPrivate *priv = stream->priv;
1766   const GstRTSPTransport *tr;
1767
1768   tr = gst_rtsp_stream_transport_get_transport (trans);
1769
1770   switch (tr->lower_transport) {
1771     case GST_RTSP_LOWER_TRANS_UDP:
1772     case GST_RTSP_LOWER_TRANS_UDP_MCAST:
1773     {
1774       gchar *dest;
1775       gint min, max;
1776       guint ttl = 0;
1777
1778       dest = tr->destination;
1779       if (tr->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
1780         min = tr->port.min;
1781         max = tr->port.max;
1782         ttl = tr->ttl;
1783       } else {
1784         min = tr->client_port.min;
1785         max = tr->client_port.max;
1786       }
1787
1788       if (add) {
1789         GST_INFO ("adding %s:%d-%d", dest, min, max);
1790         g_signal_emit_by_name (priv->udpsink[0], "add", dest, min, NULL);
1791         g_signal_emit_by_name (priv->udpsink[1], "add", dest, max, NULL);
1792         if (ttl > 0) {
1793           GST_INFO ("setting ttl-mc %d", ttl);
1794           g_object_set (G_OBJECT (priv->udpsink[0]), "ttl-mc", ttl, NULL);
1795           g_object_set (G_OBJECT (priv->udpsink[1]), "ttl-mc", ttl, NULL);
1796         }
1797         priv->transports = g_list_prepend (priv->transports, trans);
1798       } else {
1799         GST_INFO ("removing %s:%d-%d", dest, min, max);
1800         g_signal_emit_by_name (priv->udpsink[0], "remove", dest, min, NULL);
1801         g_signal_emit_by_name (priv->udpsink[1], "remove", dest, max, NULL);
1802         priv->transports = g_list_remove (priv->transports, trans);
1803       }
1804       break;
1805     }
1806     case GST_RTSP_LOWER_TRANS_TCP:
1807       if (add) {
1808         GST_INFO ("adding TCP %s", tr->destination);
1809         priv->transports = g_list_prepend (priv->transports, trans);
1810       } else {
1811         GST_INFO ("removing TCP %s", tr->destination);
1812         priv->transports = g_list_remove (priv->transports, trans);
1813       }
1814       break;
1815     default:
1816       goto unknown_transport;
1817   }
1818   return TRUE;
1819
1820   /* ERRORS */
1821 unknown_transport:
1822   {
1823     GST_INFO ("Unknown transport %d", tr->lower_transport);
1824     return FALSE;
1825   }
1826 }
1827
1828
1829 /**
1830  * gst_rtsp_stream_add_transport:
1831  * @stream: a #GstRTSPStream
1832  * @trans: a #GstRTSPStreamTransport
1833  *
1834  * Add the transport in @trans to @stream. The media of @stream will
1835  * then also be send to the values configured in @trans.
1836  *
1837  * @stream must be joined to a bin.
1838  *
1839  * @trans must contain a valid #GstRTSPTransport.
1840  *
1841  * Returns: %TRUE if @trans was added
1842  */
1843 gboolean
1844 gst_rtsp_stream_add_transport (GstRTSPStream * stream,
1845     GstRTSPStreamTransport * trans)
1846 {
1847   GstRTSPStreamPrivate *priv;
1848   gboolean res;
1849
1850   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1851   priv = stream->priv;
1852   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
1853   g_return_val_if_fail (priv->is_joined, FALSE);
1854
1855   g_mutex_lock (&priv->lock);
1856   res = update_transport (stream, trans, TRUE);
1857   g_mutex_unlock (&priv->lock);
1858
1859   return res;
1860 }
1861
1862 /**
1863  * gst_rtsp_stream_remove_transport:
1864  * @stream: a #GstRTSPStream
1865  * @trans: a #GstRTSPStreamTransport
1866  *
1867  * Remove the transport in @trans from @stream. The media of @stream will
1868  * not be sent to the values configured in @trans.
1869  *
1870  * @stream must be joined to a bin.
1871  *
1872  * @trans must contain a valid #GstRTSPTransport.
1873  *
1874  * Returns: %TRUE if @trans was removed
1875  */
1876 gboolean
1877 gst_rtsp_stream_remove_transport (GstRTSPStream * stream,
1878     GstRTSPStreamTransport * trans)
1879 {
1880   GstRTSPStreamPrivate *priv;
1881   gboolean res;
1882
1883   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1884   priv = stream->priv;
1885   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
1886   g_return_val_if_fail (priv->is_joined, FALSE);
1887
1888   g_mutex_lock (&priv->lock);
1889   res = update_transport (stream, trans, FALSE);
1890   g_mutex_unlock (&priv->lock);
1891
1892   return res;
1893 }