rtsp-stream: add getter for payload type
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-stream.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19 /**
20  * SECTION:rtsp-stream
21  * @short_description: A media stream
22  * @see_also: #GstRTSPMedia
23  *
24  * The #GstRTSPStream object manages the data transport for one stream. It
25  * is created from a payloader element and a source pad that produce the RTP
26  * packets for the stream.
27  *
28  * With gst_rtsp_stream_join_bin() the streaming elements are added to the bin
29  * and rtpbin. gst_rtsp_stream_leave_bin() removes the elements again.
30  *
31  * The #GstRTSPStream will use the configured addresspool, as set with
32  * gst_rtsp_stream_set_address_pool(), to allocate multicast addresses for the
33  * stream. With gst_rtsp_stream_get_multicast_address() you can get the
34  * configured address.
35  *
36  * With gst_rtsp_stream_get_server_port () you can get the port that the server
37  * will use to receive RTCP. This is the part that the clients will use to send
38  * RTCP to.
39  *
40  * With gst_rtsp_stream_add_transport() destinations can be added where the
41  * stream should be sent to. Use gst_rtsp_stream_remove_transport() to remove
42  * the destination again.
43  *
44  * Last reviewed on 2013-07-16 (1.0.0)
45  */
46
47 #include <stdlib.h>
48 #include <stdio.h>
49 #include <string.h>
50
51 #include <gio/gio.h>
52
53 #include <gst/app/gstappsrc.h>
54 #include <gst/app/gstappsink.h>
55
56 #include "rtsp-stream.h"
57
58 #define GST_RTSP_STREAM_GET_PRIVATE(obj)  \
59      (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTSP_STREAM, GstRTSPStreamPrivate))
60
61 struct _GstRTSPStreamPrivate
62 {
63   GMutex lock;
64   guint idx;
65   GstPad *srcpad;
66   GstElement *payloader;
67   guint buffer_size;
68   gboolean is_joined;
69   gchar *control;
70
71   GstRTSPLowerTrans protocols;
72
73   /* pads on the rtpbin */
74   GstPad *send_rtp_sink;
75   GstPad *recv_sink[2];
76   GstPad *send_src[2];
77
78   /* the RTPSession object */
79   GObject *session;
80
81   /* sinks used for sending and receiving RTP and RTCP over ipv4, they share
82    * sockets */
83   GstElement *udpsrc_v4[2];
84
85   /* sinks used for sending and receiving RTP and RTCP over ipv6, they share
86    * sockets */
87   GstElement *udpsrc_v6[2];
88
89   GstElement *udpsink[2];
90
91   /* for TCP transport */
92   GstElement *appsrc[2];
93   GstElement *appqueue[2];
94   GstElement *appsink[2];
95
96   GstElement *tee[2];
97   GstElement *funnel[2];
98
99   /* server ports for sending/receiving over ipv4 */
100   GstRTSPRange server_port_v4;
101   GstRTSPAddress *server_addr_v4;
102   gboolean have_ipv4;
103
104   /* server ports for sending/receiving over ipv6 */
105   GstRTSPRange server_port_v6;
106   GstRTSPAddress *server_addr_v6;
107   gboolean have_ipv6;
108
109   /* multicast addresses */
110   GstRTSPAddressPool *pool;
111   GstRTSPAddress *addr_v4;
112   GstRTSPAddress *addr_v6;
113
114   /* the caps of the stream */
115   gulong caps_sig;
116   GstCaps *caps;
117
118   /* transports we stream to */
119   guint n_active;
120   GList *transports;
121
122   gint dscp_qos;
123 };
124
125 #define DEFAULT_CONTROL         NULL
126 #define DEFAULT_PROTOCOLS       GST_RTSP_LOWER_TRANS_UDP | GST_RTSP_LOWER_TRANS_UDP_MCAST | \
127                                         GST_RTSP_LOWER_TRANS_TCP
128
129 enum
130 {
131   PROP_0,
132   PROP_CONTROL,
133   PROP_PROTOCOLS,
134   PROP_LAST
135 };
136
137 GST_DEBUG_CATEGORY_STATIC (rtsp_stream_debug);
138 #define GST_CAT_DEFAULT rtsp_stream_debug
139
140 static GQuark ssrc_stream_map_key;
141
142 static void gst_rtsp_stream_get_property (GObject * object, guint propid,
143     GValue * value, GParamSpec * pspec);
144 static void gst_rtsp_stream_set_property (GObject * object, guint propid,
145     const GValue * value, GParamSpec * pspec);
146
147 static void gst_rtsp_stream_finalize (GObject * obj);
148
149 G_DEFINE_TYPE (GstRTSPStream, gst_rtsp_stream, G_TYPE_OBJECT);
150
151 static void
152 gst_rtsp_stream_class_init (GstRTSPStreamClass * klass)
153 {
154   GObjectClass *gobject_class;
155
156   g_type_class_add_private (klass, sizeof (GstRTSPStreamPrivate));
157
158   gobject_class = G_OBJECT_CLASS (klass);
159
160   gobject_class->get_property = gst_rtsp_stream_get_property;
161   gobject_class->set_property = gst_rtsp_stream_set_property;
162   gobject_class->finalize = gst_rtsp_stream_finalize;
163
164   g_object_class_install_property (gobject_class, PROP_CONTROL,
165       g_param_spec_string ("control", "Control",
166           "The control string for this stream", DEFAULT_CONTROL,
167           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
168
169   g_object_class_install_property (gobject_class, PROP_PROTOCOLS,
170       g_param_spec_flags ("protocols", "Protocols",
171           "Allowed lower transport protocols", GST_TYPE_RTSP_LOWER_TRANS,
172           DEFAULT_PROTOCOLS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
173
174   GST_DEBUG_CATEGORY_INIT (rtsp_stream_debug, "rtspstream", 0, "GstRTSPStream");
175
176   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
177 }
178
179 static void
180 gst_rtsp_stream_init (GstRTSPStream * stream)
181 {
182   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
183
184   GST_DEBUG ("new stream %p", stream);
185
186   stream->priv = priv;
187
188   priv->dscp_qos = -1;
189   priv->control = g_strdup (DEFAULT_CONTROL);
190   priv->protocols = DEFAULT_PROTOCOLS;
191
192   g_mutex_init (&priv->lock);
193 }
194
195 static void
196 gst_rtsp_stream_finalize (GObject * obj)
197 {
198   GstRTSPStream *stream;
199   GstRTSPStreamPrivate *priv;
200
201   stream = GST_RTSP_STREAM (obj);
202   priv = stream->priv;
203
204   GST_DEBUG ("finalize stream %p", stream);
205
206   /* we really need to be unjoined now */
207   g_return_if_fail (!priv->is_joined);
208
209   if (priv->addr_v4)
210     gst_rtsp_address_free (priv->addr_v4);
211   if (priv->addr_v6)
212     gst_rtsp_address_free (priv->addr_v6);
213   if (priv->server_addr_v4)
214     gst_rtsp_address_free (priv->server_addr_v4);
215   if (priv->server_addr_v6)
216     gst_rtsp_address_free (priv->server_addr_v6);
217   if (priv->pool)
218     g_object_unref (priv->pool);
219   gst_object_unref (priv->payloader);
220   gst_object_unref (priv->srcpad);
221   g_free (priv->control);
222   g_mutex_clear (&priv->lock);
223
224   G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
225 }
226
227 static void
228 gst_rtsp_stream_get_property (GObject * object, guint propid,
229     GValue * value, GParamSpec * pspec)
230 {
231   GstRTSPStream *stream = GST_RTSP_STREAM (object);
232
233   switch (propid) {
234     case PROP_CONTROL:
235       g_value_take_string (value, gst_rtsp_stream_get_control (stream));
236       break;
237     case PROP_PROTOCOLS:
238       g_value_set_flags (value, gst_rtsp_stream_get_protocols (stream));
239       break;
240     default:
241       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
242   }
243 }
244
245 static void
246 gst_rtsp_stream_set_property (GObject * object, guint propid,
247     const GValue * value, GParamSpec * pspec)
248 {
249   GstRTSPStream *stream = GST_RTSP_STREAM (object);
250
251   switch (propid) {
252     case PROP_CONTROL:
253       gst_rtsp_stream_set_control (stream, g_value_get_string (value));
254       break;
255     case PROP_PROTOCOLS:
256       gst_rtsp_stream_set_protocols (stream, g_value_get_flags (value));
257       break;
258     default:
259       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
260   }
261 }
262
263 /**
264  * gst_rtsp_stream_new:
265  * @idx: an index
266  * @srcpad: a #GstPad
267  * @payloader: a #GstElement
268  *
269  * Create a new media stream with index @idx that handles RTP data on
270  * @srcpad and has a payloader element @payloader.
271  *
272  * Returns: a new #GstRTSPStream
273  */
274 GstRTSPStream *
275 gst_rtsp_stream_new (guint idx, GstElement * payloader, GstPad * srcpad)
276 {
277   GstRTSPStreamPrivate *priv;
278   GstRTSPStream *stream;
279
280   g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL);
281   g_return_val_if_fail (GST_IS_PAD (srcpad), NULL);
282   g_return_val_if_fail (GST_PAD_IS_SRC (srcpad), NULL);
283
284   stream = g_object_new (GST_TYPE_RTSP_STREAM, NULL);
285   priv = stream->priv;
286   priv->idx = idx;
287   priv->payloader = gst_object_ref (payloader);
288   priv->srcpad = gst_object_ref (srcpad);
289
290   return stream;
291 }
292
293 /**
294  * gst_rtsp_stream_get_index:
295  * @stream: a #GstRTSPStream
296  *
297  * Get the stream index.
298  *
299  * Return: the stream index.
300  */
301 guint
302 gst_rtsp_stream_get_index (GstRTSPStream * stream)
303 {
304   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
305
306   return stream->priv->idx;
307 }
308
309 /**
310  * gst_rtsp_stream_get_pt:
311  * @stream: a #GstRTSPStream
312  *
313  * Get the stream payload type.
314  *
315  * Return: the stream payload type.
316  */
317 guint
318 gst_rtsp_stream_get_pt (GstRTSPStream * stream)
319 {
320   GstRTSPStreamPrivate *priv;
321   guint pt;
322
323   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
324
325   priv = stream->priv;
326
327   g_object_get (G_OBJECT (priv->payloader), "pt", &pt, NULL);
328
329   return pt;
330 }
331
332 /**
333  * gst_rtsp_stream_get_srcpad:
334  * @stream: a #GstRTSPStream
335  *
336  * Get the srcpad associated with @stream.
337  *
338  * Returns: (transfer full): the srcpad. Unref after usage.
339  */
340 GstPad *
341 gst_rtsp_stream_get_srcpad (GstRTSPStream * stream)
342 {
343   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
344
345   return gst_object_ref (stream->priv->srcpad);
346 }
347
348 /**
349  * gst_rtsp_stream_get_control:
350  * @stream: a #GstRTSPStream
351  *
352  * Get the control string to identify this stream.
353  *
354  * Returns: (transfer full): the control string. free after usage.
355  */
356 gchar *
357 gst_rtsp_stream_get_control (GstRTSPStream * stream)
358 {
359   GstRTSPStreamPrivate *priv;
360   gchar *result;
361
362   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
363
364   priv = stream->priv;
365
366   g_mutex_lock (&priv->lock);
367   if ((result = g_strdup (priv->control)) == NULL)
368     result = g_strdup_printf ("stream=%u", priv->idx);
369   g_mutex_unlock (&priv->lock);
370
371   return result;
372 }
373
374 /**
375  * gst_rtsp_stream_set_control:
376  * @stream: a #GstRTSPStream
377  * @control: a control string
378  *
379  * Set the control string in @stream.
380  */
381 void
382 gst_rtsp_stream_set_control (GstRTSPStream * stream, const gchar * control)
383 {
384   GstRTSPStreamPrivate *priv;
385
386   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
387
388   priv = stream->priv;
389
390   g_mutex_lock (&priv->lock);
391   g_free (priv->control);
392   priv->control = g_strdup (control);
393   g_mutex_unlock (&priv->lock);
394 }
395
396 /**
397  * gst_rtsp_stream_has_control:
398  * @stream: a #GstRTSPStream
399  * @control: a control string
400  *
401  * Check if @stream has the control string @control.
402  *
403  * Returns: %TRUE is @stream has @control as the control string
404  */
405 gboolean
406 gst_rtsp_stream_has_control (GstRTSPStream * stream, const gchar * control)
407 {
408   GstRTSPStreamPrivate *priv;
409   gboolean res;
410
411   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
412
413   priv = stream->priv;
414
415   g_mutex_lock (&priv->lock);
416   if (priv->control)
417     res = (g_strcmp0 (priv->control, control) == 0);
418   else {
419     guint streamid;
420     sscanf (control, "stream=%u", &streamid);
421     res = (streamid == priv->idx);
422   }
423   g_mutex_unlock (&priv->lock);
424
425   return res;
426 }
427
428 /**
429  * gst_rtsp_stream_set_mtu:
430  * @stream: a #GstRTSPStream
431  * @mtu: a new MTU
432  *
433  * Configure the mtu in the payloader of @stream to @mtu.
434  */
435 void
436 gst_rtsp_stream_set_mtu (GstRTSPStream * stream, guint mtu)
437 {
438   GstRTSPStreamPrivate *priv;
439
440   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
441
442   priv = stream->priv;
443
444   GST_LOG_OBJECT (stream, "set MTU %u", mtu);
445
446   g_object_set (G_OBJECT (priv->payloader), "mtu", mtu, NULL);
447 }
448
449 /**
450  * gst_rtsp_stream_get_mtu:
451  * @stream: a #GstRTSPStream
452  *
453  * Get the configured MTU in the payloader of @stream.
454  *
455  * Returns: the MTU of the payloader.
456  */
457 guint
458 gst_rtsp_stream_get_mtu (GstRTSPStream * stream)
459 {
460   GstRTSPStreamPrivate *priv;
461   guint mtu;
462
463   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
464
465   priv = stream->priv;
466
467   g_object_get (G_OBJECT (priv->payloader), "mtu", &mtu, NULL);
468
469   return mtu;
470 }
471
472 /* Update the dscp qos property on the udp sinks */
473 static void
474 update_dscp_qos (GstRTSPStream * stream)
475 {
476   GstRTSPStreamPrivate *priv;
477
478   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
479
480   priv = stream->priv;
481
482   if (priv->udpsink[0]) {
483     g_object_set (G_OBJECT (priv->udpsink[0]), "qos-dscp", priv->dscp_qos,
484         NULL);
485   }
486
487   if (priv->udpsink[1]) {
488     g_object_set (G_OBJECT (priv->udpsink[1]), "qos-dscp", priv->dscp_qos,
489         NULL);
490   }
491 }
492
493 /**
494  * gst_rtsp_stream_set_dscp_qos:
495  * @stream: a #GstRTSPStream
496  * @dscp_qos: a new dscp qos value (0-63, or -1 to disable)
497  *
498  * Configure the dscp qos of the outgoing sockets to @dscp_qos.
499  */
500 void
501 gst_rtsp_stream_set_dscp_qos (GstRTSPStream * stream, gint dscp_qos)
502 {
503   GstRTSPStreamPrivate *priv;
504
505   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
506
507   priv = stream->priv;
508
509   GST_LOG_OBJECT (stream, "set DSCP QoS %d", dscp_qos);
510
511   if (dscp_qos < -1 || dscp_qos > 63) {
512     GST_WARNING_OBJECT (stream, "trying to set illegal dscp qos %d", dscp_qos);
513     return;
514   }
515
516   priv->dscp_qos = dscp_qos;
517
518   update_dscp_qos (stream);
519 }
520
521 /**
522  * gst_rtsp_stream_get_dscp_qos:
523  * @stream: a #GstRTSPStream
524  *
525  * Get the configured DSCP QoS in of the outgoing sockets.
526  *
527  * Returns: the DSCP QoS value of the outgoing sockets, or -1 if disbled.
528  */
529 gint
530 gst_rtsp_stream_get_dscp_qos (GstRTSPStream * stream)
531 {
532   GstRTSPStreamPrivate *priv;
533
534   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
535
536   priv = stream->priv;
537
538   return priv->dscp_qos;
539 }
540
541 /**
542  * gst_rtsp_stream_set_protocols:
543  * @stream: a #GstRTSPStream
544  * @protocols: the new flags
545  *
546  * Configure the allowed lower transport for @stream.
547  */
548 void
549 gst_rtsp_stream_set_protocols (GstRTSPStream * stream,
550     GstRTSPLowerTrans protocols)
551 {
552   GstRTSPStreamPrivate *priv;
553
554   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
555
556   priv = stream->priv;
557
558   g_mutex_lock (&priv->lock);
559   priv->protocols = protocols;
560   g_mutex_unlock (&priv->lock);
561 }
562
563 /**
564  * gst_rtsp_stream_get_protocols:
565  * @stream: a #GstRTSPStream
566  *
567  * Get the allowed protocols of @stream.
568  *
569  * Returns: a #GstRTSPLowerTrans
570  */
571 GstRTSPLowerTrans
572 gst_rtsp_stream_get_protocols (GstRTSPStream * stream)
573 {
574   GstRTSPStreamPrivate *priv;
575   GstRTSPLowerTrans res;
576
577   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream),
578       GST_RTSP_LOWER_TRANS_UNKNOWN);
579
580   priv = stream->priv;
581
582   g_mutex_lock (&priv->lock);
583   res = priv->protocols;
584   g_mutex_unlock (&priv->lock);
585
586   return res;
587 }
588
589 /**
590  * gst_rtsp_stream_set_address_pool:
591  * @stream: a #GstRTSPStream
592  * @pool: a #GstRTSPAddressPool
593  *
594  * configure @pool to be used as the address pool of @stream.
595  */
596 void
597 gst_rtsp_stream_set_address_pool (GstRTSPStream * stream,
598     GstRTSPAddressPool * pool)
599 {
600   GstRTSPStreamPrivate *priv;
601   GstRTSPAddressPool *old;
602
603   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
604
605   priv = stream->priv;
606
607   GST_LOG_OBJECT (stream, "set address pool %p", pool);
608
609   g_mutex_lock (&priv->lock);
610   if ((old = priv->pool) != pool)
611     priv->pool = pool ? g_object_ref (pool) : NULL;
612   else
613     old = NULL;
614   g_mutex_unlock (&priv->lock);
615
616   if (old)
617     g_object_unref (old);
618 }
619
620 /**
621  * gst_rtsp_stream_get_address_pool:
622  * @stream: a #GstRTSPStream
623  *
624  * Get the #GstRTSPAddressPool used as the address pool of @stream.
625  *
626  * Returns: (transfer full): the #GstRTSPAddressPool of @stream. g_object_unref() after
627  * usage.
628  */
629 GstRTSPAddressPool *
630 gst_rtsp_stream_get_address_pool (GstRTSPStream * stream)
631 {
632   GstRTSPStreamPrivate *priv;
633   GstRTSPAddressPool *result;
634
635   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
636
637   priv = stream->priv;
638
639   g_mutex_lock (&priv->lock);
640   if ((result = priv->pool))
641     g_object_ref (result);
642   g_mutex_unlock (&priv->lock);
643
644   return result;
645 }
646
647 /**
648  * gst_rtsp_stream_get_multicast_address:
649  * @stream: a #GstRTSPStream
650  * @family: the #GSocketFamily
651  *
652  * Get the multicast address of @stream for @family.
653  *
654  * Returns: the #GstRTSPAddress of @stream or %NULL when no address could be
655  * allocated. gst_rtsp_address_free() after usage.
656  */
657 GstRTSPAddress *
658 gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream,
659     GSocketFamily family)
660 {
661   GstRTSPStreamPrivate *priv;
662   GstRTSPAddress *result;
663   GstRTSPAddress **addrp;
664   GstRTSPAddressFlags flags;
665
666   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
667
668   priv = stream->priv;
669
670   if (family == G_SOCKET_FAMILY_IPV6) {
671     flags = GST_RTSP_ADDRESS_FLAG_IPV6;
672     addrp = &priv->addr_v4;
673   } else {
674     flags = GST_RTSP_ADDRESS_FLAG_IPV4;
675     addrp = &priv->addr_v6;
676   }
677
678   g_mutex_lock (&priv->lock);
679   if (*addrp == NULL) {
680     if (priv->pool == NULL)
681       goto no_pool;
682
683     flags |= GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_MULTICAST;
684
685     *addrp = gst_rtsp_address_pool_acquire_address (priv->pool, flags, 2);
686     if (*addrp == NULL)
687       goto no_address;
688   }
689   result = gst_rtsp_address_copy (*addrp);
690   g_mutex_unlock (&priv->lock);
691
692   return result;
693
694   /* ERRORS */
695 no_pool:
696   {
697     GST_ERROR_OBJECT (stream, "no address pool specified");
698     g_mutex_unlock (&priv->lock);
699     return NULL;
700   }
701 no_address:
702   {
703     GST_ERROR_OBJECT (stream, "failed to acquire address from pool");
704     g_mutex_unlock (&priv->lock);
705     return NULL;
706   }
707 }
708
709 /**
710  * gst_rtsp_stream_reserve_address:
711  * @stream: a #GstRTSPStream
712  * @address: an address
713  * @port: a port
714  * @n_ports: n_ports
715  * @ttl: a TTL
716  *
717  * Reserve @address and @port as the address and port of @stream.
718  *
719  * Returns: the #GstRTSPAddress of @stream or %NULL when the address could be
720  * reserved. gst_rtsp_address_free() after usage.
721  */
722 GstRTSPAddress *
723 gst_rtsp_stream_reserve_address (GstRTSPStream * stream,
724     const gchar * address, guint port, guint n_ports, guint ttl)
725 {
726   GstRTSPStreamPrivate *priv;
727   GstRTSPAddress *result;
728   GInetAddress *addr;
729   GSocketFamily family;
730   GstRTSPAddress **addrp;
731
732   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
733   g_return_val_if_fail (address != NULL, NULL);
734   g_return_val_if_fail (port > 0, NULL);
735   g_return_val_if_fail (n_ports > 0, NULL);
736   g_return_val_if_fail (ttl > 0, NULL);
737
738   priv = stream->priv;
739
740   addr = g_inet_address_new_from_string (address);
741   if (!addr) {
742     GST_ERROR ("failed to get inet addr from %s", address);
743     family = G_SOCKET_FAMILY_IPV4;
744   } else {
745     family = g_inet_address_get_family (addr);
746     g_object_unref (addr);
747   }
748
749   if (family == G_SOCKET_FAMILY_IPV6)
750     addrp = &priv->addr_v4;
751   else
752     addrp = &priv->addr_v6;
753
754   g_mutex_lock (&priv->lock);
755   if (*addrp == NULL) {
756     GstRTSPAddressPoolResult res;
757
758     if (priv->pool == NULL)
759       goto no_pool;
760
761     res = gst_rtsp_address_pool_reserve_address (priv->pool, address,
762         port, n_ports, ttl, addrp);
763     if (res != GST_RTSP_ADDRESS_POOL_OK)
764       goto no_address;
765   } else {
766     if (strcmp ((*addrp)->address, address) ||
767         (*addrp)->port != port || (*addrp)->n_ports != n_ports ||
768         (*addrp)->ttl != ttl)
769       goto different_address;
770   }
771   result = gst_rtsp_address_copy (*addrp);
772   g_mutex_unlock (&priv->lock);
773
774   return result;
775
776   /* ERRORS */
777 no_pool:
778   {
779     GST_ERROR_OBJECT (stream, "no address pool specified");
780     g_mutex_unlock (&priv->lock);
781     return NULL;
782   }
783 no_address:
784   {
785     GST_ERROR_OBJECT (stream, "failed to acquire address %s from pool",
786         address);
787     g_mutex_unlock (&priv->lock);
788     return NULL;
789   }
790 different_address:
791   {
792     GST_ERROR_OBJECT (stream, "address %s is not the same that was already"
793         " reserved", address);
794     g_mutex_unlock (&priv->lock);
795     return NULL;
796   }
797 }
798
799 static gboolean
800 alloc_ports_one_family (GstRTSPAddressPool * pool, gint buffer_size,
801     GSocketFamily family, GstElement * udpsrc_out[2],
802     GstElement * udpsink_out[2], GstRTSPRange * server_port_out,
803     GstRTSPAddress ** server_addr_out)
804 {
805   GstStateChangeReturn ret;
806   GstElement *udpsrc0, *udpsrc1;
807   GstElement *udpsink0, *udpsink1;
808   GSocket *rtp_socket = NULL;
809   GSocket *rtcp_socket;
810   gint tmp_rtp, tmp_rtcp;
811   guint count;
812   gint rtpport, rtcpport;
813   GList *rejected_addresses = NULL;
814   GstRTSPAddress *addr = NULL;
815   GInetAddress *inetaddr = NULL;
816   GSocketAddress *rtp_sockaddr = NULL;
817   GSocketAddress *rtcp_sockaddr = NULL;
818   const gchar *multisink_socket;
819
820   if (family == G_SOCKET_FAMILY_IPV6)
821     multisink_socket = "socket-v6";
822   else
823     multisink_socket = "socket";
824
825   udpsrc0 = NULL;
826   udpsrc1 = NULL;
827   udpsink0 = NULL;
828   udpsink1 = NULL;
829   count = 0;
830
831   /* Start with random port */
832   tmp_rtp = 0;
833
834   rtcp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
835       G_SOCKET_PROTOCOL_UDP, NULL);
836   if (!rtcp_socket)
837     goto no_udp_protocol;
838
839   if (*server_addr_out)
840     gst_rtsp_address_free (*server_addr_out);
841
842   /* try to allocate 2 UDP ports, the RTP port should be an even
843    * number and the RTCP port should be the next (uneven) port */
844 again:
845
846   if (rtp_socket == NULL) {
847     rtp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
848         G_SOCKET_PROTOCOL_UDP, NULL);
849     if (!rtp_socket)
850       goto no_udp_protocol;
851   }
852
853   if (pool && gst_rtsp_address_pool_has_unicast_addresses (pool)) {
854     GstRTSPAddressFlags flags;
855
856     if (addr)
857       rejected_addresses = g_list_prepend (rejected_addresses, addr);
858
859     flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_UNICAST;
860     if (family == G_SOCKET_FAMILY_IPV6)
861       flags |= GST_RTSP_ADDRESS_FLAG_IPV6;
862     else
863       flags |= GST_RTSP_ADDRESS_FLAG_IPV4;
864
865     addr = gst_rtsp_address_pool_acquire_address (pool, flags, 2);
866
867     if (addr == NULL)
868       goto no_ports;
869
870     tmp_rtp = addr->port;
871
872     g_clear_object (&inetaddr);
873     inetaddr = g_inet_address_new_from_string (addr->address);
874   } else {
875     if (tmp_rtp != 0) {
876       tmp_rtp += 2;
877       if (++count > 20)
878         goto no_ports;
879     }
880
881     if (inetaddr == NULL)
882       inetaddr = g_inet_address_new_any (family);
883   }
884
885   rtp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtp);
886   if (!g_socket_bind (rtp_socket, rtp_sockaddr, FALSE, NULL)) {
887     g_object_unref (rtp_sockaddr);
888     goto again;
889   }
890   g_object_unref (rtp_sockaddr);
891
892   rtp_sockaddr = g_socket_get_local_address (rtp_socket, NULL);
893   if (rtp_sockaddr == NULL || !G_IS_INET_SOCKET_ADDRESS (rtp_sockaddr)) {
894     g_clear_object (&rtp_sockaddr);
895     goto socket_error;
896   }
897
898   tmp_rtp =
899       g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (rtp_sockaddr));
900   g_object_unref (rtp_sockaddr);
901
902   /* check if port is even */
903   if ((tmp_rtp & 1) != 0) {
904     /* port not even, close and allocate another */
905     tmp_rtp++;
906     g_clear_object (&rtp_socket);
907     goto again;
908   }
909
910   /* set port */
911   tmp_rtcp = tmp_rtp + 1;
912
913   rtcp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtcp);
914   if (!g_socket_bind (rtcp_socket, rtcp_sockaddr, FALSE, NULL)) {
915     g_object_unref (rtcp_sockaddr);
916     g_clear_object (&rtp_socket);
917     goto again;
918   }
919   g_object_unref (rtcp_sockaddr);
920
921   g_clear_object (&inetaddr);
922
923   udpsrc0 = gst_element_factory_make ("udpsrc", NULL);
924   udpsrc1 = gst_element_factory_make ("udpsrc", NULL);
925
926   if (udpsrc0 == NULL || udpsrc1 == NULL)
927     goto no_udp_protocol;
928
929   g_object_set (G_OBJECT (udpsrc0), "socket", rtp_socket, NULL);
930   g_object_set (G_OBJECT (udpsrc1), "socket", rtcp_socket, NULL);
931
932   ret = gst_element_set_state (udpsrc0, GST_STATE_PAUSED);
933   if (ret == GST_STATE_CHANGE_FAILURE)
934     goto element_error;
935   ret = gst_element_set_state (udpsrc1, GST_STATE_PAUSED);
936   if (ret == GST_STATE_CHANGE_FAILURE)
937     goto element_error;
938
939   /* all fine, do port check */
940   g_object_get (G_OBJECT (udpsrc0), "port", &rtpport, NULL);
941   g_object_get (G_OBJECT (udpsrc1), "port", &rtcpport, NULL);
942
943   /* this should not happen... */
944   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
945     goto port_error;
946
947   if (udpsink_out[0])
948     udpsink0 = udpsink_out[0];
949   else
950     udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
951
952   if (!udpsink0)
953     goto no_udp_protocol;
954
955   g_object_set (G_OBJECT (udpsink0), "close-socket", FALSE, NULL);
956   g_object_set (G_OBJECT (udpsink0), multisink_socket, rtp_socket, NULL);
957
958   if (udpsink_out[1])
959     udpsink1 = udpsink_out[1];
960   else
961     udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
962
963   if (!udpsink1)
964     goto no_udp_protocol;
965
966   g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
967   g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
968   g_object_set (G_OBJECT (udpsink0), "buffer-size", buffer_size, NULL);
969
970   g_object_set (G_OBJECT (udpsink1), "close-socket", FALSE, NULL);
971   g_object_set (G_OBJECT (udpsink1), multisink_socket, rtcp_socket, NULL);
972   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
973   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
974   g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL);
975   g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
976   g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL);
977   g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
978
979   /* we keep these elements, we will further configure them when the
980    * client told us to really use the UDP ports. */
981   udpsrc_out[0] = udpsrc0;
982   udpsrc_out[1] = udpsrc1;
983   udpsink_out[0] = udpsink0;
984   udpsink_out[1] = udpsink1;
985   server_port_out->min = rtpport;
986   server_port_out->max = rtcpport;
987
988   *server_addr_out = addr;
989   g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free);
990
991   g_object_unref (rtp_socket);
992   g_object_unref (rtcp_socket);
993
994   return TRUE;
995
996   /* ERRORS */
997 no_udp_protocol:
998   {
999     goto cleanup;
1000   }
1001 no_ports:
1002   {
1003     goto cleanup;
1004   }
1005 port_error:
1006   {
1007     goto cleanup;
1008   }
1009 socket_error:
1010   {
1011     goto cleanup;
1012   }
1013 element_error:
1014   {
1015     goto cleanup;
1016   }
1017 cleanup:
1018   {
1019     if (udpsrc0) {
1020       gst_element_set_state (udpsrc0, GST_STATE_NULL);
1021       gst_object_unref (udpsrc0);
1022     }
1023     if (udpsrc1) {
1024       gst_element_set_state (udpsrc1, GST_STATE_NULL);
1025       gst_object_unref (udpsrc1);
1026     }
1027     if (udpsink0) {
1028       gst_element_set_state (udpsink0, GST_STATE_NULL);
1029       gst_object_unref (udpsink0);
1030     }
1031     if (udpsink1) {
1032       gst_element_set_state (udpsink1, GST_STATE_NULL);
1033       gst_object_unref (udpsink1);
1034     }
1035     if (inetaddr)
1036       g_object_unref (inetaddr);
1037     g_list_free_full (rejected_addresses,
1038         (GDestroyNotify) gst_rtsp_address_free);
1039     if (addr)
1040       gst_rtsp_address_free (addr);
1041     if (rtp_socket)
1042       g_object_unref (rtp_socket);
1043     if (rtcp_socket)
1044       g_object_unref (rtcp_socket);
1045     return FALSE;
1046   }
1047 }
1048
1049 /* must be called with lock */
1050 static gboolean
1051 alloc_ports (GstRTSPStream * stream)
1052 {
1053   GstRTSPStreamPrivate *priv = stream->priv;
1054
1055   priv->have_ipv4 = alloc_ports_one_family (priv->pool, priv->buffer_size,
1056       G_SOCKET_FAMILY_IPV4, priv->udpsrc_v4, priv->udpsink,
1057       &priv->server_port_v4, &priv->server_addr_v4);
1058
1059   priv->have_ipv6 = alloc_ports_one_family (priv->pool, priv->buffer_size,
1060       G_SOCKET_FAMILY_IPV6, priv->udpsrc_v6, priv->udpsink,
1061       &priv->server_port_v6, &priv->server_addr_v6);
1062
1063   return priv->have_ipv4 || priv->have_ipv6;
1064 }
1065
1066 /**
1067  * gst_rtsp_stream_get_server_port:
1068  * @stream: a #GstRTSPStream
1069  * @server_port: (out): result server port
1070  * @family: the port family to get
1071  *
1072  * Fill @server_port with the port pair used by the server. This function can
1073  * only be called when @stream has been joined.
1074  */
1075 void
1076 gst_rtsp_stream_get_server_port (GstRTSPStream * stream,
1077     GstRTSPRange * server_port, GSocketFamily family)
1078 {
1079   GstRTSPStreamPrivate *priv;
1080
1081   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1082   priv = stream->priv;
1083   g_return_if_fail (priv->is_joined);
1084
1085   g_mutex_lock (&priv->lock);
1086   if (family == G_SOCKET_FAMILY_IPV4) {
1087     if (server_port)
1088       *server_port = priv->server_port_v4;
1089   } else {
1090     if (server_port)
1091       *server_port = priv->server_port_v6;
1092   }
1093   g_mutex_unlock (&priv->lock);
1094 }
1095
1096 /**
1097  * gst_rtsp_stream_get_rtpsession:
1098  * @stream: a #GstRTSPStream
1099  *
1100  * Get the RTP session of this stream.
1101  *
1102  * Returns: (transfer full): The RTP session of this stream. Unref after usage.
1103  */
1104 GObject *
1105 gst_rtsp_stream_get_rtpsession (GstRTSPStream * stream)
1106 {
1107   GstRTSPStreamPrivate *priv;
1108   GObject *session;
1109
1110   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1111
1112   priv = stream->priv;
1113
1114   g_mutex_lock (&priv->lock);
1115   if ((session = priv->session))
1116     g_object_ref (session);
1117   g_mutex_unlock (&priv->lock);
1118
1119   return session;
1120 }
1121
1122 /**
1123  * gst_rtsp_stream_get_ssrc:
1124  * @stream: a #GstRTSPStream
1125  * @ssrc: (out): result ssrc
1126  *
1127  * Get the SSRC used by the RTP session of this stream. This function can only
1128  * be called when @stream has been joined.
1129  */
1130 void
1131 gst_rtsp_stream_get_ssrc (GstRTSPStream * stream, guint * ssrc)
1132 {
1133   GstRTSPStreamPrivate *priv;
1134
1135   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1136   priv = stream->priv;
1137   g_return_if_fail (priv->is_joined);
1138
1139   g_mutex_lock (&priv->lock);
1140   if (ssrc && priv->session)
1141     g_object_get (priv->session, "internal-ssrc", ssrc, NULL);
1142   g_mutex_unlock (&priv->lock);
1143 }
1144
1145 /* executed from streaming thread */
1146 static void
1147 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream)
1148 {
1149   GstRTSPStreamPrivate *priv = stream->priv;
1150   GstCaps *newcaps, *oldcaps;
1151
1152   newcaps = gst_pad_get_current_caps (pad);
1153
1154   GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps,
1155       newcaps);
1156
1157   g_mutex_lock (&priv->lock);
1158   oldcaps = priv->caps;
1159   priv->caps = newcaps;
1160   g_mutex_unlock (&priv->lock);
1161
1162   if (oldcaps)
1163     gst_caps_unref (oldcaps);
1164 }
1165
1166 static void
1167 dump_structure (const GstStructure * s)
1168 {
1169   gchar *sstr;
1170
1171   sstr = gst_structure_to_string (s);
1172   GST_INFO ("structure: %s", sstr);
1173   g_free (sstr);
1174 }
1175
1176 static GstRTSPStreamTransport *
1177 find_transport (GstRTSPStream * stream, const gchar * rtcp_from)
1178 {
1179   GstRTSPStreamPrivate *priv = stream->priv;
1180   GList *walk;
1181   GstRTSPStreamTransport *result = NULL;
1182   const gchar *tmp;
1183   gchar *dest;
1184   guint port;
1185
1186   if (rtcp_from == NULL)
1187     return NULL;
1188
1189   tmp = g_strrstr (rtcp_from, ":");
1190   if (tmp == NULL)
1191     return NULL;
1192
1193   port = atoi (tmp + 1);
1194   dest = g_strndup (rtcp_from, tmp - rtcp_from);
1195
1196   g_mutex_lock (&priv->lock);
1197   GST_INFO ("finding %s:%d in %d transports", dest, port,
1198       g_list_length (priv->transports));
1199
1200   for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1201     GstRTSPStreamTransport *trans = walk->data;
1202     const GstRTSPTransport *tr;
1203     gint min, max;
1204
1205     tr = gst_rtsp_stream_transport_get_transport (trans);
1206
1207     min = tr->client_port.min;
1208     max = tr->client_port.max;
1209
1210     if ((strcmp (tr->destination, dest) == 0) && (min == port || max == port)) {
1211       result = trans;
1212       break;
1213     }
1214   }
1215   if (result)
1216     g_object_ref (result);
1217   g_mutex_unlock (&priv->lock);
1218
1219   g_free (dest);
1220
1221   return result;
1222 }
1223
1224 static GstRTSPStreamTransport *
1225 check_transport (GObject * source, GstRTSPStream * stream)
1226 {
1227   GstStructure *stats;
1228   GstRTSPStreamTransport *trans;
1229
1230   /* see if we have a stream to match with the origin of the RTCP packet */
1231   trans = g_object_get_qdata (source, ssrc_stream_map_key);
1232   if (trans == NULL) {
1233     g_object_get (source, "stats", &stats, NULL);
1234     if (stats) {
1235       const gchar *rtcp_from;
1236
1237       dump_structure (stats);
1238
1239       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
1240       if ((trans = find_transport (stream, rtcp_from))) {
1241         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
1242             source);
1243         g_object_set_qdata_full (source, ssrc_stream_map_key, trans,
1244             g_object_unref);
1245       }
1246       gst_structure_free (stats);
1247     }
1248   }
1249   return trans;
1250 }
1251
1252
1253 static void
1254 on_new_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1255 {
1256   GstRTSPStreamTransport *trans;
1257
1258   GST_INFO ("%p: new source %p", stream, source);
1259
1260   trans = check_transport (source, stream);
1261
1262   if (trans)
1263     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
1264 }
1265
1266 static void
1267 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPStream * stream)
1268 {
1269   GST_INFO ("%p: new SDES %p", stream, source);
1270 }
1271
1272 static void
1273 on_ssrc_active (GObject * session, GObject * source, GstRTSPStream * stream)
1274 {
1275   GstRTSPStreamTransport *trans;
1276
1277   trans = check_transport (source, stream);
1278
1279   if (trans) {
1280     GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
1281     gst_rtsp_stream_transport_keep_alive (trans);
1282   }
1283 #ifdef DUMP_STATS
1284   {
1285     GstStructure *stats;
1286     g_object_get (source, "stats", &stats, NULL);
1287     if (stats) {
1288       dump_structure (stats);
1289       gst_structure_free (stats);
1290     }
1291   }
1292 #endif
1293 }
1294
1295 static void
1296 on_bye_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1297 {
1298   GST_INFO ("%p: source %p bye", stream, source);
1299 }
1300
1301 static void
1302 on_bye_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1303 {
1304   GstRTSPStreamTransport *trans;
1305
1306   GST_INFO ("%p: source %p bye timeout", stream, source);
1307
1308   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1309     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1310     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1311   }
1312 }
1313
1314 static void
1315 on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1316 {
1317   GstRTSPStreamTransport *trans;
1318
1319   GST_INFO ("%p: source %p timeout", stream, source);
1320
1321   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1322     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1323     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1324   }
1325 }
1326
1327 static GstFlowReturn
1328 handle_new_sample (GstAppSink * sink, gpointer user_data)
1329 {
1330   GstRTSPStreamPrivate *priv;
1331   GList *walk;
1332   GstSample *sample;
1333   GstBuffer *buffer;
1334   GstRTSPStream *stream;
1335
1336   sample = gst_app_sink_pull_sample (sink);
1337   if (!sample)
1338     return GST_FLOW_OK;
1339
1340   stream = (GstRTSPStream *) user_data;
1341   priv = stream->priv;
1342   buffer = gst_sample_get_buffer (sample);
1343
1344   g_mutex_lock (&priv->lock);
1345   for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1346     GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1347
1348     if (GST_ELEMENT_CAST (sink) == priv->appsink[0]) {
1349       gst_rtsp_stream_transport_send_rtp (tr, buffer);
1350     } else {
1351       gst_rtsp_stream_transport_send_rtcp (tr, buffer);
1352     }
1353   }
1354   g_mutex_unlock (&priv->lock);
1355
1356   gst_sample_unref (sample);
1357
1358   return GST_FLOW_OK;
1359 }
1360
1361 static GstAppSinkCallbacks sink_cb = {
1362   NULL,                         /* not interested in EOS */
1363   NULL,                         /* not interested in preroll samples */
1364   handle_new_sample,
1365 };
1366
1367 /**
1368  * gst_rtsp_stream_join_bin:
1369  * @stream: a #GstRTSPStream
1370  * @bin: a #GstBin to join
1371  * @rtpbin: a rtpbin element in @bin
1372  * @state: the target state of the new elements
1373  *
1374  * Join the #GstBin @bin that contains the element @rtpbin.
1375  *
1376  * @stream will link to @rtpbin, which must be inside @bin. The elements
1377  * added to @bin will be set to the state given in @state.
1378  *
1379  * Returns: %TRUE on success.
1380  */
1381 gboolean
1382 gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
1383     GstElement * rtpbin, GstState state)
1384 {
1385   GstRTSPStreamPrivate *priv;
1386   gint i;
1387   guint idx;
1388   gchar *name;
1389   GstPad *pad, *sinkpad, *selpad;
1390   GstPadLinkReturn ret;
1391
1392   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1393   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
1394   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
1395
1396   priv = stream->priv;
1397
1398   g_mutex_lock (&priv->lock);
1399   if (priv->is_joined)
1400     goto was_joined;
1401
1402   /* create a session with the same index as the stream */
1403   idx = priv->idx;
1404
1405   GST_INFO ("stream %p joining bin as session %u", stream, idx);
1406
1407   if (!alloc_ports (stream))
1408     goto no_ports;
1409
1410   /* update the dscp qos field in the sinks */
1411   update_dscp_qos (stream);
1412
1413   /* get a pad for sending RTP */
1414   name = g_strdup_printf ("send_rtp_sink_%u", idx);
1415   priv->send_rtp_sink = gst_element_get_request_pad (rtpbin, name);
1416   g_free (name);
1417   /* link the RTP pad to the session manager, it should not really fail unless
1418    * this is not really an RTP pad */
1419   ret = gst_pad_link (priv->srcpad, priv->send_rtp_sink);
1420   if (ret != GST_PAD_LINK_OK)
1421     goto link_failed;
1422
1423   /* get pads from the RTP session element for sending and receiving
1424    * RTP/RTCP*/
1425   name = g_strdup_printf ("send_rtp_src_%u", idx);
1426   priv->send_src[0] = gst_element_get_static_pad (rtpbin, name);
1427   g_free (name);
1428   name = g_strdup_printf ("send_rtcp_src_%u", idx);
1429   priv->send_src[1] = gst_element_get_request_pad (rtpbin, name);
1430   g_free (name);
1431   name = g_strdup_printf ("recv_rtp_sink_%u", idx);
1432   priv->recv_sink[0] = gst_element_get_request_pad (rtpbin, name);
1433   g_free (name);
1434   name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
1435   priv->recv_sink[1] = gst_element_get_request_pad (rtpbin, name);
1436   g_free (name);
1437
1438   /* get the session */
1439   g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &priv->session);
1440
1441   g_signal_connect (priv->session, "on-new-ssrc", (GCallback) on_new_ssrc,
1442       stream);
1443   g_signal_connect (priv->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
1444       stream);
1445   g_signal_connect (priv->session, "on-ssrc-active",
1446       (GCallback) on_ssrc_active, stream);
1447   g_signal_connect (priv->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
1448       stream);
1449   g_signal_connect (priv->session, "on-bye-timeout",
1450       (GCallback) on_bye_timeout, stream);
1451   g_signal_connect (priv->session, "on-timeout", (GCallback) on_timeout,
1452       stream);
1453
1454   for (i = 0; i < 2; i++) {
1455     GstPad *teepad, *queuepad;
1456     /* For the sender we create this bit of pipeline for both
1457      * RTP and RTCP. Sync and preroll are enabled on udpsink so
1458      * we need to add a queue before appsink to make the pipeline
1459      * not block. For the TCP case, we want to pump data to the
1460      * client as fast as possible anyway.
1461      *
1462      * .--------.      .-----.    .---------.
1463      * | rtpbin |      | tee |    | udpsink |
1464      * |       send->sink   src->sink       |
1465      * '--------'      |     |    '---------'
1466      *                 |     |    .---------.    .---------.
1467      *                 |     |    |  queue  |    | appsink |
1468      *                 |    src->sink      src->sink       |
1469      *                 '-----'    '---------'    '---------'
1470      *
1471      * When only UDP is allowed, we skip the tee, queue and appsink and link the
1472      * udpsink directly to the session.
1473      */
1474     /* add udpsink */
1475     gst_bin_add (bin, priv->udpsink[i]);
1476     sinkpad = gst_element_get_static_pad (priv->udpsink[i], "sink");
1477
1478     if (priv->protocols & GST_RTSP_LOWER_TRANS_TCP) {
1479       /* make tee for RTP/RTCP */
1480       priv->tee[i] = gst_element_factory_make ("tee", NULL);
1481       gst_bin_add (bin, priv->tee[i]);
1482
1483       /* and link to rtpbin send pad */
1484       pad = gst_element_get_static_pad (priv->tee[i], "sink");
1485       gst_pad_link (priv->send_src[i], pad);
1486       gst_object_unref (pad);
1487
1488       /* link tee to udpsink */
1489       teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
1490       gst_pad_link (teepad, sinkpad);
1491       gst_object_unref (teepad);
1492
1493       /* make queue */
1494       priv->appqueue[i] = gst_element_factory_make ("queue", NULL);
1495       gst_bin_add (bin, priv->appqueue[i]);
1496       /* and link to tee */
1497       teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
1498       pad = gst_element_get_static_pad (priv->appqueue[i], "sink");
1499       gst_pad_link (teepad, pad);
1500       gst_object_unref (pad);
1501       gst_object_unref (teepad);
1502
1503       /* make appsink */
1504       priv->appsink[i] = gst_element_factory_make ("appsink", NULL);
1505       g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
1506       g_object_set (priv->appsink[i], "emit-signals", FALSE, NULL);
1507       gst_bin_add (bin, priv->appsink[i]);
1508       gst_app_sink_set_callbacks (GST_APP_SINK_CAST (priv->appsink[i]),
1509           &sink_cb, stream, NULL);
1510       /* and link to queue */
1511       queuepad = gst_element_get_static_pad (priv->appqueue[i], "src");
1512       pad = gst_element_get_static_pad (priv->appsink[i], "sink");
1513       gst_pad_link (queuepad, pad);
1514       gst_object_unref (pad);
1515       gst_object_unref (queuepad);
1516     } else {
1517       /* else only udpsink needed, link it to the session */
1518       gst_pad_link (priv->send_src[i], sinkpad);
1519     }
1520     gst_object_unref (sinkpad);
1521
1522     /* For the receiver we create this bit of pipeline for both
1523      * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc
1524      * and it is all funneled into the rtpbin receive pad.
1525      *
1526      * .--------.     .--------.    .--------.
1527      * | udpsrc |     | funnel |    | rtpbin |
1528      * |       src->sink      src->sink      |
1529      * '--------'     |        |    '--------'
1530      * .--------.     |        |
1531      * | appsrc |     |        |
1532      * |       src->sink       |
1533      * '--------'     '--------'
1534      */
1535     /* make funnel for the RTP/RTCP receivers */
1536     priv->funnel[i] = gst_element_factory_make ("funnel", NULL);
1537     gst_bin_add (bin, priv->funnel[i]);
1538
1539     pad = gst_element_get_static_pad (priv->funnel[i], "src");
1540     gst_pad_link (pad, priv->recv_sink[i]);
1541     gst_object_unref (pad);
1542
1543     if (priv->udpsrc_v4[i]) {
1544       /* we set and keep these to playing so that they don't cause NO_PREROLL return
1545        * values */
1546       gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_PLAYING);
1547       gst_element_set_locked_state (priv->udpsrc_v4[i], TRUE);
1548       /* add udpsrc */
1549       gst_bin_add (bin, priv->udpsrc_v4[i]);
1550
1551       /* and link to the funnel v4 */
1552       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1553       pad = gst_element_get_static_pad (priv->udpsrc_v4[i], "src");
1554       gst_pad_link (pad, selpad);
1555       gst_object_unref (pad);
1556       gst_object_unref (selpad);
1557     }
1558
1559     if (priv->udpsrc_v6[i]) {
1560       gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_PLAYING);
1561       gst_element_set_locked_state (priv->udpsrc_v6[i], TRUE);
1562       gst_bin_add (bin, priv->udpsrc_v6[i]);
1563
1564       /* and link to the funnel v6 */
1565       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1566       pad = gst_element_get_static_pad (priv->udpsrc_v6[i], "src");
1567       gst_pad_link (pad, selpad);
1568       gst_object_unref (pad);
1569       gst_object_unref (selpad);
1570     }
1571
1572     if (priv->protocols & GST_RTSP_LOWER_TRANS_TCP) {
1573       /* make and add appsrc */
1574       priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
1575       gst_bin_add (bin, priv->appsrc[i]);
1576       /* and link to the funnel */
1577       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1578       pad = gst_element_get_static_pad (priv->appsrc[i], "src");
1579       gst_pad_link (pad, selpad);
1580       gst_object_unref (pad);
1581       gst_object_unref (selpad);
1582     }
1583
1584     /* check if we need to set to a special state */
1585     if (state != GST_STATE_NULL) {
1586       if (priv->udpsink[i])
1587         gst_element_set_state (priv->udpsink[i], state);
1588       if (priv->appsink[i])
1589         gst_element_set_state (priv->appsink[i], state);
1590       if (priv->appqueue[i])
1591         gst_element_set_state (priv->appqueue[i], state);
1592       if (priv->tee[i])
1593         gst_element_set_state (priv->tee[i], state);
1594       if (priv->funnel[i])
1595         gst_element_set_state (priv->funnel[i], state);
1596       if (priv->appsrc[i])
1597         gst_element_set_state (priv->appsrc[i], state);
1598     }
1599   }
1600
1601   /* be notified of caps changes */
1602   priv->caps_sig = g_signal_connect (priv->send_rtp_sink, "notify::caps",
1603       (GCallback) caps_notify, stream);
1604
1605   priv->is_joined = TRUE;
1606   g_mutex_unlock (&priv->lock);
1607
1608   return TRUE;
1609
1610   /* ERRORS */
1611 was_joined:
1612   {
1613     g_mutex_unlock (&priv->lock);
1614     return TRUE;
1615   }
1616 no_ports:
1617   {
1618     g_mutex_unlock (&priv->lock);
1619     GST_WARNING ("failed to allocate ports %u", idx);
1620     return FALSE;
1621   }
1622 link_failed:
1623   {
1624     GST_WARNING ("failed to link stream %u", idx);
1625     gst_object_unref (priv->send_rtp_sink);
1626     priv->send_rtp_sink = NULL;
1627     g_mutex_unlock (&priv->lock);
1628     return FALSE;
1629   }
1630 }
1631
1632 /**
1633  * gst_rtsp_stream_leave_bin:
1634  * @stream: a #GstRTSPStream
1635  * @bin: a #GstBin
1636  * @rtpbin: a rtpbin #GstElement
1637  *
1638  * Remove the elements of @stream from @bin.
1639  *
1640  * Return: %TRUE on success.
1641  */
1642 gboolean
1643 gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
1644     GstElement * rtpbin)
1645 {
1646   GstRTSPStreamPrivate *priv;
1647   gint i;
1648
1649   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1650   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
1651   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
1652
1653   priv = stream->priv;
1654
1655   g_mutex_lock (&priv->lock);
1656   if (!priv->is_joined)
1657     goto was_not_joined;
1658
1659   /* all transports must be removed by now */
1660   g_return_val_if_fail (priv->transports == NULL, FALSE);
1661
1662   GST_INFO ("stream %p leaving bin", stream);
1663
1664   gst_pad_unlink (priv->srcpad, priv->send_rtp_sink);
1665   g_signal_handler_disconnect (priv->send_rtp_sink, priv->caps_sig);
1666   gst_element_release_request_pad (rtpbin, priv->send_rtp_sink);
1667   gst_object_unref (priv->send_rtp_sink);
1668   priv->send_rtp_sink = NULL;
1669
1670   for (i = 0; i < 2; i++) {
1671     if (priv->udpsink[i])
1672       gst_element_set_state (priv->udpsink[i], GST_STATE_NULL);
1673     if (priv->appsink[i])
1674       gst_element_set_state (priv->appsink[i], GST_STATE_NULL);
1675     if (priv->appqueue[i])
1676       gst_element_set_state (priv->appqueue[i], GST_STATE_NULL);
1677     if (priv->tee[i])
1678       gst_element_set_state (priv->tee[i], GST_STATE_NULL);
1679     if (priv->funnel[i])
1680       gst_element_set_state (priv->funnel[i], GST_STATE_NULL);
1681     if (priv->appsrc[i])
1682       gst_element_set_state (priv->appsrc[i], GST_STATE_NULL);
1683     if (priv->udpsrc_v4[i]) {
1684       /* and set udpsrc to NULL now before removing */
1685       gst_element_set_locked_state (priv->udpsrc_v4[i], FALSE);
1686       gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_NULL);
1687       /* removing them should also nicely release the request
1688        * pads when they finalize */
1689       gst_bin_remove (bin, priv->udpsrc_v4[i]);
1690     }
1691     if (priv->udpsrc_v6[i]) {
1692       gst_element_set_locked_state (priv->udpsrc_v6[i], FALSE);
1693       gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_NULL);
1694       gst_bin_remove (bin, priv->udpsrc_v6[i]);
1695     }
1696     if (priv->udpsink[i])
1697       gst_bin_remove (bin, priv->udpsink[i]);
1698     if (priv->appsrc[i])
1699       gst_bin_remove (bin, priv->appsrc[i]);
1700     if (priv->appsink[i])
1701       gst_bin_remove (bin, priv->appsink[i]);
1702     if (priv->appqueue[i])
1703       gst_bin_remove (bin, priv->appqueue[i]);
1704     if (priv->tee[i])
1705       gst_bin_remove (bin, priv->tee[i]);
1706     if (priv->funnel[i])
1707       gst_bin_remove (bin, priv->funnel[i]);
1708
1709     gst_element_release_request_pad (rtpbin, priv->recv_sink[i]);
1710     gst_object_unref (priv->recv_sink[i]);
1711     priv->recv_sink[i] = NULL;
1712
1713     priv->udpsrc_v4[i] = NULL;
1714     priv->udpsrc_v6[i] = NULL;
1715     priv->udpsink[i] = NULL;
1716     priv->appsrc[i] = NULL;
1717     priv->appsink[i] = NULL;
1718     priv->appqueue[i] = NULL;
1719     priv->tee[i] = NULL;
1720     priv->funnel[i] = NULL;
1721   }
1722   gst_object_unref (priv->send_src[0]);
1723   priv->send_src[0] = NULL;
1724
1725   gst_element_release_request_pad (rtpbin, priv->send_src[1]);
1726   gst_object_unref (priv->send_src[1]);
1727   priv->send_src[1] = NULL;
1728
1729   g_object_unref (priv->session);
1730   priv->session = NULL;
1731   if (priv->caps)
1732     gst_caps_unref (priv->caps);
1733   priv->caps = NULL;
1734
1735   priv->is_joined = FALSE;
1736   g_mutex_unlock (&priv->lock);
1737
1738   return TRUE;
1739
1740 was_not_joined:
1741   {
1742     return TRUE;
1743   }
1744 }
1745
1746 /**
1747  * gst_rtsp_stream_get_rtpinfo:
1748  * @stream: a #GstRTSPStream
1749  * @rtptime: result RTP timestamp
1750  * @seq: result RTP seqnum
1751  *
1752  * Retrieve the current rtptime and seq. This is used to
1753  * construct a RTPInfo reply header.
1754  *
1755  * Returns: %TRUE when rtptime and seq could be determined.
1756  */
1757 gboolean
1758 gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream,
1759     guint * rtptime, guint * seq)
1760 {
1761   GstRTSPStreamPrivate *priv;
1762   GObjectClass *payobjclass;
1763
1764   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1765   g_return_val_if_fail (rtptime != NULL, FALSE);
1766   g_return_val_if_fail (seq != NULL, FALSE);
1767
1768   priv = stream->priv;
1769
1770   payobjclass = G_OBJECT_GET_CLASS (priv->payloader);
1771
1772   if (!g_object_class_find_property (payobjclass, "seqnum") ||
1773       !g_object_class_find_property (payobjclass, "timestamp"))
1774     return FALSE;
1775
1776   g_object_get (priv->payloader, "seqnum", seq, "timestamp", rtptime, NULL);
1777
1778   return TRUE;
1779 }
1780
1781 /**
1782  * gst_rtsp_stream_get_caps:
1783  * @stream: a #GstRTSPStream
1784  *
1785  * Retrieve the current caps of @stream.
1786  *
1787  * Returns: (transfer full): the #GstCaps of @stream. use gst_caps_unref()
1788  *    after usage.
1789  */
1790 GstCaps *
1791 gst_rtsp_stream_get_caps (GstRTSPStream * stream)
1792 {
1793   GstRTSPStreamPrivate *priv;
1794   GstCaps *result;
1795
1796   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1797
1798   priv = stream->priv;
1799
1800   g_mutex_lock (&priv->lock);
1801   if ((result = priv->caps))
1802     gst_caps_ref (result);
1803   g_mutex_unlock (&priv->lock);
1804
1805   return result;
1806 }
1807
1808 /**
1809  * gst_rtsp_stream_recv_rtp:
1810  * @stream: a #GstRTSPStream
1811  * @buffer: (transfer full): a #GstBuffer
1812  *
1813  * Handle an RTP buffer for the stream. This method is usually called when a
1814  * message has been received from a client using the TCP transport.
1815  *
1816  * This function takes ownership of @buffer.
1817  *
1818  * Returns: a GstFlowReturn.
1819  */
1820 GstFlowReturn
1821 gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer)
1822 {
1823   GstRTSPStreamPrivate *priv;
1824   GstFlowReturn ret;
1825   GstElement *element;
1826
1827   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
1828   priv = stream->priv;
1829   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1830   g_return_val_if_fail (priv->is_joined, FALSE);
1831
1832   g_mutex_lock (&priv->lock);
1833   if (priv->appsrc[0])
1834     element = gst_object_ref (priv->appsrc[0]);
1835   else
1836     element = NULL;
1837   g_mutex_unlock (&priv->lock);
1838
1839   if (element) {
1840     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
1841     gst_object_unref (element);
1842   } else {
1843     ret = GST_FLOW_OK;
1844   }
1845   return ret;
1846 }
1847
1848 /**
1849  * gst_rtsp_stream_recv_rtcp:
1850  * @stream: a #GstRTSPStream
1851  * @buffer: (transfer full): a #GstBuffer
1852  *
1853  * Handle an RTCP buffer for the stream. This method is usually called when a
1854  * message has been received from a client using the TCP transport.
1855  *
1856  * This function takes ownership of @buffer.
1857  *
1858  * Returns: a GstFlowReturn.
1859  */
1860 GstFlowReturn
1861 gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
1862 {
1863   GstRTSPStreamPrivate *priv;
1864   GstFlowReturn ret;
1865   GstElement *element;
1866
1867   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
1868   priv = stream->priv;
1869   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1870   g_return_val_if_fail (priv->is_joined, FALSE);
1871
1872   g_mutex_lock (&priv->lock);
1873   if (priv->appsrc[1])
1874     element = gst_object_ref (priv->appsrc[1]);
1875   else
1876     element = NULL;
1877   g_mutex_unlock (&priv->lock);
1878
1879   if (element) {
1880     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
1881     gst_object_unref (element);
1882   } else {
1883     ret = GST_FLOW_OK;
1884   }
1885   return ret;
1886 }
1887
1888 /* must be called with lock */
1889 static gboolean
1890 update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
1891     gboolean add)
1892 {
1893   GstRTSPStreamPrivate *priv = stream->priv;
1894   const GstRTSPTransport *tr;
1895
1896   tr = gst_rtsp_stream_transport_get_transport (trans);
1897
1898   switch (tr->lower_transport) {
1899     case GST_RTSP_LOWER_TRANS_UDP:
1900     case GST_RTSP_LOWER_TRANS_UDP_MCAST:
1901     {
1902       gchar *dest;
1903       gint min, max;
1904       guint ttl = 0;
1905
1906       dest = tr->destination;
1907       if (tr->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
1908         min = tr->port.min;
1909         max = tr->port.max;
1910         ttl = tr->ttl;
1911       } else {
1912         min = tr->client_port.min;
1913         max = tr->client_port.max;
1914       }
1915
1916       if (add) {
1917         GST_INFO ("adding %s:%d-%d", dest, min, max);
1918         g_signal_emit_by_name (priv->udpsink[0], "add", dest, min, NULL);
1919         g_signal_emit_by_name (priv->udpsink[1], "add", dest, max, NULL);
1920         if (ttl > 0) {
1921           GST_INFO ("setting ttl-mc %d", ttl);
1922           g_object_set (G_OBJECT (priv->udpsink[0]), "ttl-mc", ttl, NULL);
1923           g_object_set (G_OBJECT (priv->udpsink[1]), "ttl-mc", ttl, NULL);
1924         }
1925         priv->transports = g_list_prepend (priv->transports, trans);
1926       } else {
1927         GST_INFO ("removing %s:%d-%d", dest, min, max);
1928         g_signal_emit_by_name (priv->udpsink[0], "remove", dest, min, NULL);
1929         g_signal_emit_by_name (priv->udpsink[1], "remove", dest, max, NULL);
1930         priv->transports = g_list_remove (priv->transports, trans);
1931       }
1932       break;
1933     }
1934     case GST_RTSP_LOWER_TRANS_TCP:
1935       if (add) {
1936         GST_INFO ("adding TCP %s", tr->destination);
1937         priv->transports = g_list_prepend (priv->transports, trans);
1938       } else {
1939         GST_INFO ("removing TCP %s", tr->destination);
1940         priv->transports = g_list_remove (priv->transports, trans);
1941       }
1942       break;
1943     default:
1944       goto unknown_transport;
1945   }
1946   return TRUE;
1947
1948   /* ERRORS */
1949 unknown_transport:
1950   {
1951     GST_INFO ("Unknown transport %d", tr->lower_transport);
1952     return FALSE;
1953   }
1954 }
1955
1956
1957 /**
1958  * gst_rtsp_stream_add_transport:
1959  * @stream: a #GstRTSPStream
1960  * @trans: a #GstRTSPStreamTransport
1961  *
1962  * Add the transport in @trans to @stream. The media of @stream will
1963  * then also be send to the values configured in @trans.
1964  *
1965  * @stream must be joined to a bin.
1966  *
1967  * @trans must contain a valid #GstRTSPTransport.
1968  *
1969  * Returns: %TRUE if @trans was added
1970  */
1971 gboolean
1972 gst_rtsp_stream_add_transport (GstRTSPStream * stream,
1973     GstRTSPStreamTransport * trans)
1974 {
1975   GstRTSPStreamPrivate *priv;
1976   gboolean res;
1977
1978   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1979   priv = stream->priv;
1980   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
1981   g_return_val_if_fail (priv->is_joined, FALSE);
1982
1983   g_mutex_lock (&priv->lock);
1984   res = update_transport (stream, trans, TRUE);
1985   g_mutex_unlock (&priv->lock);
1986
1987   return res;
1988 }
1989
1990 /**
1991  * gst_rtsp_stream_remove_transport:
1992  * @stream: a #GstRTSPStream
1993  * @trans: a #GstRTSPStreamTransport
1994  *
1995  * Remove the transport in @trans from @stream. The media of @stream will
1996  * not be sent to the values configured in @trans.
1997  *
1998  * @stream must be joined to a bin.
1999  *
2000  * @trans must contain a valid #GstRTSPTransport.
2001  *
2002  * Returns: %TRUE if @trans was removed
2003  */
2004 gboolean
2005 gst_rtsp_stream_remove_transport (GstRTSPStream * stream,
2006     GstRTSPStreamTransport * trans)
2007 {
2008   GstRTSPStreamPrivate *priv;
2009   gboolean res;
2010
2011   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2012   priv = stream->priv;
2013   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
2014   g_return_val_if_fail (priv->is_joined, FALSE);
2015
2016   g_mutex_lock (&priv->lock);
2017   res = update_transport (stream, trans, FALSE);
2018   g_mutex_unlock (&priv->lock);
2019
2020   return res;
2021 }
2022
2023 /**
2024  * gst_rtsp_stream_get_rtp_socket:
2025  * @stream: a #GstRTSPStream
2026  * @family: the socket family
2027  *
2028  * Get the RTP socket from @stream for a @family.
2029  *
2030  * @stream must be joined to a bin.
2031  *
2032  * Returns: the RTP socket or %NULL if no socket could be allocated for @family.
2033  *     Unref after usage
2034  */
2035 GSocket *
2036 gst_rtsp_stream_get_rtp_socket (GstRTSPStream * stream, GSocketFamily family)
2037 {
2038   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
2039   GSocket *socket;
2040   gchar *name;
2041
2042   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2043   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
2044       family == G_SOCKET_FAMILY_IPV6, NULL);
2045   g_return_val_if_fail (priv->udpsink[0], NULL);
2046
2047   if (family == G_SOCKET_FAMILY_IPV6)
2048     name = "socket-v6";
2049   else
2050     name = "socket";
2051
2052   g_object_get (priv->udpsink[0], name, &socket, NULL);
2053
2054   return socket;
2055 }
2056
2057 /**
2058  * gst_rtsp_stream_get_rtcp_socket:
2059  * @stream: a #GstRTSPStream
2060  * @family: the socket family
2061  *
2062  * Get the RTCP socket from @stream for a @family.
2063  *
2064  * @stream must be joined to a bin.
2065  *
2066  * Returns: the RTCP socket or %NULL if no socket could be allocated for
2067  *     @family. Unref after usage
2068  */
2069 GSocket *
2070 gst_rtsp_stream_get_rtcp_socket (GstRTSPStream * stream, GSocketFamily family)
2071 {
2072   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
2073   GSocket *socket;
2074   gchar *name;
2075
2076   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2077   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
2078       family == G_SOCKET_FAMILY_IPV6, NULL);
2079   g_return_val_if_fail (priv->udpsink[1], NULL);
2080
2081   if (family == G_SOCKET_FAMILY_IPV6)
2082     name = "socket-v6";
2083   else
2084     name = "socket";
2085
2086   g_object_get (priv->udpsink[1], name, &socket, NULL);
2087
2088   return socket;
2089 }
2090
2091 /**
2092  * gst_rtsp_stream_transport_filter:
2093  * @stream: a #GstRTSPStream
2094  * @func: (scope call) (allow-none): a callback
2095  * @user_data: user data passed to @func
2096  *
2097  * Call @func for each transport managed by @stream. The result value of @func
2098  * determines what happens to the transport. @func will be called with @stream
2099  * locked so no further actions on @stream can be performed from @func.
2100  *
2101  * If @func returns #GST_RTSP_FILTER_REMOVE, the transport will be removed from
2102  * @stream.
2103  *
2104  * If @func returns #GST_RTSP_FILTER_KEEP, the transport will remain in @stream.
2105  *
2106  * If @func returns #GST_RTSP_FILTER_REF, the transport will remain in @stream but
2107  * will also be added with an additional ref to the result #GList of this
2108  * function..
2109  *
2110  * When @func is %NULL, #GST_RTSP_FILTER_REF will be assumed for each transport.
2111  *
2112  * Returns: (element-type GstRTSPStreamTransport) (transfer full): a #GList with all
2113  * transports for which @func returned #GST_RTSP_FILTER_REF. After usage, each
2114  * element in the #GList should be unreffed before the list is freed.
2115  */
2116 GList *
2117 gst_rtsp_stream_transport_filter (GstRTSPStream * stream,
2118     GstRTSPStreamTransportFilterFunc func, gpointer user_data)
2119 {
2120   GstRTSPStreamPrivate *priv;
2121   GList *result, *walk, *next;
2122
2123   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2124
2125   priv = stream->priv;
2126
2127   result = NULL;
2128
2129   g_mutex_lock (&priv->lock);
2130   for (walk = priv->transports; walk; walk = next) {
2131     GstRTSPStreamTransport *trans = walk->data;
2132     GstRTSPFilterResult res;
2133
2134     next = g_list_next (walk);
2135
2136     if (func)
2137       res = func (stream, trans, user_data);
2138     else
2139       res = GST_RTSP_FILTER_REF;
2140
2141     switch (res) {
2142       case GST_RTSP_FILTER_REMOVE:
2143         update_transport (stream, trans, FALSE);
2144         break;
2145       case GST_RTSP_FILTER_REF:
2146         result = g_list_prepend (result, g_object_ref (trans));
2147         break;
2148       case GST_RTSP_FILTER_KEEP:
2149       default:
2150         break;
2151     }
2152   }
2153   g_mutex_unlock (&priv->lock);
2154
2155   return result;
2156 }