docs: improve docs
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-stream.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19 /**
20  * SECTION:rtsp-stream
21  * @short_description: A media stream
22  * @see_also: #GstRTSPMedia
23  *
24  * Last reviewed on 2013-07-11 (1.0.0)
25  */
26
27 #include <stdlib.h>
28 #include <stdio.h>
29 #include <string.h>
30
31 #include <gio/gio.h>
32
33 #include <gst/app/gstappsrc.h>
34 #include <gst/app/gstappsink.h>
35
36 #include "rtsp-stream.h"
37
38 #define GST_RTSP_STREAM_GET_PRIVATE(obj)  \
39      (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTSP_STREAM, GstRTSPStreamPrivate))
40
41 struct _GstRTSPStreamPrivate
42 {
43   GMutex lock;
44   guint idx;
45   GstPad *srcpad;
46   GstElement *payloader;
47   guint buffer_size;
48   gboolean is_joined;
49   gchar *control;
50
51   /* pads on the rtpbin */
52   GstPad *send_rtp_sink;
53   GstPad *recv_sink[2];
54   GstPad *send_src[2];
55
56   /* the RTPSession object */
57   GObject *session;
58
59   /* sinks used for sending and receiving RTP and RTCP over ipv4, they share
60    * sockets */
61   GstElement *udpsrc_v4[2];
62
63   /* sinks used for sending and receiving RTP and RTCP over ipv6, they share
64    * sockets */
65   GstElement *udpsrc_v6[2];
66
67   GstElement *udpsink[2];
68
69   /* for TCP transport */
70   GstElement *appsrc[2];
71   GstElement *appqueue[2];
72   GstElement *appsink[2];
73
74   GstElement *tee[2];
75   GstElement *funnel[2];
76
77   /* server ports for sending/receiving over ipv4 */
78   GstRTSPRange server_port_v4;
79   GstRTSPAddress *server_addr_v4;
80   gboolean have_ipv4;
81
82   /* server ports for sending/receiving over ipv6 */
83   GstRTSPRange server_port_v6;
84   GstRTSPAddress *server_addr_v6;
85   gboolean have_ipv6;
86
87   /* multicast addresses */
88   GstRTSPAddressPool *pool;
89   GstRTSPAddress *addr_v4;
90   GstRTSPAddress *addr_v6;
91
92   /* the caps of the stream */
93   gulong caps_sig;
94   GstCaps *caps;
95
96   /* transports we stream to */
97   guint n_active;
98   GList *transports;
99
100   gint dscp_qos;
101 };
102
103 #define DEFAULT_CONTROL NULL
104
105 enum
106 {
107   PROP_0,
108   PROP_CONTROL,
109   PROP_LAST
110 };
111
112 GST_DEBUG_CATEGORY_STATIC (rtsp_stream_debug);
113 #define GST_CAT_DEFAULT rtsp_stream_debug
114
115 static GQuark ssrc_stream_map_key;
116
117 static void gst_rtsp_stream_get_property (GObject * object, guint propid,
118     GValue * value, GParamSpec * pspec);
119 static void gst_rtsp_stream_set_property (GObject * object, guint propid,
120     const GValue * value, GParamSpec * pspec);
121
122 static void gst_rtsp_stream_finalize (GObject * obj);
123
124 G_DEFINE_TYPE (GstRTSPStream, gst_rtsp_stream, G_TYPE_OBJECT);
125
126 static void
127 gst_rtsp_stream_class_init (GstRTSPStreamClass * klass)
128 {
129   GObjectClass *gobject_class;
130
131   g_type_class_add_private (klass, sizeof (GstRTSPStreamPrivate));
132
133   gobject_class = G_OBJECT_CLASS (klass);
134
135   gobject_class->get_property = gst_rtsp_stream_get_property;
136   gobject_class->set_property = gst_rtsp_stream_set_property;
137   gobject_class->finalize = gst_rtsp_stream_finalize;
138
139   g_object_class_install_property (gobject_class, PROP_CONTROL,
140       g_param_spec_string ("control", "Control",
141           "The control string for this stream", DEFAULT_CONTROL,
142           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
143
144   GST_DEBUG_CATEGORY_INIT (rtsp_stream_debug, "rtspstream", 0, "GstRTSPStream");
145
146   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
147 }
148
149 static void
150 gst_rtsp_stream_init (GstRTSPStream * stream)
151 {
152   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
153
154   GST_DEBUG ("new stream %p", stream);
155
156   stream->priv = priv;
157
158   priv->dscp_qos = -1;
159   priv->control = g_strdup (DEFAULT_CONTROL);
160
161   g_mutex_init (&priv->lock);
162 }
163
164 static void
165 gst_rtsp_stream_finalize (GObject * obj)
166 {
167   GstRTSPStream *stream;
168   GstRTSPStreamPrivate *priv;
169
170   stream = GST_RTSP_STREAM (obj);
171   priv = stream->priv;
172
173   GST_DEBUG ("finalize stream %p", stream);
174
175   /* we really need to be unjoined now */
176   g_return_if_fail (!priv->is_joined);
177
178   if (priv->addr_v4)
179     gst_rtsp_address_free (priv->addr_v4);
180   if (priv->addr_v6)
181     gst_rtsp_address_free (priv->addr_v6);
182   if (priv->server_addr_v4)
183     gst_rtsp_address_free (priv->server_addr_v4);
184   if (priv->server_addr_v6)
185     gst_rtsp_address_free (priv->server_addr_v6);
186   if (priv->pool)
187     g_object_unref (priv->pool);
188   gst_object_unref (priv->payloader);
189   gst_object_unref (priv->srcpad);
190   g_free (priv->control);
191   g_mutex_clear (&priv->lock);
192
193   G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
194 }
195
196 static void
197 gst_rtsp_stream_get_property (GObject * object, guint propid,
198     GValue * value, GParamSpec * pspec)
199 {
200   GstRTSPStream *stream = GST_RTSP_STREAM (object);
201
202   switch (propid) {
203     case PROP_CONTROL:
204       g_value_take_string (value, gst_rtsp_stream_get_control (stream));
205       break;
206     default:
207       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
208   }
209 }
210
211 static void
212 gst_rtsp_stream_set_property (GObject * object, guint propid,
213     const GValue * value, GParamSpec * pspec)
214 {
215   GstRTSPStream *stream = GST_RTSP_STREAM (object);
216
217   switch (propid) {
218     case PROP_CONTROL:
219       gst_rtsp_stream_set_control (stream, g_value_get_string (value));
220       break;
221     default:
222       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
223   }
224 }
225
226 /**
227  * gst_rtsp_stream_new:
228  * @idx: an index
229  * @srcpad: a #GstPad
230  * @payloader: a #GstElement
231  *
232  * Create a new media stream with index @idx that handles RTP data on
233  * @srcpad and has a payloader element @payloader.
234  *
235  * Returns: a new #GstRTSPStream
236  */
237 GstRTSPStream *
238 gst_rtsp_stream_new (guint idx, GstElement * payloader, GstPad * srcpad)
239 {
240   GstRTSPStreamPrivate *priv;
241   GstRTSPStream *stream;
242
243   g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL);
244   g_return_val_if_fail (GST_IS_PAD (srcpad), NULL);
245   g_return_val_if_fail (GST_PAD_IS_SRC (srcpad), NULL);
246
247   stream = g_object_new (GST_TYPE_RTSP_STREAM, NULL);
248   priv = stream->priv;
249   priv->idx = idx;
250   priv->payloader = gst_object_ref (payloader);
251   priv->srcpad = gst_object_ref (srcpad);
252
253   return stream;
254 }
255
256 /**
257  * gst_rtsp_stream_get_index:
258  * @stream: a #GstRTSPStream
259  *
260  * Get the stream index.
261  *
262  * Return: the stream index.
263  */
264 guint
265 gst_rtsp_stream_get_index (GstRTSPStream * stream)
266 {
267   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
268
269   return stream->priv->idx;
270 }
271
272 /**
273  * gst_rtsp_stream_get_srcpad:
274  * @stream: a #GstRTSPStream
275  *
276  * Get the srcpad associated with @stream.
277  *
278  * Return: the srcpad. Unref after usage.
279  */
280 GstPad *
281 gst_rtsp_stream_get_srcpad (GstRTSPStream * stream)
282 {
283   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
284
285   return gst_object_ref (stream->priv->srcpad);
286 }
287
288 /**
289  * gst_rtsp_stream_get_control:
290  * @stream: a #GstRTSPStream
291  *
292  * Get the control string to identify this stream.
293  *
294  * Return: the control string. free after usage.
295  */
296 gchar *
297 gst_rtsp_stream_get_control (GstRTSPStream * stream)
298 {
299   GstRTSPStreamPrivate *priv;
300   gchar *result;
301
302   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
303
304   priv = stream->priv;
305
306   g_mutex_lock (&priv->lock);
307   if ((result = g_strdup (priv->control)) == NULL)
308     result = g_strdup_printf ("stream=%u", priv->idx);
309   g_mutex_unlock (&priv->lock);
310
311   return result;
312 }
313
314 /**
315  * gst_rtsp_stream_set_control:
316  * @stream: a #GstRTSPStream
317  * @control: a control string
318  *
319  * Set the control string in @stream.
320  */
321 void
322 gst_rtsp_stream_set_control (GstRTSPStream * stream, const gchar * control)
323 {
324   GstRTSPStreamPrivate *priv;
325
326   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
327
328   priv = stream->priv;
329
330   g_mutex_lock (&priv->lock);
331   g_free (priv->control);
332   priv->control = g_strdup (control);
333   g_mutex_unlock (&priv->lock);
334 }
335
336 /**
337  * gst_rtsp_stream_has_control:
338  * @stream: a #GstRTSPStream
339  * @control: a control string
340  *
341  * Check if @stream has the control string @control.
342  *
343  * Returns: %TRUE is @stream has @control as the control string
344  */
345 gboolean
346 gst_rtsp_stream_has_control (GstRTSPStream * stream, const gchar * control)
347 {
348   GstRTSPStreamPrivate *priv;
349   gboolean res;
350
351   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
352
353   priv = stream->priv;
354
355   g_mutex_lock (&priv->lock);
356   if (priv->control)
357     res = g_strcmp0 (priv->control, control);
358   else {
359     guint streamid;
360     sscanf (control, "stream=%u", &streamid);
361     res = (streamid == priv->idx);
362   }
363   g_mutex_unlock (&priv->lock);
364
365   return res;
366 }
367
368 /**
369  * gst_rtsp_stream_set_mtu:
370  * @stream: a #GstRTSPStream
371  * @mtu: a new MTU
372  *
373  * Configure the mtu in the payloader of @stream to @mtu.
374  */
375 void
376 gst_rtsp_stream_set_mtu (GstRTSPStream * stream, guint mtu)
377 {
378   GstRTSPStreamPrivate *priv;
379
380   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
381
382   priv = stream->priv;
383
384   GST_LOG_OBJECT (stream, "set MTU %u", mtu);
385
386   g_object_set (G_OBJECT (priv->payloader), "mtu", mtu, NULL);
387 }
388
389 /**
390  * gst_rtsp_stream_get_mtu:
391  * @stream: a #GstRTSPStream
392  *
393  * Get the configured MTU in the payloader of @stream.
394  *
395  * Returns: the MTU of the payloader.
396  */
397 guint
398 gst_rtsp_stream_get_mtu (GstRTSPStream * stream)
399 {
400   GstRTSPStreamPrivate *priv;
401   guint mtu;
402
403   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
404
405   priv = stream->priv;
406
407   g_object_get (G_OBJECT (priv->payloader), "mtu", &mtu, NULL);
408
409   return mtu;
410 }
411
412 /* Update the dscp qos property on the udp sinks */
413 static void
414 update_dscp_qos (GstRTSPStream * stream)
415 {
416   GstRTSPStreamPrivate *priv;
417
418   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
419
420   priv = stream->priv;
421
422   if (priv->udpsink[0]) {
423     g_object_set (G_OBJECT (priv->udpsink[0]), "qos-dscp", priv->dscp_qos,
424         NULL);
425   }
426
427   if (priv->udpsink[1]) {
428     g_object_set (G_OBJECT (priv->udpsink[1]), "qos-dscp", priv->dscp_qos,
429         NULL);
430   }
431 }
432
433 /**
434  * gst_rtsp_stream_set_dscp_qos:
435  * @stream: a #GstRTSPStream
436  * @dscp_qos: a new dscp qos value (0-63, or -1 to disable)
437  *
438  * Configure the dscp qos of the outgoing sockets to @dscp_qos.
439  */
440 void
441 gst_rtsp_stream_set_dscp_qos (GstRTSPStream * stream, gint dscp_qos)
442 {
443   GstRTSPStreamPrivate *priv;
444
445   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
446
447   priv = stream->priv;
448
449   GST_LOG_OBJECT (stream, "set DSCP QoS %d", dscp_qos);
450
451   if (dscp_qos < -1 || dscp_qos > 63) {
452     GST_WARNING_OBJECT (stream, "trying to set illegal dscp qos %d", dscp_qos);
453     return;
454   }
455
456   priv->dscp_qos = dscp_qos;
457
458   update_dscp_qos (stream);
459 }
460
461 /**
462  * gst_rtsp_stream_get_dscp_qos:
463  * @stream: a #GstRTSPStream
464  *
465  * Get the configured DSCP QoS in of the outgoing sockets.
466  *
467  * Returns: the DSCP QoS value of the outgoing sockets, or -1 if disbled.
468  */
469 gint
470 gst_rtsp_stream_get_dscp_qos (GstRTSPStream * stream)
471 {
472   GstRTSPStreamPrivate *priv;
473
474   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
475
476   priv = stream->priv;
477
478   return priv->dscp_qos;
479 }
480
481
482 /**
483  * gst_rtsp_stream_set_address_pool:
484  * @stream: a #GstRTSPStream
485  * @pool: a #GstRTSPAddressPool
486  *
487  * configure @pool to be used as the address pool of @stream.
488  */
489 void
490 gst_rtsp_stream_set_address_pool (GstRTSPStream * stream,
491     GstRTSPAddressPool * pool)
492 {
493   GstRTSPStreamPrivate *priv;
494   GstRTSPAddressPool *old;
495
496   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
497
498   priv = stream->priv;
499
500   GST_LOG_OBJECT (stream, "set address pool %p", pool);
501
502   g_mutex_lock (&priv->lock);
503   if ((old = priv->pool) != pool)
504     priv->pool = pool ? g_object_ref (pool) : NULL;
505   else
506     old = NULL;
507   g_mutex_unlock (&priv->lock);
508
509   if (old)
510     g_object_unref (old);
511 }
512
513 /**
514  * gst_rtsp_stream_get_address_pool:
515  * @stream: a #GstRTSPStream
516  *
517  * Get the #GstRTSPAddressPool used as the address pool of @stream.
518  *
519  * Returns: (transfer full): the #GstRTSPAddressPool of @stream. g_object_unref() after
520  * usage.
521  */
522 GstRTSPAddressPool *
523 gst_rtsp_stream_get_address_pool (GstRTSPStream * stream)
524 {
525   GstRTSPStreamPrivate *priv;
526   GstRTSPAddressPool *result;
527
528   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
529
530   priv = stream->priv;
531
532   g_mutex_lock (&priv->lock);
533   if ((result = priv->pool))
534     g_object_ref (result);
535   g_mutex_unlock (&priv->lock);
536
537   return result;
538 }
539
540 /**
541  * gst_rtsp_stream_get_multicast_address:
542  * @stream: a #GstRTSPStream
543  * @family: the #GSocketFamily
544  *
545  * Get the multicast address of @stream for @family.
546  *
547  * Returns: the #GstRTSPAddress of @stream or %NULL when no address could be
548  * allocated. gst_rtsp_address_free() after usage.
549  */
550 GstRTSPAddress *
551 gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream,
552     GSocketFamily family)
553 {
554   GstRTSPStreamPrivate *priv;
555   GstRTSPAddress *result;
556   GstRTSPAddress **addrp;
557   GstRTSPAddressFlags flags;
558
559   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
560
561   priv = stream->priv;
562
563   if (family == G_SOCKET_FAMILY_IPV6) {
564     flags = GST_RTSP_ADDRESS_FLAG_IPV6;
565     addrp = &priv->addr_v4;
566   } else {
567     flags = GST_RTSP_ADDRESS_FLAG_IPV4;
568     addrp = &priv->addr_v6;
569   }
570
571   g_mutex_lock (&priv->lock);
572   if (*addrp == NULL) {
573     if (priv->pool == NULL)
574       goto no_pool;
575
576     flags |= GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_MULTICAST;
577
578     *addrp = gst_rtsp_address_pool_acquire_address (priv->pool, flags, 2);
579     if (*addrp == NULL)
580       goto no_address;
581   }
582   result = gst_rtsp_address_copy (*addrp);
583   g_mutex_unlock (&priv->lock);
584
585   return result;
586
587   /* ERRORS */
588 no_pool:
589   {
590     GST_ERROR_OBJECT (stream, "no address pool specified");
591     g_mutex_unlock (&priv->lock);
592     return NULL;
593   }
594 no_address:
595   {
596     GST_ERROR_OBJECT (stream, "failed to acquire address from pool");
597     g_mutex_unlock (&priv->lock);
598     return NULL;
599   }
600 }
601
602 /**
603  * gst_rtsp_stream_reserve_address:
604  * @stream: a #GstRTSPStream
605  * @address: an address
606  * @port: a port
607  * @n_ports: n_ports
608  * @ttl: a TTL
609  *
610  * Reserve @address and @port as the address and port of @stream.
611  *
612  * Returns: the #GstRTSPAddress of @stream or %NULL when the address could be
613  * reserved. gst_rtsp_address_free() after usage.
614  */
615 GstRTSPAddress *
616 gst_rtsp_stream_reserve_address (GstRTSPStream * stream,
617     const gchar * address, guint port, guint n_ports, guint ttl)
618 {
619   GstRTSPStreamPrivate *priv;
620   GstRTSPAddress *result;
621   GInetAddress *addr;
622   GSocketFamily family;
623   GstRTSPAddress **addrp;
624
625   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
626   g_return_val_if_fail (address != NULL, NULL);
627   g_return_val_if_fail (port > 0, NULL);
628   g_return_val_if_fail (n_ports > 0, NULL);
629   g_return_val_if_fail (ttl > 0, NULL);
630
631   priv = stream->priv;
632
633   addr = g_inet_address_new_from_string (address);
634   if (!addr) {
635     GST_ERROR ("failed to get inet addr from %s", address);
636     family = G_SOCKET_FAMILY_IPV4;
637   } else {
638     family = g_inet_address_get_family (addr);
639     g_object_unref (addr);
640   }
641
642   if (family == G_SOCKET_FAMILY_IPV6)
643     addrp = &priv->addr_v4;
644   else
645     addrp = &priv->addr_v6;
646
647   g_mutex_lock (&priv->lock);
648   if (*addrp == NULL) {
649     if (priv->pool == NULL)
650       goto no_pool;
651
652     *addrp = gst_rtsp_address_pool_reserve_address (priv->pool, address,
653         port, n_ports, ttl);
654     if (*addrp == NULL)
655       goto no_address;
656   } else {
657     if (strcmp ((*addrp)->address, address) ||
658         (*addrp)->port != port || (*addrp)->n_ports != n_ports ||
659         (*addrp)->ttl != ttl)
660       goto different_address;
661   }
662   result = gst_rtsp_address_copy (*addrp);
663   g_mutex_unlock (&priv->lock);
664
665   return result;
666
667   /* ERRORS */
668 no_pool:
669   {
670     GST_ERROR_OBJECT (stream, "no address pool specified");
671     g_mutex_unlock (&priv->lock);
672     return NULL;
673   }
674 no_address:
675   {
676     GST_ERROR_OBJECT (stream, "failed to acquire address %s from pool",
677         address);
678     g_mutex_unlock (&priv->lock);
679     return NULL;
680   }
681 different_address:
682   {
683     GST_ERROR_OBJECT (stream, "address %s is not the same that was already"
684         " reserved", address);
685     g_mutex_unlock (&priv->lock);
686     return NULL;
687   }
688 }
689
690 static gboolean
691 alloc_ports_one_family (GstRTSPAddressPool * pool, gint buffer_size,
692     GSocketFamily family, GstElement * udpsrc_out[2],
693     GstElement * udpsink_out[2], GstRTSPRange * server_port_out,
694     GstRTSPAddress ** server_addr_out)
695 {
696   GstStateChangeReturn ret;
697   GstElement *udpsrc0, *udpsrc1;
698   GstElement *udpsink0, *udpsink1;
699   GSocket *rtp_socket = NULL;
700   GSocket *rtcp_socket;
701   gint tmp_rtp, tmp_rtcp;
702   guint count;
703   gint rtpport, rtcpport;
704   GList *rejected_addresses = NULL;
705   GstRTSPAddress *addr = NULL;
706   GInetAddress *inetaddr = NULL;
707   GSocketAddress *rtp_sockaddr = NULL;
708   GSocketAddress *rtcp_sockaddr = NULL;
709   const gchar *multisink_socket;
710
711   if (family == G_SOCKET_FAMILY_IPV6)
712     multisink_socket = "socket-v6";
713   else
714     multisink_socket = "socket";
715
716   udpsrc0 = NULL;
717   udpsrc1 = NULL;
718   udpsink0 = NULL;
719   udpsink1 = NULL;
720   count = 0;
721
722   /* Start with random port */
723   tmp_rtp = 0;
724
725   rtcp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
726       G_SOCKET_PROTOCOL_UDP, NULL);
727   if (!rtcp_socket)
728     goto no_udp_protocol;
729
730   if (*server_addr_out)
731     gst_rtsp_address_free (*server_addr_out);
732
733   /* try to allocate 2 UDP ports, the RTP port should be an even
734    * number and the RTCP port should be the next (uneven) port */
735 again:
736
737   if (rtp_socket == NULL) {
738     rtp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
739         G_SOCKET_PROTOCOL_UDP, NULL);
740     if (!rtp_socket)
741       goto no_udp_protocol;
742   }
743
744   if (pool && gst_rtsp_address_pool_has_unicast_addresses (pool)) {
745     GstRTSPAddressFlags flags;
746
747     if (addr)
748       rejected_addresses = g_list_prepend (rejected_addresses, addr);
749
750     flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_UNICAST;
751     if (family == G_SOCKET_FAMILY_IPV6)
752       flags |= GST_RTSP_ADDRESS_FLAG_IPV6;
753     else
754       flags |= GST_RTSP_ADDRESS_FLAG_IPV4;
755
756     addr = gst_rtsp_address_pool_acquire_address (pool, flags, 2);
757
758     if (addr == NULL)
759       goto no_ports;
760
761     tmp_rtp = addr->port;
762
763     g_clear_object (&inetaddr);
764     inetaddr = g_inet_address_new_from_string (addr->address);
765   } else {
766     if (tmp_rtp != 0) {
767       tmp_rtp += 2;
768       if (++count > 20)
769         goto no_ports;
770     }
771
772     if (inetaddr == NULL)
773       inetaddr = g_inet_address_new_any (family);
774   }
775
776   rtp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtp);
777   if (!g_socket_bind (rtp_socket, rtp_sockaddr, FALSE, NULL)) {
778     g_object_unref (rtp_sockaddr);
779     goto again;
780   }
781   g_object_unref (rtp_sockaddr);
782
783   rtp_sockaddr = g_socket_get_local_address (rtp_socket, NULL);
784   if (rtp_sockaddr == NULL || !G_IS_INET_SOCKET_ADDRESS (rtp_sockaddr)) {
785     g_clear_object (&rtp_sockaddr);
786     goto socket_error;
787   }
788
789   tmp_rtp =
790       g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (rtp_sockaddr));
791   g_object_unref (rtp_sockaddr);
792
793   /* check if port is even */
794   if ((tmp_rtp & 1) != 0) {
795     /* port not even, close and allocate another */
796     tmp_rtp++;
797     g_clear_object (&rtp_socket);
798     goto again;
799   }
800
801   /* set port */
802   tmp_rtcp = tmp_rtp + 1;
803
804   rtcp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtcp);
805   if (!g_socket_bind (rtcp_socket, rtcp_sockaddr, FALSE, NULL)) {
806     g_object_unref (rtcp_sockaddr);
807     g_clear_object (&rtp_socket);
808     goto again;
809   }
810   g_object_unref (rtcp_sockaddr);
811
812   g_clear_object (&inetaddr);
813
814   udpsrc0 = gst_element_factory_make ("udpsrc", NULL);
815   udpsrc1 = gst_element_factory_make ("udpsrc", NULL);
816
817   if (udpsrc0 == NULL || udpsrc1 == NULL)
818     goto no_udp_protocol;
819
820   g_object_set (G_OBJECT (udpsrc0), "socket", rtp_socket, NULL);
821   g_object_set (G_OBJECT (udpsrc1), "socket", rtcp_socket, NULL);
822
823   ret = gst_element_set_state (udpsrc0, GST_STATE_PAUSED);
824   if (ret == GST_STATE_CHANGE_FAILURE)
825     goto element_error;
826   ret = gst_element_set_state (udpsrc1, GST_STATE_PAUSED);
827   if (ret == GST_STATE_CHANGE_FAILURE)
828     goto element_error;
829
830   /* all fine, do port check */
831   g_object_get (G_OBJECT (udpsrc0), "port", &rtpport, NULL);
832   g_object_get (G_OBJECT (udpsrc1), "port", &rtcpport, NULL);
833
834   /* this should not happen... */
835   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
836     goto port_error;
837
838   if (udpsink_out[0])
839     udpsink0 = udpsink_out[0];
840   else
841     udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
842
843   if (!udpsink0)
844     goto no_udp_protocol;
845
846   g_object_set (G_OBJECT (udpsink0), "close-socket", FALSE, NULL);
847   g_object_set (G_OBJECT (udpsink0), multisink_socket, rtp_socket, NULL);
848
849   if (udpsink_out[1])
850     udpsink1 = udpsink_out[1];
851   else
852     udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
853
854   if (!udpsink1)
855     goto no_udp_protocol;
856
857   g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
858   g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
859   g_object_set (G_OBJECT (udpsink0), "buffer-size", buffer_size, NULL);
860
861   g_object_set (G_OBJECT (udpsink1), "close-socket", FALSE, NULL);
862   g_object_set (G_OBJECT (udpsink1), multisink_socket, rtcp_socket, NULL);
863   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
864   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
865   g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL);
866   g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
867   g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL);
868   g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
869
870   /* we keep these elements, we will further configure them when the
871    * client told us to really use the UDP ports. */
872   udpsrc_out[0] = udpsrc0;
873   udpsrc_out[1] = udpsrc1;
874   udpsink_out[0] = udpsink0;
875   udpsink_out[1] = udpsink1;
876   server_port_out->min = rtpport;
877   server_port_out->max = rtcpport;
878
879   *server_addr_out = addr;
880   g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free);
881
882   g_object_unref (rtp_socket);
883   g_object_unref (rtcp_socket);
884
885   return TRUE;
886
887   /* ERRORS */
888 no_udp_protocol:
889   {
890     goto cleanup;
891   }
892 no_ports:
893   {
894     goto cleanup;
895   }
896 port_error:
897   {
898     goto cleanup;
899   }
900 socket_error:
901   {
902     goto cleanup;
903   }
904 element_error:
905   {
906     goto cleanup;
907   }
908 cleanup:
909   {
910     if (udpsrc0) {
911       gst_element_set_state (udpsrc0, GST_STATE_NULL);
912       gst_object_unref (udpsrc0);
913     }
914     if (udpsrc1) {
915       gst_element_set_state (udpsrc1, GST_STATE_NULL);
916       gst_object_unref (udpsrc1);
917     }
918     if (udpsink0) {
919       gst_element_set_state (udpsink0, GST_STATE_NULL);
920       gst_object_unref (udpsink0);
921     }
922     if (udpsink1) {
923       gst_element_set_state (udpsink1, GST_STATE_NULL);
924       gst_object_unref (udpsink1);
925     }
926     if (inetaddr)
927       g_object_unref (inetaddr);
928     g_list_free_full (rejected_addresses,
929         (GDestroyNotify) gst_rtsp_address_free);
930     if (addr)
931       gst_rtsp_address_free (addr);
932     if (rtp_socket)
933       g_object_unref (rtp_socket);
934     if (rtcp_socket)
935       g_object_unref (rtcp_socket);
936     return FALSE;
937   }
938 }
939
940 /* must be called with lock */
941 static gboolean
942 alloc_ports (GstRTSPStream * stream)
943 {
944   GstRTSPStreamPrivate *priv = stream->priv;
945
946   priv->have_ipv4 = alloc_ports_one_family (priv->pool, priv->buffer_size,
947       G_SOCKET_FAMILY_IPV4, priv->udpsrc_v4, priv->udpsink,
948       &priv->server_port_v4, &priv->server_addr_v4);
949
950   priv->have_ipv6 = alloc_ports_one_family (priv->pool, priv->buffer_size,
951       G_SOCKET_FAMILY_IPV6, priv->udpsrc_v6, priv->udpsink,
952       &priv->server_port_v6, &priv->server_addr_v6);
953
954   return priv->have_ipv4 || priv->have_ipv6;
955 }
956
957 /**
958  * gst_rtsp_stream_get_server_port:
959  * @stream: a #GstRTSPStream
960  * @server_port: (out): result server port
961  * @family: the port family to get
962  *
963  * Fill @server_port with the port pair used by the server. This function can
964  * only be called when @stream has been joined.
965  */
966 void
967 gst_rtsp_stream_get_server_port (GstRTSPStream * stream,
968     GstRTSPRange * server_port, GSocketFamily family)
969 {
970   GstRTSPStreamPrivate *priv;
971
972   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
973   priv = stream->priv;
974   g_return_if_fail (priv->is_joined);
975
976   g_mutex_lock (&priv->lock);
977   if (family == G_SOCKET_FAMILY_IPV4) {
978     if (server_port)
979       *server_port = priv->server_port_v4;
980   } else {
981     if (server_port)
982       *server_port = priv->server_port_v6;
983   }
984   g_mutex_unlock (&priv->lock);
985 }
986
987 /**
988  * gst_rtsp_stream_get_rtpsession:
989  * @stream: a #GstRTSPStream
990  *
991  * Get the RTP session of this stream.
992  *
993  * Returns: The RTP session of this stream. Unref after usage.
994  */
995 GObject *
996 gst_rtsp_stream_get_rtpsession (GstRTSPStream * stream)
997 {
998   GstRTSPStreamPrivate *priv;
999   GObject *session;
1000
1001   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1002
1003   priv = stream->priv;
1004
1005   g_mutex_lock (&priv->lock);
1006   if ((session = priv->session))
1007     g_object_ref (session);
1008   g_mutex_unlock (&priv->lock);
1009
1010   return session;
1011 }
1012
1013 /**
1014  * gst_rtsp_stream_get_ssrc:
1015  * @stream: a #GstRTSPStream
1016  * @ssrc: (out): result ssrc
1017  *
1018  * Get the SSRC used by the RTP session of this stream. This function can only
1019  * be called when @stream has been joined.
1020  */
1021 void
1022 gst_rtsp_stream_get_ssrc (GstRTSPStream * stream, guint * ssrc)
1023 {
1024   GstRTSPStreamPrivate *priv;
1025
1026   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1027   priv = stream->priv;
1028   g_return_if_fail (priv->is_joined);
1029
1030   g_mutex_lock (&priv->lock);
1031   if (ssrc && priv->session)
1032     g_object_get (priv->session, "internal-ssrc", ssrc, NULL);
1033   g_mutex_unlock (&priv->lock);
1034 }
1035
1036 /* executed from streaming thread */
1037 static void
1038 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream)
1039 {
1040   GstRTSPStreamPrivate *priv = stream->priv;
1041   GstCaps *newcaps, *oldcaps;
1042
1043   newcaps = gst_pad_get_current_caps (pad);
1044
1045   GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps,
1046       newcaps);
1047
1048   g_mutex_lock (&priv->lock);
1049   oldcaps = priv->caps;
1050   priv->caps = newcaps;
1051   g_mutex_unlock (&priv->lock);
1052
1053   if (oldcaps)
1054     gst_caps_unref (oldcaps);
1055 }
1056
1057 static void
1058 dump_structure (const GstStructure * s)
1059 {
1060   gchar *sstr;
1061
1062   sstr = gst_structure_to_string (s);
1063   GST_INFO ("structure: %s", sstr);
1064   g_free (sstr);
1065 }
1066
1067 static GstRTSPStreamTransport *
1068 find_transport (GstRTSPStream * stream, const gchar * rtcp_from)
1069 {
1070   GstRTSPStreamPrivate *priv = stream->priv;
1071   GList *walk;
1072   GstRTSPStreamTransport *result = NULL;
1073   const gchar *tmp;
1074   gchar *dest;
1075   guint port;
1076
1077   if (rtcp_from == NULL)
1078     return NULL;
1079
1080   tmp = g_strrstr (rtcp_from, ":");
1081   if (tmp == NULL)
1082     return NULL;
1083
1084   port = atoi (tmp + 1);
1085   dest = g_strndup (rtcp_from, tmp - rtcp_from);
1086
1087   g_mutex_lock (&priv->lock);
1088   GST_INFO ("finding %s:%d in %d transports", dest, port,
1089       g_list_length (priv->transports));
1090
1091   for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1092     GstRTSPStreamTransport *trans = walk->data;
1093     const GstRTSPTransport *tr;
1094     gint min, max;
1095
1096     tr = gst_rtsp_stream_transport_get_transport (trans);
1097
1098     min = tr->client_port.min;
1099     max = tr->client_port.max;
1100
1101     if ((strcmp (tr->destination, dest) == 0) && (min == port || max == port)) {
1102       result = trans;
1103       break;
1104     }
1105   }
1106   if (result)
1107     g_object_ref (result);
1108   g_mutex_unlock (&priv->lock);
1109
1110   g_free (dest);
1111
1112   return result;
1113 }
1114
1115 static GstRTSPStreamTransport *
1116 check_transport (GObject * source, GstRTSPStream * stream)
1117 {
1118   GstStructure *stats;
1119   GstRTSPStreamTransport *trans;
1120
1121   /* see if we have a stream to match with the origin of the RTCP packet */
1122   trans = g_object_get_qdata (source, ssrc_stream_map_key);
1123   if (trans == NULL) {
1124     g_object_get (source, "stats", &stats, NULL);
1125     if (stats) {
1126       const gchar *rtcp_from;
1127
1128       dump_structure (stats);
1129
1130       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
1131       if ((trans = find_transport (stream, rtcp_from))) {
1132         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
1133             source);
1134         g_object_set_qdata_full (source, ssrc_stream_map_key, trans,
1135             g_object_unref);
1136       }
1137       gst_structure_free (stats);
1138     }
1139   }
1140   return trans;
1141 }
1142
1143
1144 static void
1145 on_new_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1146 {
1147   GstRTSPStreamTransport *trans;
1148
1149   GST_INFO ("%p: new source %p", stream, source);
1150
1151   trans = check_transport (source, stream);
1152
1153   if (trans)
1154     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
1155 }
1156
1157 static void
1158 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPStream * stream)
1159 {
1160   GST_INFO ("%p: new SDES %p", stream, source);
1161 }
1162
1163 static void
1164 on_ssrc_active (GObject * session, GObject * source, GstRTSPStream * stream)
1165 {
1166   GstRTSPStreamTransport *trans;
1167
1168   trans = check_transport (source, stream);
1169
1170   if (trans) {
1171     GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
1172     gst_rtsp_stream_transport_keep_alive (trans);
1173   }
1174 #ifdef DUMP_STATS
1175   {
1176     GstStructure *stats;
1177     g_object_get (source, "stats", &stats, NULL);
1178     if (stats) {
1179       dump_structure (stats);
1180       gst_structure_free (stats);
1181     }
1182   }
1183 #endif
1184 }
1185
1186 static void
1187 on_bye_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1188 {
1189   GST_INFO ("%p: source %p bye", stream, source);
1190 }
1191
1192 static void
1193 on_bye_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1194 {
1195   GstRTSPStreamTransport *trans;
1196
1197   GST_INFO ("%p: source %p bye timeout", stream, source);
1198
1199   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1200     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1201     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1202   }
1203 }
1204
1205 static void
1206 on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1207 {
1208   GstRTSPStreamTransport *trans;
1209
1210   GST_INFO ("%p: source %p timeout", stream, source);
1211
1212   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1213     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1214     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1215   }
1216 }
1217
1218 static GstFlowReturn
1219 handle_new_sample (GstAppSink * sink, gpointer user_data)
1220 {
1221   GstRTSPStreamPrivate *priv;
1222   GList *walk;
1223   GstSample *sample;
1224   GstBuffer *buffer;
1225   GstRTSPStream *stream;
1226
1227   sample = gst_app_sink_pull_sample (sink);
1228   if (!sample)
1229     return GST_FLOW_OK;
1230
1231   stream = (GstRTSPStream *) user_data;
1232   priv = stream->priv;
1233   buffer = gst_sample_get_buffer (sample);
1234
1235   g_mutex_lock (&priv->lock);
1236   for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1237     GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1238
1239     if (GST_ELEMENT_CAST (sink) == priv->appsink[0]) {
1240       gst_rtsp_stream_transport_send_rtp (tr, buffer);
1241     } else {
1242       gst_rtsp_stream_transport_send_rtcp (tr, buffer);
1243     }
1244   }
1245   g_mutex_unlock (&priv->lock);
1246
1247   gst_sample_unref (sample);
1248
1249   return GST_FLOW_OK;
1250 }
1251
1252 static GstAppSinkCallbacks sink_cb = {
1253   NULL,                         /* not interested in EOS */
1254   NULL,                         /* not interested in preroll samples */
1255   handle_new_sample,
1256 };
1257
1258 /**
1259  * gst_rtsp_stream_join_bin:
1260  * @stream: a #GstRTSPStream
1261  * @bin: a #GstBin to join
1262  * @rtpbin: a rtpbin element in @bin
1263  * @state: the target state of the new elements
1264  *
1265  * Join the #Gstbin @bin that contains the element @rtpbin.
1266  *
1267  * @stream will link to @rtpbin, which must be inside @bin. The elements
1268  * added to @bin will be set to the state given in @state.
1269  *
1270  * Returns: %TRUE on success.
1271  */
1272 gboolean
1273 gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
1274     GstElement * rtpbin, GstState state)
1275 {
1276   GstRTSPStreamPrivate *priv;
1277   gint i;
1278   guint idx;
1279   gchar *name;
1280   GstPad *pad, *teepad, *queuepad, *selpad;
1281   GstPadLinkReturn ret;
1282
1283   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1284   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
1285   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
1286
1287   priv = stream->priv;
1288
1289   g_mutex_lock (&priv->lock);
1290   if (priv->is_joined)
1291     goto was_joined;
1292
1293   /* create a session with the same index as the stream */
1294   idx = priv->idx;
1295
1296   GST_INFO ("stream %p joining bin as session %u", stream, idx);
1297
1298   if (!alloc_ports (stream))
1299     goto no_ports;
1300
1301   /* update the dscp qos field in the sinks */
1302   update_dscp_qos (stream);
1303
1304   /* get a pad for sending RTP */
1305   name = g_strdup_printf ("send_rtp_sink_%u", idx);
1306   priv->send_rtp_sink = gst_element_get_request_pad (rtpbin, name);
1307   g_free (name);
1308   /* link the RTP pad to the session manager, it should not really fail unless
1309    * this is not really an RTP pad */
1310   ret = gst_pad_link (priv->srcpad, priv->send_rtp_sink);
1311   if (ret != GST_PAD_LINK_OK)
1312     goto link_failed;
1313
1314   /* get pads from the RTP session element for sending and receiving
1315    * RTP/RTCP*/
1316   name = g_strdup_printf ("send_rtp_src_%u", idx);
1317   priv->send_src[0] = gst_element_get_static_pad (rtpbin, name);
1318   g_free (name);
1319   name = g_strdup_printf ("send_rtcp_src_%u", idx);
1320   priv->send_src[1] = gst_element_get_request_pad (rtpbin, name);
1321   g_free (name);
1322   name = g_strdup_printf ("recv_rtp_sink_%u", idx);
1323   priv->recv_sink[0] = gst_element_get_request_pad (rtpbin, name);
1324   g_free (name);
1325   name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
1326   priv->recv_sink[1] = gst_element_get_request_pad (rtpbin, name);
1327   g_free (name);
1328
1329   /* get the session */
1330   g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &priv->session);
1331
1332   g_signal_connect (priv->session, "on-new-ssrc", (GCallback) on_new_ssrc,
1333       stream);
1334   g_signal_connect (priv->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
1335       stream);
1336   g_signal_connect (priv->session, "on-ssrc-active",
1337       (GCallback) on_ssrc_active, stream);
1338   g_signal_connect (priv->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
1339       stream);
1340   g_signal_connect (priv->session, "on-bye-timeout",
1341       (GCallback) on_bye_timeout, stream);
1342   g_signal_connect (priv->session, "on-timeout", (GCallback) on_timeout,
1343       stream);
1344
1345   for (i = 0; i < 2; i++) {
1346     /* For the sender we create this bit of pipeline for both
1347      * RTP and RTCP. Sync and preroll are enabled on udpsink so
1348      * we need to add a queue before appsink to make the pipeline
1349      * not block. For the TCP case, we want to pump data to the
1350      * client as fast as possible anyway.
1351      *
1352      * .--------.      .-----.    .---------.
1353      * | rtpbin |      | tee |    | udpsink |
1354      * |       send->sink   src->sink       |
1355      * '--------'      |     |    '---------'
1356      *                 |     |    .---------.    .---------.
1357      *                 |     |    |  queue  |    | appsink |
1358      *                 |    src->sink      src->sink       |
1359      *                 '-----'    '---------'    '---------'
1360      */
1361     /* make tee for RTP/RTCP */
1362     priv->tee[i] = gst_element_factory_make ("tee", NULL);
1363     gst_bin_add (bin, priv->tee[i]);
1364
1365     /* and link to rtpbin send pad */
1366     pad = gst_element_get_static_pad (priv->tee[i], "sink");
1367     gst_pad_link (priv->send_src[i], pad);
1368     gst_object_unref (pad);
1369
1370     /* add udpsink */
1371     gst_bin_add (bin, priv->udpsink[i]);
1372
1373     /* link tee to udpsink */
1374     teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
1375     pad = gst_element_get_static_pad (priv->udpsink[i], "sink");
1376     gst_pad_link (teepad, pad);
1377     gst_object_unref (pad);
1378     gst_object_unref (teepad);
1379
1380     /* make queue */
1381     priv->appqueue[i] = gst_element_factory_make ("queue", NULL);
1382     gst_bin_add (bin, priv->appqueue[i]);
1383     /* and link to tee */
1384     teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
1385     pad = gst_element_get_static_pad (priv->appqueue[i], "sink");
1386     gst_pad_link (teepad, pad);
1387     gst_object_unref (pad);
1388     gst_object_unref (teepad);
1389
1390     /* make appsink */
1391     priv->appsink[i] = gst_element_factory_make ("appsink", NULL);
1392     g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
1393     g_object_set (priv->appsink[i], "emit-signals", FALSE, NULL);
1394     gst_bin_add (bin, priv->appsink[i]);
1395     gst_app_sink_set_callbacks (GST_APP_SINK_CAST (priv->appsink[i]),
1396         &sink_cb, stream, NULL);
1397     /* and link to queue */
1398     queuepad = gst_element_get_static_pad (priv->appqueue[i], "src");
1399     pad = gst_element_get_static_pad (priv->appsink[i], "sink");
1400     gst_pad_link (queuepad, pad);
1401     gst_object_unref (pad);
1402     gst_object_unref (queuepad);
1403
1404     /* For the receiver we create this bit of pipeline for both
1405      * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc
1406      * and it is all funneled into the rtpbin receive pad.
1407      *
1408      * .--------.     .--------.    .--------.
1409      * | udpsrc |     | funnel |    | rtpbin |
1410      * |       src->sink      src->sink      |
1411      * '--------'     |        |    '--------'
1412      * .--------.     |        |
1413      * | appsrc |     |        |
1414      * |       src->sink       |
1415      * '--------'     '--------'
1416      */
1417     /* make funnel for the RTP/RTCP receivers */
1418     priv->funnel[i] = gst_element_factory_make ("funnel", NULL);
1419     gst_bin_add (bin, priv->funnel[i]);
1420
1421     pad = gst_element_get_static_pad (priv->funnel[i], "src");
1422     gst_pad_link (pad, priv->recv_sink[i]);
1423     gst_object_unref (pad);
1424
1425     if (priv->udpsrc_v4[i]) {
1426       /* we set and keep these to playing so that they don't cause NO_PREROLL return
1427        * values */
1428       gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_PLAYING);
1429       gst_element_set_locked_state (priv->udpsrc_v4[i], TRUE);
1430       /* add udpsrc */
1431       gst_bin_add (bin, priv->udpsrc_v4[i]);
1432
1433       /* and link to the funnel v4 */
1434       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1435       pad = gst_element_get_static_pad (priv->udpsrc_v4[i], "src");
1436       gst_pad_link (pad, selpad);
1437       gst_object_unref (pad);
1438       gst_object_unref (selpad);
1439     }
1440
1441     if (priv->udpsrc_v6[i]) {
1442       gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_PLAYING);
1443       gst_element_set_locked_state (priv->udpsrc_v6[i], TRUE);
1444       gst_bin_add (bin, priv->udpsrc_v6[i]);
1445
1446       /* and link to the funnel v6 */
1447       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1448       pad = gst_element_get_static_pad (priv->udpsrc_v6[i], "src");
1449       gst_pad_link (pad, selpad);
1450       gst_object_unref (pad);
1451       gst_object_unref (selpad);
1452     }
1453
1454     /* make and add appsrc */
1455     priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
1456     gst_bin_add (bin, priv->appsrc[i]);
1457     /* and link to the funnel */
1458     selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1459     pad = gst_element_get_static_pad (priv->appsrc[i], "src");
1460     gst_pad_link (pad, selpad);
1461     gst_object_unref (pad);
1462     gst_object_unref (selpad);
1463
1464     /* check if we need to set to a special state */
1465     if (state != GST_STATE_NULL) {
1466       gst_element_set_state (priv->udpsink[i], state);
1467       gst_element_set_state (priv->appsink[i], state);
1468       gst_element_set_state (priv->appqueue[i], state);
1469       gst_element_set_state (priv->tee[i], state);
1470       gst_element_set_state (priv->funnel[i], state);
1471       gst_element_set_state (priv->appsrc[i], state);
1472     }
1473   }
1474
1475   /* be notified of caps changes */
1476   priv->caps_sig = g_signal_connect (priv->send_rtp_sink, "notify::caps",
1477       (GCallback) caps_notify, stream);
1478
1479   priv->is_joined = TRUE;
1480   g_mutex_unlock (&priv->lock);
1481
1482   return TRUE;
1483
1484   /* ERRORS */
1485 was_joined:
1486   {
1487     g_mutex_unlock (&priv->lock);
1488     return TRUE;
1489   }
1490 no_ports:
1491   {
1492     g_mutex_unlock (&priv->lock);
1493     GST_WARNING ("failed to allocate ports %u", idx);
1494     return FALSE;
1495   }
1496 link_failed:
1497   {
1498     GST_WARNING ("failed to link stream %u", idx);
1499     gst_object_unref (priv->send_rtp_sink);
1500     priv->send_rtp_sink = NULL;
1501     g_mutex_unlock (&priv->lock);
1502     return FALSE;
1503   }
1504 }
1505
1506 /**
1507  * gst_rtsp_stream_leave_bin:
1508  * @stream: a #GstRTSPStream
1509  * @bin: a #GstBin
1510  * @rtpbin: a rtpbin #GstElement
1511  *
1512  * Remove the elements of @stream from @bin.
1513  *
1514  * Return: %TRUE on success.
1515  */
1516 gboolean
1517 gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
1518     GstElement * rtpbin)
1519 {
1520   GstRTSPStreamPrivate *priv;
1521   gint i;
1522
1523   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1524   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
1525   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
1526
1527   priv = stream->priv;
1528
1529   g_mutex_lock (&priv->lock);
1530   if (!priv->is_joined)
1531     goto was_not_joined;
1532
1533   /* all transports must be removed by now */
1534   g_return_val_if_fail (priv->transports == NULL, FALSE);
1535
1536   GST_INFO ("stream %p leaving bin", stream);
1537
1538   gst_pad_unlink (priv->srcpad, priv->send_rtp_sink);
1539   g_signal_handler_disconnect (priv->send_rtp_sink, priv->caps_sig);
1540   gst_element_release_request_pad (rtpbin, priv->send_rtp_sink);
1541   gst_object_unref (priv->send_rtp_sink);
1542   priv->send_rtp_sink = NULL;
1543
1544   for (i = 0; i < 2; i++) {
1545     gst_element_set_state (priv->udpsink[i], GST_STATE_NULL);
1546     gst_element_set_state (priv->appsink[i], GST_STATE_NULL);
1547     gst_element_set_state (priv->appqueue[i], GST_STATE_NULL);
1548     gst_element_set_state (priv->tee[i], GST_STATE_NULL);
1549     gst_element_set_state (priv->funnel[i], GST_STATE_NULL);
1550     gst_element_set_state (priv->appsrc[i], GST_STATE_NULL);
1551     if (priv->udpsrc_v4[i]) {
1552       /* and set udpsrc to NULL now before removing */
1553       gst_element_set_locked_state (priv->udpsrc_v4[i], FALSE);
1554       gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_NULL);
1555       /* removing them should also nicely release the request
1556        * pads when they finalize */
1557       gst_bin_remove (bin, priv->udpsrc_v4[i]);
1558     }
1559     if (priv->udpsrc_v6[i]) {
1560       gst_element_set_locked_state (priv->udpsrc_v6[i], FALSE);
1561       gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_NULL);
1562       gst_bin_remove (bin, priv->udpsrc_v6[i]);
1563     }
1564     gst_bin_remove (bin, priv->udpsink[i]);
1565     gst_bin_remove (bin, priv->appsrc[i]);
1566     gst_bin_remove (bin, priv->appsink[i]);
1567     gst_bin_remove (bin, priv->appqueue[i]);
1568     gst_bin_remove (bin, priv->tee[i]);
1569     gst_bin_remove (bin, priv->funnel[i]);
1570
1571     gst_element_release_request_pad (rtpbin, priv->recv_sink[i]);
1572     gst_object_unref (priv->recv_sink[i]);
1573     priv->recv_sink[i] = NULL;
1574
1575     priv->udpsrc_v4[i] = NULL;
1576     priv->udpsrc_v6[i] = NULL;
1577     priv->udpsink[i] = NULL;
1578     priv->appsrc[i] = NULL;
1579     priv->appsink[i] = NULL;
1580     priv->appqueue[i] = NULL;
1581     priv->tee[i] = NULL;
1582     priv->funnel[i] = NULL;
1583   }
1584   gst_object_unref (priv->send_src[0]);
1585   priv->send_src[0] = NULL;
1586
1587   gst_element_release_request_pad (rtpbin, priv->send_src[1]);
1588   gst_object_unref (priv->send_src[1]);
1589   priv->send_src[1] = NULL;
1590
1591   g_object_unref (priv->session);
1592   priv->session = NULL;
1593   if (priv->caps)
1594     gst_caps_unref (priv->caps);
1595   priv->caps = NULL;
1596
1597   priv->is_joined = FALSE;
1598   g_mutex_unlock (&priv->lock);
1599
1600   return TRUE;
1601
1602 was_not_joined:
1603   {
1604     return TRUE;
1605   }
1606 }
1607
1608 /**
1609  * gst_rtsp_stream_get_rtpinfo:
1610  * @stream: a #GstRTSPStream
1611  * @rtptime: result RTP timestamp
1612  * @seq: result RTP seqnum
1613  *
1614  * Retrieve the current rtptime and seq. This is used to
1615  * construct a RTPInfo reply header.
1616  *
1617  * Returns: %TRUE when rtptime and seq could be determined.
1618  */
1619 gboolean
1620 gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream,
1621     guint * rtptime, guint * seq)
1622 {
1623   GstRTSPStreamPrivate *priv;
1624   GObjectClass *payobjclass;
1625
1626   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1627   g_return_val_if_fail (rtptime != NULL, FALSE);
1628   g_return_val_if_fail (seq != NULL, FALSE);
1629
1630   priv = stream->priv;
1631
1632   payobjclass = G_OBJECT_GET_CLASS (priv->payloader);
1633
1634   if (!g_object_class_find_property (payobjclass, "seqnum") ||
1635       !g_object_class_find_property (payobjclass, "timestamp"))
1636     return FALSE;
1637
1638   g_object_get (priv->payloader, "seqnum", seq, "timestamp", rtptime, NULL);
1639
1640   return TRUE;
1641 }
1642
1643 /**
1644  * gst_rtsp_stream_get_caps:
1645  * @stream: a #GstRTSPStream
1646  *
1647  * Retrieve the current caps of @stream.
1648  *
1649  * Returns: (transfer full): the #GstCaps of @stream. use gst_caps_unref()
1650  *    after usage.
1651  */
1652 GstCaps *
1653 gst_rtsp_stream_get_caps (GstRTSPStream * stream)
1654 {
1655   GstRTSPStreamPrivate *priv;
1656   GstCaps *result;
1657
1658   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1659
1660   priv = stream->priv;
1661
1662   g_mutex_lock (&priv->lock);
1663   if ((result = priv->caps))
1664     gst_caps_ref (result);
1665   g_mutex_unlock (&priv->lock);
1666
1667   return result;
1668 }
1669
1670 /**
1671  * gst_rtsp_stream_recv_rtp:
1672  * @stream: a #GstRTSPStream
1673  * @buffer: (transfer full): a #GstBuffer
1674  *
1675  * Handle an RTP buffer for the stream. This method is usually called when a
1676  * message has been received from a client using the TCP transport.
1677  *
1678  * This function takes ownership of @buffer.
1679  *
1680  * Returns: a GstFlowReturn.
1681  */
1682 GstFlowReturn
1683 gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer)
1684 {
1685   GstRTSPStreamPrivate *priv;
1686   GstFlowReturn ret;
1687   GstElement *element;
1688
1689   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
1690   priv = stream->priv;
1691   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1692   g_return_val_if_fail (priv->is_joined, FALSE);
1693
1694   g_mutex_lock (&priv->lock);
1695   element = gst_object_ref (priv->appsrc[0]);
1696   g_mutex_unlock (&priv->lock);
1697
1698   ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
1699
1700   gst_object_unref (element);
1701
1702   return ret;
1703 }
1704
1705 /**
1706  * gst_rtsp_stream_recv_rtcp:
1707  * @stream: a #GstRTSPStream
1708  * @buffer: (transfer full): a #GstBuffer
1709  *
1710  * Handle an RTCP buffer for the stream. This method is usually called when a
1711  * message has been received from a client using the TCP transport.
1712  *
1713  * This function takes ownership of @buffer.
1714  *
1715  * Returns: a GstFlowReturn.
1716  */
1717 GstFlowReturn
1718 gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
1719 {
1720   GstRTSPStreamPrivate *priv;
1721   GstFlowReturn ret;
1722   GstElement *element;
1723
1724   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
1725   priv = stream->priv;
1726   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1727   g_return_val_if_fail (priv->is_joined, FALSE);
1728
1729   g_mutex_lock (&priv->lock);
1730   element = gst_object_ref (priv->appsrc[1]);
1731   g_mutex_unlock (&priv->lock);
1732
1733   ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
1734
1735   gst_object_unref (element);
1736
1737   return ret;
1738 }
1739
1740 /* must be called with lock */
1741 static gboolean
1742 update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
1743     gboolean add)
1744 {
1745   GstRTSPStreamPrivate *priv = stream->priv;
1746   const GstRTSPTransport *tr;
1747
1748   tr = gst_rtsp_stream_transport_get_transport (trans);
1749
1750   switch (tr->lower_transport) {
1751     case GST_RTSP_LOWER_TRANS_UDP:
1752     case GST_RTSP_LOWER_TRANS_UDP_MCAST:
1753     {
1754       gchar *dest;
1755       gint min, max;
1756       guint ttl = 0;
1757
1758       dest = tr->destination;
1759       if (tr->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
1760         min = tr->port.min;
1761         max = tr->port.max;
1762         ttl = tr->ttl;
1763       } else {
1764         min = tr->client_port.min;
1765         max = tr->client_port.max;
1766       }
1767
1768       if (add) {
1769         GST_INFO ("adding %s:%d-%d", dest, min, max);
1770         g_signal_emit_by_name (priv->udpsink[0], "add", dest, min, NULL);
1771         g_signal_emit_by_name (priv->udpsink[1], "add", dest, max, NULL);
1772         if (ttl > 0) {
1773           GST_INFO ("setting ttl-mc %d", ttl);
1774           g_object_set (G_OBJECT (priv->udpsink[0]), "ttl-mc", ttl, NULL);
1775           g_object_set (G_OBJECT (priv->udpsink[1]), "ttl-mc", ttl, NULL);
1776         }
1777         priv->transports = g_list_prepend (priv->transports, trans);
1778       } else {
1779         GST_INFO ("removing %s:%d-%d", dest, min, max);
1780         g_signal_emit_by_name (priv->udpsink[0], "remove", dest, min, NULL);
1781         g_signal_emit_by_name (priv->udpsink[1], "remove", dest, max, NULL);
1782         priv->transports = g_list_remove (priv->transports, trans);
1783       }
1784       break;
1785     }
1786     case GST_RTSP_LOWER_TRANS_TCP:
1787       if (add) {
1788         GST_INFO ("adding TCP %s", tr->destination);
1789         priv->transports = g_list_prepend (priv->transports, trans);
1790       } else {
1791         GST_INFO ("removing TCP %s", tr->destination);
1792         priv->transports = g_list_remove (priv->transports, trans);
1793       }
1794       break;
1795     default:
1796       goto unknown_transport;
1797   }
1798   return TRUE;
1799
1800   /* ERRORS */
1801 unknown_transport:
1802   {
1803     GST_INFO ("Unknown transport %d", tr->lower_transport);
1804     return FALSE;
1805   }
1806 }
1807
1808
1809 /**
1810  * gst_rtsp_stream_add_transport:
1811  * @stream: a #GstRTSPStream
1812  * @trans: a #GstRTSPStreamTransport
1813  *
1814  * Add the transport in @trans to @stream. The media of @stream will
1815  * then also be send to the values configured in @trans.
1816  *
1817  * @stream must be joined to a bin.
1818  *
1819  * @trans must contain a valid #GstRTSPTransport.
1820  *
1821  * Returns: %TRUE if @trans was added
1822  */
1823 gboolean
1824 gst_rtsp_stream_add_transport (GstRTSPStream * stream,
1825     GstRTSPStreamTransport * trans)
1826 {
1827   GstRTSPStreamPrivate *priv;
1828   gboolean res;
1829
1830   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1831   priv = stream->priv;
1832   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
1833   g_return_val_if_fail (priv->is_joined, FALSE);
1834
1835   g_mutex_lock (&priv->lock);
1836   res = update_transport (stream, trans, TRUE);
1837   g_mutex_unlock (&priv->lock);
1838
1839   return res;
1840 }
1841
1842 /**
1843  * gst_rtsp_stream_remove_transport:
1844  * @stream: a #GstRTSPStream
1845  * @trans: a #GstRTSPStreamTransport
1846  *
1847  * Remove the transport in @trans from @stream. The media of @stream will
1848  * not be sent to the values configured in @trans.
1849  *
1850  * @stream must be joined to a bin.
1851  *
1852  * @trans must contain a valid #GstRTSPTransport.
1853  *
1854  * Returns: %TRUE if @trans was removed
1855  */
1856 gboolean
1857 gst_rtsp_stream_remove_transport (GstRTSPStream * stream,
1858     GstRTSPStreamTransport * trans)
1859 {
1860   GstRTSPStreamPrivate *priv;
1861   gboolean res;
1862
1863   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1864   priv = stream->priv;
1865   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
1866   g_return_val_if_fail (priv->is_joined, FALSE);
1867
1868   g_mutex_lock (&priv->lock);
1869   res = update_transport (stream, trans, FALSE);
1870   g_mutex_unlock (&priv->lock);
1871
1872   return res;
1873 }