rtsp-server: sprinkle some allow-none annotations for g-i
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-stream.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19 /**
20  * SECTION:rtsp-stream
21  * @short_description: A media stream
22  * @see_also: #GstRTSPMedia
23  *
24  * The #GstRTSPStream object manages the data transport for one stream. It
25  * is created from a payloader element and a source pad that produce the RTP
26  * packets for the stream.
27  *
28  * With gst_rtsp_stream_join_bin() the streaming elements are added to the bin
29  * and rtpbin. gst_rtsp_stream_leave_bin() removes the elements again.
30  *
31  * The #GstRTSPStream will use the configured addresspool, as set with
32  * gst_rtsp_stream_set_address_pool(), to allocate multicast addresses for the
33  * stream. With gst_rtsp_stream_get_multicast_address() you can get the
34  * configured address.
35  *
36  * With gst_rtsp_stream_get_server_port () you can get the port that the server
37  * will use to receive RTCP. This is the part that the clients will use to send
38  * RTCP to.
39  *
40  * With gst_rtsp_stream_add_transport() destinations can be added where the
41  * stream should be sent to. Use gst_rtsp_stream_remove_transport() to remove
42  * the destination again.
43  *
44  * Last reviewed on 2013-07-16 (1.0.0)
45  */
46
47 #include <stdlib.h>
48 #include <stdio.h>
49 #include <string.h>
50
51 #include <gio/gio.h>
52
53 #include <gst/app/gstappsrc.h>
54 #include <gst/app/gstappsink.h>
55
56 #include "rtsp-stream.h"
57
58 #define GST_RTSP_STREAM_GET_PRIVATE(obj)  \
59      (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTSP_STREAM, GstRTSPStreamPrivate))
60
61 struct _GstRTSPStreamPrivate
62 {
63   GMutex lock;
64   guint idx;
65   GstPad *srcpad;
66   GstElement *payloader;
67   guint buffer_size;
68   gboolean is_joined;
69   gchar *control;
70
71   GstRTSPLowerTrans protocols;
72
73   /* pads on the rtpbin */
74   GstPad *send_rtp_sink;
75   GstPad *recv_sink[2];
76   GstPad *send_src[2];
77
78   /* the RTPSession object */
79   GObject *session;
80
81   /* sinks used for sending and receiving RTP and RTCP over ipv4, they share
82    * sockets */
83   GstElement *udpsrc_v4[2];
84
85   /* sinks used for sending and receiving RTP and RTCP over ipv6, they share
86    * sockets */
87   GstElement *udpsrc_v6[2];
88
89   GstElement *udpsink[2];
90
91   /* for TCP transport */
92   GstElement *appsrc[2];
93   GstElement *appqueue[2];
94   GstElement *appsink[2];
95
96   GstElement *tee[2];
97   GstElement *funnel[2];
98
99   /* server ports for sending/receiving over ipv4 */
100   GstRTSPRange server_port_v4;
101   GstRTSPAddress *server_addr_v4;
102   gboolean have_ipv4;
103
104   /* server ports for sending/receiving over ipv6 */
105   GstRTSPRange server_port_v6;
106   GstRTSPAddress *server_addr_v6;
107   gboolean have_ipv6;
108
109   /* multicast addresses */
110   GstRTSPAddressPool *pool;
111   GstRTSPAddress *addr_v4;
112   GstRTSPAddress *addr_v6;
113
114   /* the caps of the stream */
115   gulong caps_sig;
116   GstCaps *caps;
117
118   /* transports we stream to */
119   guint n_active;
120   GList *transports;
121
122   gint dscp_qos;
123 };
124
125 #define DEFAULT_CONTROL         NULL
126 #define DEFAULT_PROTOCOLS       GST_RTSP_LOWER_TRANS_UDP | GST_RTSP_LOWER_TRANS_UDP_MCAST | \
127                                         GST_RTSP_LOWER_TRANS_TCP
128
129 enum
130 {
131   PROP_0,
132   PROP_CONTROL,
133   PROP_PROTOCOLS,
134   PROP_LAST
135 };
136
137 GST_DEBUG_CATEGORY_STATIC (rtsp_stream_debug);
138 #define GST_CAT_DEFAULT rtsp_stream_debug
139
140 static GQuark ssrc_stream_map_key;
141
142 static void gst_rtsp_stream_get_property (GObject * object, guint propid,
143     GValue * value, GParamSpec * pspec);
144 static void gst_rtsp_stream_set_property (GObject * object, guint propid,
145     const GValue * value, GParamSpec * pspec);
146
147 static void gst_rtsp_stream_finalize (GObject * obj);
148
149 G_DEFINE_TYPE (GstRTSPStream, gst_rtsp_stream, G_TYPE_OBJECT);
150
151 static void
152 gst_rtsp_stream_class_init (GstRTSPStreamClass * klass)
153 {
154   GObjectClass *gobject_class;
155
156   g_type_class_add_private (klass, sizeof (GstRTSPStreamPrivate));
157
158   gobject_class = G_OBJECT_CLASS (klass);
159
160   gobject_class->get_property = gst_rtsp_stream_get_property;
161   gobject_class->set_property = gst_rtsp_stream_set_property;
162   gobject_class->finalize = gst_rtsp_stream_finalize;
163
164   g_object_class_install_property (gobject_class, PROP_CONTROL,
165       g_param_spec_string ("control", "Control",
166           "The control string for this stream", DEFAULT_CONTROL,
167           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
168
169   g_object_class_install_property (gobject_class, PROP_PROTOCOLS,
170       g_param_spec_flags ("protocols", "Protocols",
171           "Allowed lower transport protocols", GST_TYPE_RTSP_LOWER_TRANS,
172           DEFAULT_PROTOCOLS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
173
174   GST_DEBUG_CATEGORY_INIT (rtsp_stream_debug, "rtspstream", 0, "GstRTSPStream");
175
176   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
177 }
178
179 static void
180 gst_rtsp_stream_init (GstRTSPStream * stream)
181 {
182   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
183
184   GST_DEBUG ("new stream %p", stream);
185
186   stream->priv = priv;
187
188   priv->dscp_qos = -1;
189   priv->control = g_strdup (DEFAULT_CONTROL);
190   priv->protocols = DEFAULT_PROTOCOLS;
191
192   g_mutex_init (&priv->lock);
193 }
194
195 static void
196 gst_rtsp_stream_finalize (GObject * obj)
197 {
198   GstRTSPStream *stream;
199   GstRTSPStreamPrivate *priv;
200
201   stream = GST_RTSP_STREAM (obj);
202   priv = stream->priv;
203
204   GST_DEBUG ("finalize stream %p", stream);
205
206   /* we really need to be unjoined now */
207   g_return_if_fail (!priv->is_joined);
208
209   if (priv->addr_v4)
210     gst_rtsp_address_free (priv->addr_v4);
211   if (priv->addr_v6)
212     gst_rtsp_address_free (priv->addr_v6);
213   if (priv->server_addr_v4)
214     gst_rtsp_address_free (priv->server_addr_v4);
215   if (priv->server_addr_v6)
216     gst_rtsp_address_free (priv->server_addr_v6);
217   if (priv->pool)
218     g_object_unref (priv->pool);
219   gst_object_unref (priv->payloader);
220   gst_object_unref (priv->srcpad);
221   g_free (priv->control);
222   g_mutex_clear (&priv->lock);
223
224   G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
225 }
226
227 static void
228 gst_rtsp_stream_get_property (GObject * object, guint propid,
229     GValue * value, GParamSpec * pspec)
230 {
231   GstRTSPStream *stream = GST_RTSP_STREAM (object);
232
233   switch (propid) {
234     case PROP_CONTROL:
235       g_value_take_string (value, gst_rtsp_stream_get_control (stream));
236       break;
237     case PROP_PROTOCOLS:
238       g_value_set_flags (value, gst_rtsp_stream_get_protocols (stream));
239       break;
240     default:
241       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
242   }
243 }
244
245 static void
246 gst_rtsp_stream_set_property (GObject * object, guint propid,
247     const GValue * value, GParamSpec * pspec)
248 {
249   GstRTSPStream *stream = GST_RTSP_STREAM (object);
250
251   switch (propid) {
252     case PROP_CONTROL:
253       gst_rtsp_stream_set_control (stream, g_value_get_string (value));
254       break;
255     case PROP_PROTOCOLS:
256       gst_rtsp_stream_set_protocols (stream, g_value_get_flags (value));
257       break;
258     default:
259       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
260   }
261 }
262
263 /**
264  * gst_rtsp_stream_new:
265  * @idx: an index
266  * @srcpad: a #GstPad
267  * @payloader: a #GstElement
268  *
269  * Create a new media stream with index @idx that handles RTP data on
270  * @srcpad and has a payloader element @payloader.
271  *
272  * Returns: a new #GstRTSPStream
273  */
274 GstRTSPStream *
275 gst_rtsp_stream_new (guint idx, GstElement * payloader, GstPad * srcpad)
276 {
277   GstRTSPStreamPrivate *priv;
278   GstRTSPStream *stream;
279
280   g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL);
281   g_return_val_if_fail (GST_IS_PAD (srcpad), NULL);
282   g_return_val_if_fail (GST_PAD_IS_SRC (srcpad), NULL);
283
284   stream = g_object_new (GST_TYPE_RTSP_STREAM, NULL);
285   priv = stream->priv;
286   priv->idx = idx;
287   priv->payloader = gst_object_ref (payloader);
288   priv->srcpad = gst_object_ref (srcpad);
289
290   return stream;
291 }
292
293 /**
294  * gst_rtsp_stream_get_index:
295  * @stream: a #GstRTSPStream
296  *
297  * Get the stream index.
298  *
299  * Return: the stream index.
300  */
301 guint
302 gst_rtsp_stream_get_index (GstRTSPStream * stream)
303 {
304   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
305
306   return stream->priv->idx;
307 }
308
309 /**
310  * gst_rtsp_stream_get_srcpad:
311  * @stream: a #GstRTSPStream
312  *
313  * Get the srcpad associated with @stream.
314  *
315  * Returns: (transfer full): the srcpad. Unref after usage.
316  */
317 GstPad *
318 gst_rtsp_stream_get_srcpad (GstRTSPStream * stream)
319 {
320   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
321
322   return gst_object_ref (stream->priv->srcpad);
323 }
324
325 /**
326  * gst_rtsp_stream_get_control:
327  * @stream: a #GstRTSPStream
328  *
329  * Get the control string to identify this stream.
330  *
331  * Returns: (transfer full): the control string. free after usage.
332  */
333 gchar *
334 gst_rtsp_stream_get_control (GstRTSPStream * stream)
335 {
336   GstRTSPStreamPrivate *priv;
337   gchar *result;
338
339   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
340
341   priv = stream->priv;
342
343   g_mutex_lock (&priv->lock);
344   if ((result = g_strdup (priv->control)) == NULL)
345     result = g_strdup_printf ("stream=%u", priv->idx);
346   g_mutex_unlock (&priv->lock);
347
348   return result;
349 }
350
351 /**
352  * gst_rtsp_stream_set_control:
353  * @stream: a #GstRTSPStream
354  * @control: a control string
355  *
356  * Set the control string in @stream.
357  */
358 void
359 gst_rtsp_stream_set_control (GstRTSPStream * stream, const gchar * control)
360 {
361   GstRTSPStreamPrivate *priv;
362
363   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
364
365   priv = stream->priv;
366
367   g_mutex_lock (&priv->lock);
368   g_free (priv->control);
369   priv->control = g_strdup (control);
370   g_mutex_unlock (&priv->lock);
371 }
372
373 /**
374  * gst_rtsp_stream_has_control:
375  * @stream: a #GstRTSPStream
376  * @control: a control string
377  *
378  * Check if @stream has the control string @control.
379  *
380  * Returns: %TRUE is @stream has @control as the control string
381  */
382 gboolean
383 gst_rtsp_stream_has_control (GstRTSPStream * stream, const gchar * control)
384 {
385   GstRTSPStreamPrivate *priv;
386   gboolean res;
387
388   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
389
390   priv = stream->priv;
391
392   g_mutex_lock (&priv->lock);
393   if (priv->control)
394     res = (g_strcmp0 (priv->control, control) == 0);
395   else {
396     guint streamid;
397     sscanf (control, "stream=%u", &streamid);
398     res = (streamid == priv->idx);
399   }
400   g_mutex_unlock (&priv->lock);
401
402   return res;
403 }
404
405 /**
406  * gst_rtsp_stream_set_mtu:
407  * @stream: a #GstRTSPStream
408  * @mtu: a new MTU
409  *
410  * Configure the mtu in the payloader of @stream to @mtu.
411  */
412 void
413 gst_rtsp_stream_set_mtu (GstRTSPStream * stream, guint mtu)
414 {
415   GstRTSPStreamPrivate *priv;
416
417   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
418
419   priv = stream->priv;
420
421   GST_LOG_OBJECT (stream, "set MTU %u", mtu);
422
423   g_object_set (G_OBJECT (priv->payloader), "mtu", mtu, NULL);
424 }
425
426 /**
427  * gst_rtsp_stream_get_mtu:
428  * @stream: a #GstRTSPStream
429  *
430  * Get the configured MTU in the payloader of @stream.
431  *
432  * Returns: the MTU of the payloader.
433  */
434 guint
435 gst_rtsp_stream_get_mtu (GstRTSPStream * stream)
436 {
437   GstRTSPStreamPrivate *priv;
438   guint mtu;
439
440   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
441
442   priv = stream->priv;
443
444   g_object_get (G_OBJECT (priv->payloader), "mtu", &mtu, NULL);
445
446   return mtu;
447 }
448
449 /* Update the dscp qos property on the udp sinks */
450 static void
451 update_dscp_qos (GstRTSPStream * stream)
452 {
453   GstRTSPStreamPrivate *priv;
454
455   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
456
457   priv = stream->priv;
458
459   if (priv->udpsink[0]) {
460     g_object_set (G_OBJECT (priv->udpsink[0]), "qos-dscp", priv->dscp_qos,
461         NULL);
462   }
463
464   if (priv->udpsink[1]) {
465     g_object_set (G_OBJECT (priv->udpsink[1]), "qos-dscp", priv->dscp_qos,
466         NULL);
467   }
468 }
469
470 /**
471  * gst_rtsp_stream_set_dscp_qos:
472  * @stream: a #GstRTSPStream
473  * @dscp_qos: a new dscp qos value (0-63, or -1 to disable)
474  *
475  * Configure the dscp qos of the outgoing sockets to @dscp_qos.
476  */
477 void
478 gst_rtsp_stream_set_dscp_qos (GstRTSPStream * stream, gint dscp_qos)
479 {
480   GstRTSPStreamPrivate *priv;
481
482   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
483
484   priv = stream->priv;
485
486   GST_LOG_OBJECT (stream, "set DSCP QoS %d", dscp_qos);
487
488   if (dscp_qos < -1 || dscp_qos > 63) {
489     GST_WARNING_OBJECT (stream, "trying to set illegal dscp qos %d", dscp_qos);
490     return;
491   }
492
493   priv->dscp_qos = dscp_qos;
494
495   update_dscp_qos (stream);
496 }
497
498 /**
499  * gst_rtsp_stream_get_dscp_qos:
500  * @stream: a #GstRTSPStream
501  *
502  * Get the configured DSCP QoS in of the outgoing sockets.
503  *
504  * Returns: the DSCP QoS value of the outgoing sockets, or -1 if disbled.
505  */
506 gint
507 gst_rtsp_stream_get_dscp_qos (GstRTSPStream * stream)
508 {
509   GstRTSPStreamPrivate *priv;
510
511   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
512
513   priv = stream->priv;
514
515   return priv->dscp_qos;
516 }
517
518 /**
519  * gst_rtsp_stream_set_protocols:
520  * @stream: a #GstRTSPStream
521  * @protocols: the new flags
522  *
523  * Configure the allowed lower transport for @stream.
524  */
525 void
526 gst_rtsp_stream_set_protocols (GstRTSPStream * stream,
527     GstRTSPLowerTrans protocols)
528 {
529   GstRTSPStreamPrivate *priv;
530
531   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
532
533   priv = stream->priv;
534
535   g_mutex_lock (&priv->lock);
536   priv->protocols = protocols;
537   g_mutex_unlock (&priv->lock);
538 }
539
540 /**
541  * gst_rtsp_stream_get_protocols:
542  * @stream: a #GstRTSPStream
543  *
544  * Get the allowed protocols of @stream.
545  *
546  * Returns: a #GstRTSPLowerTrans
547  */
548 GstRTSPLowerTrans
549 gst_rtsp_stream_get_protocols (GstRTSPStream * stream)
550 {
551   GstRTSPStreamPrivate *priv;
552   GstRTSPLowerTrans res;
553
554   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream),
555       GST_RTSP_LOWER_TRANS_UNKNOWN);
556
557   priv = stream->priv;
558
559   g_mutex_lock (&priv->lock);
560   res = priv->protocols;
561   g_mutex_unlock (&priv->lock);
562
563   return res;
564 }
565
566 /**
567  * gst_rtsp_stream_set_address_pool:
568  * @stream: a #GstRTSPStream
569  * @pool: a #GstRTSPAddressPool
570  *
571  * configure @pool to be used as the address pool of @stream.
572  */
573 void
574 gst_rtsp_stream_set_address_pool (GstRTSPStream * stream,
575     GstRTSPAddressPool * pool)
576 {
577   GstRTSPStreamPrivate *priv;
578   GstRTSPAddressPool *old;
579
580   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
581
582   priv = stream->priv;
583
584   GST_LOG_OBJECT (stream, "set address pool %p", pool);
585
586   g_mutex_lock (&priv->lock);
587   if ((old = priv->pool) != pool)
588     priv->pool = pool ? g_object_ref (pool) : NULL;
589   else
590     old = NULL;
591   g_mutex_unlock (&priv->lock);
592
593   if (old)
594     g_object_unref (old);
595 }
596
597 /**
598  * gst_rtsp_stream_get_address_pool:
599  * @stream: a #GstRTSPStream
600  *
601  * Get the #GstRTSPAddressPool used as the address pool of @stream.
602  *
603  * Returns: (transfer full): the #GstRTSPAddressPool of @stream. g_object_unref() after
604  * usage.
605  */
606 GstRTSPAddressPool *
607 gst_rtsp_stream_get_address_pool (GstRTSPStream * stream)
608 {
609   GstRTSPStreamPrivate *priv;
610   GstRTSPAddressPool *result;
611
612   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
613
614   priv = stream->priv;
615
616   g_mutex_lock (&priv->lock);
617   if ((result = priv->pool))
618     g_object_ref (result);
619   g_mutex_unlock (&priv->lock);
620
621   return result;
622 }
623
624 /**
625  * gst_rtsp_stream_get_multicast_address:
626  * @stream: a #GstRTSPStream
627  * @family: the #GSocketFamily
628  *
629  * Get the multicast address of @stream for @family.
630  *
631  * Returns: the #GstRTSPAddress of @stream or %NULL when no address could be
632  * allocated. gst_rtsp_address_free() after usage.
633  */
634 GstRTSPAddress *
635 gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream,
636     GSocketFamily family)
637 {
638   GstRTSPStreamPrivate *priv;
639   GstRTSPAddress *result;
640   GstRTSPAddress **addrp;
641   GstRTSPAddressFlags flags;
642
643   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
644
645   priv = stream->priv;
646
647   if (family == G_SOCKET_FAMILY_IPV6) {
648     flags = GST_RTSP_ADDRESS_FLAG_IPV6;
649     addrp = &priv->addr_v4;
650   } else {
651     flags = GST_RTSP_ADDRESS_FLAG_IPV4;
652     addrp = &priv->addr_v6;
653   }
654
655   g_mutex_lock (&priv->lock);
656   if (*addrp == NULL) {
657     if (priv->pool == NULL)
658       goto no_pool;
659
660     flags |= GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_MULTICAST;
661
662     *addrp = gst_rtsp_address_pool_acquire_address (priv->pool, flags, 2);
663     if (*addrp == NULL)
664       goto no_address;
665   }
666   result = gst_rtsp_address_copy (*addrp);
667   g_mutex_unlock (&priv->lock);
668
669   return result;
670
671   /* ERRORS */
672 no_pool:
673   {
674     GST_ERROR_OBJECT (stream, "no address pool specified");
675     g_mutex_unlock (&priv->lock);
676     return NULL;
677   }
678 no_address:
679   {
680     GST_ERROR_OBJECT (stream, "failed to acquire address from pool");
681     g_mutex_unlock (&priv->lock);
682     return NULL;
683   }
684 }
685
686 /**
687  * gst_rtsp_stream_reserve_address:
688  * @stream: a #GstRTSPStream
689  * @address: an address
690  * @port: a port
691  * @n_ports: n_ports
692  * @ttl: a TTL
693  *
694  * Reserve @address and @port as the address and port of @stream.
695  *
696  * Returns: the #GstRTSPAddress of @stream or %NULL when the address could be
697  * reserved. gst_rtsp_address_free() after usage.
698  */
699 GstRTSPAddress *
700 gst_rtsp_stream_reserve_address (GstRTSPStream * stream,
701     const gchar * address, guint port, guint n_ports, guint ttl)
702 {
703   GstRTSPStreamPrivate *priv;
704   GstRTSPAddress *result;
705   GInetAddress *addr;
706   GSocketFamily family;
707   GstRTSPAddress **addrp;
708
709   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
710   g_return_val_if_fail (address != NULL, NULL);
711   g_return_val_if_fail (port > 0, NULL);
712   g_return_val_if_fail (n_ports > 0, NULL);
713   g_return_val_if_fail (ttl > 0, NULL);
714
715   priv = stream->priv;
716
717   addr = g_inet_address_new_from_string (address);
718   if (!addr) {
719     GST_ERROR ("failed to get inet addr from %s", address);
720     family = G_SOCKET_FAMILY_IPV4;
721   } else {
722     family = g_inet_address_get_family (addr);
723     g_object_unref (addr);
724   }
725
726   if (family == G_SOCKET_FAMILY_IPV6)
727     addrp = &priv->addr_v4;
728   else
729     addrp = &priv->addr_v6;
730
731   g_mutex_lock (&priv->lock);
732   if (*addrp == NULL) {
733     GstRTSPAddressPoolResult res;
734
735     if (priv->pool == NULL)
736       goto no_pool;
737
738     res = gst_rtsp_address_pool_reserve_address (priv->pool, address,
739         port, n_ports, ttl, addrp);
740     if (res != GST_RTSP_ADDRESS_POOL_OK)
741       goto no_address;
742   } else {
743     if (strcmp ((*addrp)->address, address) ||
744         (*addrp)->port != port || (*addrp)->n_ports != n_ports ||
745         (*addrp)->ttl != ttl)
746       goto different_address;
747   }
748   result = gst_rtsp_address_copy (*addrp);
749   g_mutex_unlock (&priv->lock);
750
751   return result;
752
753   /* ERRORS */
754 no_pool:
755   {
756     GST_ERROR_OBJECT (stream, "no address pool specified");
757     g_mutex_unlock (&priv->lock);
758     return NULL;
759   }
760 no_address:
761   {
762     GST_ERROR_OBJECT (stream, "failed to acquire address %s from pool",
763         address);
764     g_mutex_unlock (&priv->lock);
765     return NULL;
766   }
767 different_address:
768   {
769     GST_ERROR_OBJECT (stream, "address %s is not the same that was already"
770         " reserved", address);
771     g_mutex_unlock (&priv->lock);
772     return NULL;
773   }
774 }
775
776 static gboolean
777 alloc_ports_one_family (GstRTSPAddressPool * pool, gint buffer_size,
778     GSocketFamily family, GstElement * udpsrc_out[2],
779     GstElement * udpsink_out[2], GstRTSPRange * server_port_out,
780     GstRTSPAddress ** server_addr_out)
781 {
782   GstStateChangeReturn ret;
783   GstElement *udpsrc0, *udpsrc1;
784   GstElement *udpsink0, *udpsink1;
785   GSocket *rtp_socket = NULL;
786   GSocket *rtcp_socket;
787   gint tmp_rtp, tmp_rtcp;
788   guint count;
789   gint rtpport, rtcpport;
790   GList *rejected_addresses = NULL;
791   GstRTSPAddress *addr = NULL;
792   GInetAddress *inetaddr = NULL;
793   GSocketAddress *rtp_sockaddr = NULL;
794   GSocketAddress *rtcp_sockaddr = NULL;
795   const gchar *multisink_socket;
796
797   if (family == G_SOCKET_FAMILY_IPV6)
798     multisink_socket = "socket-v6";
799   else
800     multisink_socket = "socket";
801
802   udpsrc0 = NULL;
803   udpsrc1 = NULL;
804   udpsink0 = NULL;
805   udpsink1 = NULL;
806   count = 0;
807
808   /* Start with random port */
809   tmp_rtp = 0;
810
811   rtcp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
812       G_SOCKET_PROTOCOL_UDP, NULL);
813   if (!rtcp_socket)
814     goto no_udp_protocol;
815
816   if (*server_addr_out)
817     gst_rtsp_address_free (*server_addr_out);
818
819   /* try to allocate 2 UDP ports, the RTP port should be an even
820    * number and the RTCP port should be the next (uneven) port */
821 again:
822
823   if (rtp_socket == NULL) {
824     rtp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
825         G_SOCKET_PROTOCOL_UDP, NULL);
826     if (!rtp_socket)
827       goto no_udp_protocol;
828   }
829
830   if (pool && gst_rtsp_address_pool_has_unicast_addresses (pool)) {
831     GstRTSPAddressFlags flags;
832
833     if (addr)
834       rejected_addresses = g_list_prepend (rejected_addresses, addr);
835
836     flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_UNICAST;
837     if (family == G_SOCKET_FAMILY_IPV6)
838       flags |= GST_RTSP_ADDRESS_FLAG_IPV6;
839     else
840       flags |= GST_RTSP_ADDRESS_FLAG_IPV4;
841
842     addr = gst_rtsp_address_pool_acquire_address (pool, flags, 2);
843
844     if (addr == NULL)
845       goto no_ports;
846
847     tmp_rtp = addr->port;
848
849     g_clear_object (&inetaddr);
850     inetaddr = g_inet_address_new_from_string (addr->address);
851   } else {
852     if (tmp_rtp != 0) {
853       tmp_rtp += 2;
854       if (++count > 20)
855         goto no_ports;
856     }
857
858     if (inetaddr == NULL)
859       inetaddr = g_inet_address_new_any (family);
860   }
861
862   rtp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtp);
863   if (!g_socket_bind (rtp_socket, rtp_sockaddr, FALSE, NULL)) {
864     g_object_unref (rtp_sockaddr);
865     goto again;
866   }
867   g_object_unref (rtp_sockaddr);
868
869   rtp_sockaddr = g_socket_get_local_address (rtp_socket, NULL);
870   if (rtp_sockaddr == NULL || !G_IS_INET_SOCKET_ADDRESS (rtp_sockaddr)) {
871     g_clear_object (&rtp_sockaddr);
872     goto socket_error;
873   }
874
875   tmp_rtp =
876       g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (rtp_sockaddr));
877   g_object_unref (rtp_sockaddr);
878
879   /* check if port is even */
880   if ((tmp_rtp & 1) != 0) {
881     /* port not even, close and allocate another */
882     tmp_rtp++;
883     g_clear_object (&rtp_socket);
884     goto again;
885   }
886
887   /* set port */
888   tmp_rtcp = tmp_rtp + 1;
889
890   rtcp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtcp);
891   if (!g_socket_bind (rtcp_socket, rtcp_sockaddr, FALSE, NULL)) {
892     g_object_unref (rtcp_sockaddr);
893     g_clear_object (&rtp_socket);
894     goto again;
895   }
896   g_object_unref (rtcp_sockaddr);
897
898   g_clear_object (&inetaddr);
899
900   udpsrc0 = gst_element_factory_make ("udpsrc", NULL);
901   udpsrc1 = gst_element_factory_make ("udpsrc", NULL);
902
903   if (udpsrc0 == NULL || udpsrc1 == NULL)
904     goto no_udp_protocol;
905
906   g_object_set (G_OBJECT (udpsrc0), "socket", rtp_socket, NULL);
907   g_object_set (G_OBJECT (udpsrc1), "socket", rtcp_socket, NULL);
908
909   ret = gst_element_set_state (udpsrc0, GST_STATE_PAUSED);
910   if (ret == GST_STATE_CHANGE_FAILURE)
911     goto element_error;
912   ret = gst_element_set_state (udpsrc1, GST_STATE_PAUSED);
913   if (ret == GST_STATE_CHANGE_FAILURE)
914     goto element_error;
915
916   /* all fine, do port check */
917   g_object_get (G_OBJECT (udpsrc0), "port", &rtpport, NULL);
918   g_object_get (G_OBJECT (udpsrc1), "port", &rtcpport, NULL);
919
920   /* this should not happen... */
921   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
922     goto port_error;
923
924   if (udpsink_out[0])
925     udpsink0 = udpsink_out[0];
926   else
927     udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
928
929   if (!udpsink0)
930     goto no_udp_protocol;
931
932   g_object_set (G_OBJECT (udpsink0), "close-socket", FALSE, NULL);
933   g_object_set (G_OBJECT (udpsink0), multisink_socket, rtp_socket, NULL);
934
935   if (udpsink_out[1])
936     udpsink1 = udpsink_out[1];
937   else
938     udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
939
940   if (!udpsink1)
941     goto no_udp_protocol;
942
943   g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
944   g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
945   g_object_set (G_OBJECT (udpsink0), "buffer-size", buffer_size, NULL);
946
947   g_object_set (G_OBJECT (udpsink1), "close-socket", FALSE, NULL);
948   g_object_set (G_OBJECT (udpsink1), multisink_socket, rtcp_socket, NULL);
949   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
950   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
951   g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL);
952   g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
953   g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL);
954   g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
955
956   /* we keep these elements, we will further configure them when the
957    * client told us to really use the UDP ports. */
958   udpsrc_out[0] = udpsrc0;
959   udpsrc_out[1] = udpsrc1;
960   udpsink_out[0] = udpsink0;
961   udpsink_out[1] = udpsink1;
962   server_port_out->min = rtpport;
963   server_port_out->max = rtcpport;
964
965   *server_addr_out = addr;
966   g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free);
967
968   g_object_unref (rtp_socket);
969   g_object_unref (rtcp_socket);
970
971   return TRUE;
972
973   /* ERRORS */
974 no_udp_protocol:
975   {
976     goto cleanup;
977   }
978 no_ports:
979   {
980     goto cleanup;
981   }
982 port_error:
983   {
984     goto cleanup;
985   }
986 socket_error:
987   {
988     goto cleanup;
989   }
990 element_error:
991   {
992     goto cleanup;
993   }
994 cleanup:
995   {
996     if (udpsrc0) {
997       gst_element_set_state (udpsrc0, GST_STATE_NULL);
998       gst_object_unref (udpsrc0);
999     }
1000     if (udpsrc1) {
1001       gst_element_set_state (udpsrc1, GST_STATE_NULL);
1002       gst_object_unref (udpsrc1);
1003     }
1004     if (udpsink0) {
1005       gst_element_set_state (udpsink0, GST_STATE_NULL);
1006       gst_object_unref (udpsink0);
1007     }
1008     if (udpsink1) {
1009       gst_element_set_state (udpsink1, GST_STATE_NULL);
1010       gst_object_unref (udpsink1);
1011     }
1012     if (inetaddr)
1013       g_object_unref (inetaddr);
1014     g_list_free_full (rejected_addresses,
1015         (GDestroyNotify) gst_rtsp_address_free);
1016     if (addr)
1017       gst_rtsp_address_free (addr);
1018     if (rtp_socket)
1019       g_object_unref (rtp_socket);
1020     if (rtcp_socket)
1021       g_object_unref (rtcp_socket);
1022     return FALSE;
1023   }
1024 }
1025
1026 /* must be called with lock */
1027 static gboolean
1028 alloc_ports (GstRTSPStream * stream)
1029 {
1030   GstRTSPStreamPrivate *priv = stream->priv;
1031
1032   priv->have_ipv4 = alloc_ports_one_family (priv->pool, priv->buffer_size,
1033       G_SOCKET_FAMILY_IPV4, priv->udpsrc_v4, priv->udpsink,
1034       &priv->server_port_v4, &priv->server_addr_v4);
1035
1036   priv->have_ipv6 = alloc_ports_one_family (priv->pool, priv->buffer_size,
1037       G_SOCKET_FAMILY_IPV6, priv->udpsrc_v6, priv->udpsink,
1038       &priv->server_port_v6, &priv->server_addr_v6);
1039
1040   return priv->have_ipv4 || priv->have_ipv6;
1041 }
1042
1043 /**
1044  * gst_rtsp_stream_get_server_port:
1045  * @stream: a #GstRTSPStream
1046  * @server_port: (out): result server port
1047  * @family: the port family to get
1048  *
1049  * Fill @server_port with the port pair used by the server. This function can
1050  * only be called when @stream has been joined.
1051  */
1052 void
1053 gst_rtsp_stream_get_server_port (GstRTSPStream * stream,
1054     GstRTSPRange * server_port, GSocketFamily family)
1055 {
1056   GstRTSPStreamPrivate *priv;
1057
1058   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1059   priv = stream->priv;
1060   g_return_if_fail (priv->is_joined);
1061
1062   g_mutex_lock (&priv->lock);
1063   if (family == G_SOCKET_FAMILY_IPV4) {
1064     if (server_port)
1065       *server_port = priv->server_port_v4;
1066   } else {
1067     if (server_port)
1068       *server_port = priv->server_port_v6;
1069   }
1070   g_mutex_unlock (&priv->lock);
1071 }
1072
1073 /**
1074  * gst_rtsp_stream_get_rtpsession:
1075  * @stream: a #GstRTSPStream
1076  *
1077  * Get the RTP session of this stream.
1078  *
1079  * Returns: (transfer full): The RTP session of this stream. Unref after usage.
1080  */
1081 GObject *
1082 gst_rtsp_stream_get_rtpsession (GstRTSPStream * stream)
1083 {
1084   GstRTSPStreamPrivate *priv;
1085   GObject *session;
1086
1087   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1088
1089   priv = stream->priv;
1090
1091   g_mutex_lock (&priv->lock);
1092   if ((session = priv->session))
1093     g_object_ref (session);
1094   g_mutex_unlock (&priv->lock);
1095
1096   return session;
1097 }
1098
1099 /**
1100  * gst_rtsp_stream_get_ssrc:
1101  * @stream: a #GstRTSPStream
1102  * @ssrc: (out): result ssrc
1103  *
1104  * Get the SSRC used by the RTP session of this stream. This function can only
1105  * be called when @stream has been joined.
1106  */
1107 void
1108 gst_rtsp_stream_get_ssrc (GstRTSPStream * stream, guint * ssrc)
1109 {
1110   GstRTSPStreamPrivate *priv;
1111
1112   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1113   priv = stream->priv;
1114   g_return_if_fail (priv->is_joined);
1115
1116   g_mutex_lock (&priv->lock);
1117   if (ssrc && priv->session)
1118     g_object_get (priv->session, "internal-ssrc", ssrc, NULL);
1119   g_mutex_unlock (&priv->lock);
1120 }
1121
1122 /* executed from streaming thread */
1123 static void
1124 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream)
1125 {
1126   GstRTSPStreamPrivate *priv = stream->priv;
1127   GstCaps *newcaps, *oldcaps;
1128
1129   newcaps = gst_pad_get_current_caps (pad);
1130
1131   GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps,
1132       newcaps);
1133
1134   g_mutex_lock (&priv->lock);
1135   oldcaps = priv->caps;
1136   priv->caps = newcaps;
1137   g_mutex_unlock (&priv->lock);
1138
1139   if (oldcaps)
1140     gst_caps_unref (oldcaps);
1141 }
1142
1143 static void
1144 dump_structure (const GstStructure * s)
1145 {
1146   gchar *sstr;
1147
1148   sstr = gst_structure_to_string (s);
1149   GST_INFO ("structure: %s", sstr);
1150   g_free (sstr);
1151 }
1152
1153 static GstRTSPStreamTransport *
1154 find_transport (GstRTSPStream * stream, const gchar * rtcp_from)
1155 {
1156   GstRTSPStreamPrivate *priv = stream->priv;
1157   GList *walk;
1158   GstRTSPStreamTransport *result = NULL;
1159   const gchar *tmp;
1160   gchar *dest;
1161   guint port;
1162
1163   if (rtcp_from == NULL)
1164     return NULL;
1165
1166   tmp = g_strrstr (rtcp_from, ":");
1167   if (tmp == NULL)
1168     return NULL;
1169
1170   port = atoi (tmp + 1);
1171   dest = g_strndup (rtcp_from, tmp - rtcp_from);
1172
1173   g_mutex_lock (&priv->lock);
1174   GST_INFO ("finding %s:%d in %d transports", dest, port,
1175       g_list_length (priv->transports));
1176
1177   for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1178     GstRTSPStreamTransport *trans = walk->data;
1179     const GstRTSPTransport *tr;
1180     gint min, max;
1181
1182     tr = gst_rtsp_stream_transport_get_transport (trans);
1183
1184     min = tr->client_port.min;
1185     max = tr->client_port.max;
1186
1187     if ((strcmp (tr->destination, dest) == 0) && (min == port || max == port)) {
1188       result = trans;
1189       break;
1190     }
1191   }
1192   if (result)
1193     g_object_ref (result);
1194   g_mutex_unlock (&priv->lock);
1195
1196   g_free (dest);
1197
1198   return result;
1199 }
1200
1201 static GstRTSPStreamTransport *
1202 check_transport (GObject * source, GstRTSPStream * stream)
1203 {
1204   GstStructure *stats;
1205   GstRTSPStreamTransport *trans;
1206
1207   /* see if we have a stream to match with the origin of the RTCP packet */
1208   trans = g_object_get_qdata (source, ssrc_stream_map_key);
1209   if (trans == NULL) {
1210     g_object_get (source, "stats", &stats, NULL);
1211     if (stats) {
1212       const gchar *rtcp_from;
1213
1214       dump_structure (stats);
1215
1216       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
1217       if ((trans = find_transport (stream, rtcp_from))) {
1218         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
1219             source);
1220         g_object_set_qdata_full (source, ssrc_stream_map_key, trans,
1221             g_object_unref);
1222       }
1223       gst_structure_free (stats);
1224     }
1225   }
1226   return trans;
1227 }
1228
1229
1230 static void
1231 on_new_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1232 {
1233   GstRTSPStreamTransport *trans;
1234
1235   GST_INFO ("%p: new source %p", stream, source);
1236
1237   trans = check_transport (source, stream);
1238
1239   if (trans)
1240     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
1241 }
1242
1243 static void
1244 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPStream * stream)
1245 {
1246   GST_INFO ("%p: new SDES %p", stream, source);
1247 }
1248
1249 static void
1250 on_ssrc_active (GObject * session, GObject * source, GstRTSPStream * stream)
1251 {
1252   GstRTSPStreamTransport *trans;
1253
1254   trans = check_transport (source, stream);
1255
1256   if (trans) {
1257     GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
1258     gst_rtsp_stream_transport_keep_alive (trans);
1259   }
1260 #ifdef DUMP_STATS
1261   {
1262     GstStructure *stats;
1263     g_object_get (source, "stats", &stats, NULL);
1264     if (stats) {
1265       dump_structure (stats);
1266       gst_structure_free (stats);
1267     }
1268   }
1269 #endif
1270 }
1271
1272 static void
1273 on_bye_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1274 {
1275   GST_INFO ("%p: source %p bye", stream, source);
1276 }
1277
1278 static void
1279 on_bye_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1280 {
1281   GstRTSPStreamTransport *trans;
1282
1283   GST_INFO ("%p: source %p bye timeout", stream, source);
1284
1285   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1286     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1287     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1288   }
1289 }
1290
1291 static void
1292 on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1293 {
1294   GstRTSPStreamTransport *trans;
1295
1296   GST_INFO ("%p: source %p timeout", stream, source);
1297
1298   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1299     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1300     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1301   }
1302 }
1303
1304 static GstFlowReturn
1305 handle_new_sample (GstAppSink * sink, gpointer user_data)
1306 {
1307   GstRTSPStreamPrivate *priv;
1308   GList *walk;
1309   GstSample *sample;
1310   GstBuffer *buffer;
1311   GstRTSPStream *stream;
1312
1313   sample = gst_app_sink_pull_sample (sink);
1314   if (!sample)
1315     return GST_FLOW_OK;
1316
1317   stream = (GstRTSPStream *) user_data;
1318   priv = stream->priv;
1319   buffer = gst_sample_get_buffer (sample);
1320
1321   g_mutex_lock (&priv->lock);
1322   for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1323     GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1324
1325     if (GST_ELEMENT_CAST (sink) == priv->appsink[0]) {
1326       gst_rtsp_stream_transport_send_rtp (tr, buffer);
1327     } else {
1328       gst_rtsp_stream_transport_send_rtcp (tr, buffer);
1329     }
1330   }
1331   g_mutex_unlock (&priv->lock);
1332
1333   gst_sample_unref (sample);
1334
1335   return GST_FLOW_OK;
1336 }
1337
1338 static GstAppSinkCallbacks sink_cb = {
1339   NULL,                         /* not interested in EOS */
1340   NULL,                         /* not interested in preroll samples */
1341   handle_new_sample,
1342 };
1343
1344 /**
1345  * gst_rtsp_stream_join_bin:
1346  * @stream: a #GstRTSPStream
1347  * @bin: a #GstBin to join
1348  * @rtpbin: a rtpbin element in @bin
1349  * @state: the target state of the new elements
1350  *
1351  * Join the #Gstbin @bin that contains the element @rtpbin.
1352  *
1353  * @stream will link to @rtpbin, which must be inside @bin. The elements
1354  * added to @bin will be set to the state given in @state.
1355  *
1356  * Returns: %TRUE on success.
1357  */
1358 gboolean
1359 gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
1360     GstElement * rtpbin, GstState state)
1361 {
1362   GstRTSPStreamPrivate *priv;
1363   gint i;
1364   guint idx;
1365   gchar *name;
1366   GstPad *pad, *sinkpad, *selpad;
1367   GstPadLinkReturn ret;
1368
1369   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1370   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
1371   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
1372
1373   priv = stream->priv;
1374
1375   g_mutex_lock (&priv->lock);
1376   if (priv->is_joined)
1377     goto was_joined;
1378
1379   /* create a session with the same index as the stream */
1380   idx = priv->idx;
1381
1382   GST_INFO ("stream %p joining bin as session %u", stream, idx);
1383
1384   if (!alloc_ports (stream))
1385     goto no_ports;
1386
1387   /* update the dscp qos field in the sinks */
1388   update_dscp_qos (stream);
1389
1390   /* get a pad for sending RTP */
1391   name = g_strdup_printf ("send_rtp_sink_%u", idx);
1392   priv->send_rtp_sink = gst_element_get_request_pad (rtpbin, name);
1393   g_free (name);
1394   /* link the RTP pad to the session manager, it should not really fail unless
1395    * this is not really an RTP pad */
1396   ret = gst_pad_link (priv->srcpad, priv->send_rtp_sink);
1397   if (ret != GST_PAD_LINK_OK)
1398     goto link_failed;
1399
1400   /* get pads from the RTP session element for sending and receiving
1401    * RTP/RTCP*/
1402   name = g_strdup_printf ("send_rtp_src_%u", idx);
1403   priv->send_src[0] = gst_element_get_static_pad (rtpbin, name);
1404   g_free (name);
1405   name = g_strdup_printf ("send_rtcp_src_%u", idx);
1406   priv->send_src[1] = gst_element_get_request_pad (rtpbin, name);
1407   g_free (name);
1408   name = g_strdup_printf ("recv_rtp_sink_%u", idx);
1409   priv->recv_sink[0] = gst_element_get_request_pad (rtpbin, name);
1410   g_free (name);
1411   name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
1412   priv->recv_sink[1] = gst_element_get_request_pad (rtpbin, name);
1413   g_free (name);
1414
1415   /* get the session */
1416   g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &priv->session);
1417
1418   g_signal_connect (priv->session, "on-new-ssrc", (GCallback) on_new_ssrc,
1419       stream);
1420   g_signal_connect (priv->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
1421       stream);
1422   g_signal_connect (priv->session, "on-ssrc-active",
1423       (GCallback) on_ssrc_active, stream);
1424   g_signal_connect (priv->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
1425       stream);
1426   g_signal_connect (priv->session, "on-bye-timeout",
1427       (GCallback) on_bye_timeout, stream);
1428   g_signal_connect (priv->session, "on-timeout", (GCallback) on_timeout,
1429       stream);
1430
1431   for (i = 0; i < 2; i++) {
1432     GstPad *teepad, *queuepad;
1433     /* For the sender we create this bit of pipeline for both
1434      * RTP and RTCP. Sync and preroll are enabled on udpsink so
1435      * we need to add a queue before appsink to make the pipeline
1436      * not block. For the TCP case, we want to pump data to the
1437      * client as fast as possible anyway.
1438      *
1439      * .--------.      .-----.    .---------.
1440      * | rtpbin |      | tee |    | udpsink |
1441      * |       send->sink   src->sink       |
1442      * '--------'      |     |    '---------'
1443      *                 |     |    .---------.    .---------.
1444      *                 |     |    |  queue  |    | appsink |
1445      *                 |    src->sink      src->sink       |
1446      *                 '-----'    '---------'    '---------'
1447      *
1448      * When only UDP is allowed, we skip the tee, queue and appsink and link the
1449      * udpsink directly to the session.
1450      */
1451     /* add udpsink */
1452     gst_bin_add (bin, priv->udpsink[i]);
1453     sinkpad = gst_element_get_static_pad (priv->udpsink[i], "sink");
1454
1455     if (priv->protocols & GST_RTSP_LOWER_TRANS_TCP) {
1456       /* make tee for RTP/RTCP */
1457       priv->tee[i] = gst_element_factory_make ("tee", NULL);
1458       gst_bin_add (bin, priv->tee[i]);
1459
1460       /* and link to rtpbin send pad */
1461       pad = gst_element_get_static_pad (priv->tee[i], "sink");
1462       gst_pad_link (priv->send_src[i], pad);
1463       gst_object_unref (pad);
1464
1465       /* link tee to udpsink */
1466       teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
1467       gst_pad_link (teepad, sinkpad);
1468       gst_object_unref (teepad);
1469
1470       /* make queue */
1471       priv->appqueue[i] = gst_element_factory_make ("queue", NULL);
1472       gst_bin_add (bin, priv->appqueue[i]);
1473       /* and link to tee */
1474       teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
1475       pad = gst_element_get_static_pad (priv->appqueue[i], "sink");
1476       gst_pad_link (teepad, pad);
1477       gst_object_unref (pad);
1478       gst_object_unref (teepad);
1479
1480       /* make appsink */
1481       priv->appsink[i] = gst_element_factory_make ("appsink", NULL);
1482       g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
1483       g_object_set (priv->appsink[i], "emit-signals", FALSE, NULL);
1484       gst_bin_add (bin, priv->appsink[i]);
1485       gst_app_sink_set_callbacks (GST_APP_SINK_CAST (priv->appsink[i]),
1486           &sink_cb, stream, NULL);
1487       /* and link to queue */
1488       queuepad = gst_element_get_static_pad (priv->appqueue[i], "src");
1489       pad = gst_element_get_static_pad (priv->appsink[i], "sink");
1490       gst_pad_link (queuepad, pad);
1491       gst_object_unref (pad);
1492       gst_object_unref (queuepad);
1493     } else {
1494       /* else only udpsink needed, link it to the session */
1495       gst_pad_link (priv->send_src[i], sinkpad);
1496     }
1497     gst_object_unref (sinkpad);
1498
1499     /* For the receiver we create this bit of pipeline for both
1500      * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc
1501      * and it is all funneled into the rtpbin receive pad.
1502      *
1503      * .--------.     .--------.    .--------.
1504      * | udpsrc |     | funnel |    | rtpbin |
1505      * |       src->sink      src->sink      |
1506      * '--------'     |        |    '--------'
1507      * .--------.     |        |
1508      * | appsrc |     |        |
1509      * |       src->sink       |
1510      * '--------'     '--------'
1511      */
1512     /* make funnel for the RTP/RTCP receivers */
1513     priv->funnel[i] = gst_element_factory_make ("funnel", NULL);
1514     gst_bin_add (bin, priv->funnel[i]);
1515
1516     pad = gst_element_get_static_pad (priv->funnel[i], "src");
1517     gst_pad_link (pad, priv->recv_sink[i]);
1518     gst_object_unref (pad);
1519
1520     if (priv->udpsrc_v4[i]) {
1521       /* we set and keep these to playing so that they don't cause NO_PREROLL return
1522        * values */
1523       gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_PLAYING);
1524       gst_element_set_locked_state (priv->udpsrc_v4[i], TRUE);
1525       /* add udpsrc */
1526       gst_bin_add (bin, priv->udpsrc_v4[i]);
1527
1528       /* and link to the funnel v4 */
1529       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1530       pad = gst_element_get_static_pad (priv->udpsrc_v4[i], "src");
1531       gst_pad_link (pad, selpad);
1532       gst_object_unref (pad);
1533       gst_object_unref (selpad);
1534     }
1535
1536     if (priv->udpsrc_v6[i]) {
1537       gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_PLAYING);
1538       gst_element_set_locked_state (priv->udpsrc_v6[i], TRUE);
1539       gst_bin_add (bin, priv->udpsrc_v6[i]);
1540
1541       /* and link to the funnel v6 */
1542       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1543       pad = gst_element_get_static_pad (priv->udpsrc_v6[i], "src");
1544       gst_pad_link (pad, selpad);
1545       gst_object_unref (pad);
1546       gst_object_unref (selpad);
1547     }
1548
1549     if (priv->protocols & GST_RTSP_LOWER_TRANS_TCP) {
1550       /* make and add appsrc */
1551       priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
1552       gst_bin_add (bin, priv->appsrc[i]);
1553       /* and link to the funnel */
1554       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1555       pad = gst_element_get_static_pad (priv->appsrc[i], "src");
1556       gst_pad_link (pad, selpad);
1557       gst_object_unref (pad);
1558       gst_object_unref (selpad);
1559     }
1560
1561     /* check if we need to set to a special state */
1562     if (state != GST_STATE_NULL) {
1563       if (priv->udpsink[i])
1564         gst_element_set_state (priv->udpsink[i], state);
1565       if (priv->appsink[i])
1566         gst_element_set_state (priv->appsink[i], state);
1567       if (priv->appqueue[i])
1568         gst_element_set_state (priv->appqueue[i], state);
1569       if (priv->tee[i])
1570         gst_element_set_state (priv->tee[i], state);
1571       if (priv->funnel[i])
1572         gst_element_set_state (priv->funnel[i], state);
1573       if (priv->appsrc[i])
1574         gst_element_set_state (priv->appsrc[i], state);
1575     }
1576   }
1577
1578   /* be notified of caps changes */
1579   priv->caps_sig = g_signal_connect (priv->send_rtp_sink, "notify::caps",
1580       (GCallback) caps_notify, stream);
1581
1582   priv->is_joined = TRUE;
1583   g_mutex_unlock (&priv->lock);
1584
1585   return TRUE;
1586
1587   /* ERRORS */
1588 was_joined:
1589   {
1590     g_mutex_unlock (&priv->lock);
1591     return TRUE;
1592   }
1593 no_ports:
1594   {
1595     g_mutex_unlock (&priv->lock);
1596     GST_WARNING ("failed to allocate ports %u", idx);
1597     return FALSE;
1598   }
1599 link_failed:
1600   {
1601     GST_WARNING ("failed to link stream %u", idx);
1602     gst_object_unref (priv->send_rtp_sink);
1603     priv->send_rtp_sink = NULL;
1604     g_mutex_unlock (&priv->lock);
1605     return FALSE;
1606   }
1607 }
1608
1609 /**
1610  * gst_rtsp_stream_leave_bin:
1611  * @stream: a #GstRTSPStream
1612  * @bin: a #GstBin
1613  * @rtpbin: a rtpbin #GstElement
1614  *
1615  * Remove the elements of @stream from @bin.
1616  *
1617  * Return: %TRUE on success.
1618  */
1619 gboolean
1620 gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
1621     GstElement * rtpbin)
1622 {
1623   GstRTSPStreamPrivate *priv;
1624   gint i;
1625
1626   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1627   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
1628   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
1629
1630   priv = stream->priv;
1631
1632   g_mutex_lock (&priv->lock);
1633   if (!priv->is_joined)
1634     goto was_not_joined;
1635
1636   /* all transports must be removed by now */
1637   g_return_val_if_fail (priv->transports == NULL, FALSE);
1638
1639   GST_INFO ("stream %p leaving bin", stream);
1640
1641   gst_pad_unlink (priv->srcpad, priv->send_rtp_sink);
1642   g_signal_handler_disconnect (priv->send_rtp_sink, priv->caps_sig);
1643   gst_element_release_request_pad (rtpbin, priv->send_rtp_sink);
1644   gst_object_unref (priv->send_rtp_sink);
1645   priv->send_rtp_sink = NULL;
1646
1647   for (i = 0; i < 2; i++) {
1648     if (priv->udpsink[i])
1649       gst_element_set_state (priv->udpsink[i], GST_STATE_NULL);
1650     if (priv->appsink[i])
1651       gst_element_set_state (priv->appsink[i], GST_STATE_NULL);
1652     if (priv->appqueue[i])
1653       gst_element_set_state (priv->appqueue[i], GST_STATE_NULL);
1654     if (priv->tee[i])
1655       gst_element_set_state (priv->tee[i], GST_STATE_NULL);
1656     if (priv->funnel[i])
1657       gst_element_set_state (priv->funnel[i], GST_STATE_NULL);
1658     if (priv->appsrc[i])
1659       gst_element_set_state (priv->appsrc[i], GST_STATE_NULL);
1660     if (priv->udpsrc_v4[i]) {
1661       /* and set udpsrc to NULL now before removing */
1662       gst_element_set_locked_state (priv->udpsrc_v4[i], FALSE);
1663       gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_NULL);
1664       /* removing them should also nicely release the request
1665        * pads when they finalize */
1666       gst_bin_remove (bin, priv->udpsrc_v4[i]);
1667     }
1668     if (priv->udpsrc_v6[i]) {
1669       gst_element_set_locked_state (priv->udpsrc_v6[i], FALSE);
1670       gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_NULL);
1671       gst_bin_remove (bin, priv->udpsrc_v6[i]);
1672     }
1673     if (priv->udpsink[i])
1674       gst_bin_remove (bin, priv->udpsink[i]);
1675     if (priv->appsrc[i])
1676       gst_bin_remove (bin, priv->appsrc[i]);
1677     if (priv->appsink[i])
1678       gst_bin_remove (bin, priv->appsink[i]);
1679     if (priv->appqueue[i])
1680       gst_bin_remove (bin, priv->appqueue[i]);
1681     if (priv->tee[i])
1682       gst_bin_remove (bin, priv->tee[i]);
1683     if (priv->funnel[i])
1684       gst_bin_remove (bin, priv->funnel[i]);
1685
1686     gst_element_release_request_pad (rtpbin, priv->recv_sink[i]);
1687     gst_object_unref (priv->recv_sink[i]);
1688     priv->recv_sink[i] = NULL;
1689
1690     priv->udpsrc_v4[i] = NULL;
1691     priv->udpsrc_v6[i] = NULL;
1692     priv->udpsink[i] = NULL;
1693     priv->appsrc[i] = NULL;
1694     priv->appsink[i] = NULL;
1695     priv->appqueue[i] = NULL;
1696     priv->tee[i] = NULL;
1697     priv->funnel[i] = NULL;
1698   }
1699   gst_object_unref (priv->send_src[0]);
1700   priv->send_src[0] = NULL;
1701
1702   gst_element_release_request_pad (rtpbin, priv->send_src[1]);
1703   gst_object_unref (priv->send_src[1]);
1704   priv->send_src[1] = NULL;
1705
1706   g_object_unref (priv->session);
1707   priv->session = NULL;
1708   if (priv->caps)
1709     gst_caps_unref (priv->caps);
1710   priv->caps = NULL;
1711
1712   priv->is_joined = FALSE;
1713   g_mutex_unlock (&priv->lock);
1714
1715   return TRUE;
1716
1717 was_not_joined:
1718   {
1719     return TRUE;
1720   }
1721 }
1722
1723 /**
1724  * gst_rtsp_stream_get_rtpinfo:
1725  * @stream: a #GstRTSPStream
1726  * @rtptime: result RTP timestamp
1727  * @seq: result RTP seqnum
1728  *
1729  * Retrieve the current rtptime and seq. This is used to
1730  * construct a RTPInfo reply header.
1731  *
1732  * Returns: %TRUE when rtptime and seq could be determined.
1733  */
1734 gboolean
1735 gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream,
1736     guint * rtptime, guint * seq)
1737 {
1738   GstRTSPStreamPrivate *priv;
1739   GObjectClass *payobjclass;
1740
1741   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1742   g_return_val_if_fail (rtptime != NULL, FALSE);
1743   g_return_val_if_fail (seq != NULL, FALSE);
1744
1745   priv = stream->priv;
1746
1747   payobjclass = G_OBJECT_GET_CLASS (priv->payloader);
1748
1749   if (!g_object_class_find_property (payobjclass, "seqnum") ||
1750       !g_object_class_find_property (payobjclass, "timestamp"))
1751     return FALSE;
1752
1753   g_object_get (priv->payloader, "seqnum", seq, "timestamp", rtptime, NULL);
1754
1755   return TRUE;
1756 }
1757
1758 /**
1759  * gst_rtsp_stream_get_caps:
1760  * @stream: a #GstRTSPStream
1761  *
1762  * Retrieve the current caps of @stream.
1763  *
1764  * Returns: (transfer full): the #GstCaps of @stream. use gst_caps_unref()
1765  *    after usage.
1766  */
1767 GstCaps *
1768 gst_rtsp_stream_get_caps (GstRTSPStream * stream)
1769 {
1770   GstRTSPStreamPrivate *priv;
1771   GstCaps *result;
1772
1773   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1774
1775   priv = stream->priv;
1776
1777   g_mutex_lock (&priv->lock);
1778   if ((result = priv->caps))
1779     gst_caps_ref (result);
1780   g_mutex_unlock (&priv->lock);
1781
1782   return result;
1783 }
1784
1785 /**
1786  * gst_rtsp_stream_recv_rtp:
1787  * @stream: a #GstRTSPStream
1788  * @buffer: (transfer full): a #GstBuffer
1789  *
1790  * Handle an RTP buffer for the stream. This method is usually called when a
1791  * message has been received from a client using the TCP transport.
1792  *
1793  * This function takes ownership of @buffer.
1794  *
1795  * Returns: a GstFlowReturn.
1796  */
1797 GstFlowReturn
1798 gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer)
1799 {
1800   GstRTSPStreamPrivate *priv;
1801   GstFlowReturn ret;
1802   GstElement *element;
1803
1804   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
1805   priv = stream->priv;
1806   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1807   g_return_val_if_fail (priv->is_joined, FALSE);
1808
1809   g_mutex_lock (&priv->lock);
1810   if (priv->appsrc[0])
1811     element = gst_object_ref (priv->appsrc[0]);
1812   else
1813     element = NULL;
1814   g_mutex_unlock (&priv->lock);
1815
1816   if (element) {
1817     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
1818     gst_object_unref (element);
1819   } else {
1820     ret = GST_FLOW_OK;
1821   }
1822   return ret;
1823 }
1824
1825 /**
1826  * gst_rtsp_stream_recv_rtcp:
1827  * @stream: a #GstRTSPStream
1828  * @buffer: (transfer full): a #GstBuffer
1829  *
1830  * Handle an RTCP buffer for the stream. This method is usually called when a
1831  * message has been received from a client using the TCP transport.
1832  *
1833  * This function takes ownership of @buffer.
1834  *
1835  * Returns: a GstFlowReturn.
1836  */
1837 GstFlowReturn
1838 gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
1839 {
1840   GstRTSPStreamPrivate *priv;
1841   GstFlowReturn ret;
1842   GstElement *element;
1843
1844   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
1845   priv = stream->priv;
1846   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1847   g_return_val_if_fail (priv->is_joined, FALSE);
1848
1849   g_mutex_lock (&priv->lock);
1850   if (priv->appsrc[1])
1851     element = gst_object_ref (priv->appsrc[1]);
1852   else
1853     element = NULL;
1854   g_mutex_unlock (&priv->lock);
1855
1856   if (element) {
1857     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
1858     gst_object_unref (element);
1859   } else {
1860     ret = GST_FLOW_OK;
1861   }
1862   return ret;
1863 }
1864
1865 /* must be called with lock */
1866 static gboolean
1867 update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
1868     gboolean add)
1869 {
1870   GstRTSPStreamPrivate *priv = stream->priv;
1871   const GstRTSPTransport *tr;
1872
1873   tr = gst_rtsp_stream_transport_get_transport (trans);
1874
1875   switch (tr->lower_transport) {
1876     case GST_RTSP_LOWER_TRANS_UDP:
1877     case GST_RTSP_LOWER_TRANS_UDP_MCAST:
1878     {
1879       gchar *dest;
1880       gint min, max;
1881       guint ttl = 0;
1882
1883       dest = tr->destination;
1884       if (tr->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
1885         min = tr->port.min;
1886         max = tr->port.max;
1887         ttl = tr->ttl;
1888       } else {
1889         min = tr->client_port.min;
1890         max = tr->client_port.max;
1891       }
1892
1893       if (add) {
1894         GST_INFO ("adding %s:%d-%d", dest, min, max);
1895         g_signal_emit_by_name (priv->udpsink[0], "add", dest, min, NULL);
1896         g_signal_emit_by_name (priv->udpsink[1], "add", dest, max, NULL);
1897         if (ttl > 0) {
1898           GST_INFO ("setting ttl-mc %d", ttl);
1899           g_object_set (G_OBJECT (priv->udpsink[0]), "ttl-mc", ttl, NULL);
1900           g_object_set (G_OBJECT (priv->udpsink[1]), "ttl-mc", ttl, NULL);
1901         }
1902         priv->transports = g_list_prepend (priv->transports, trans);
1903       } else {
1904         GST_INFO ("removing %s:%d-%d", dest, min, max);
1905         g_signal_emit_by_name (priv->udpsink[0], "remove", dest, min, NULL);
1906         g_signal_emit_by_name (priv->udpsink[1], "remove", dest, max, NULL);
1907         priv->transports = g_list_remove (priv->transports, trans);
1908       }
1909       break;
1910     }
1911     case GST_RTSP_LOWER_TRANS_TCP:
1912       if (add) {
1913         GST_INFO ("adding TCP %s", tr->destination);
1914         priv->transports = g_list_prepend (priv->transports, trans);
1915       } else {
1916         GST_INFO ("removing TCP %s", tr->destination);
1917         priv->transports = g_list_remove (priv->transports, trans);
1918       }
1919       break;
1920     default:
1921       goto unknown_transport;
1922   }
1923   return TRUE;
1924
1925   /* ERRORS */
1926 unknown_transport:
1927   {
1928     GST_INFO ("Unknown transport %d", tr->lower_transport);
1929     return FALSE;
1930   }
1931 }
1932
1933
1934 /**
1935  * gst_rtsp_stream_add_transport:
1936  * @stream: a #GstRTSPStream
1937  * @trans: a #GstRTSPStreamTransport
1938  *
1939  * Add the transport in @trans to @stream. The media of @stream will
1940  * then also be send to the values configured in @trans.
1941  *
1942  * @stream must be joined to a bin.
1943  *
1944  * @trans must contain a valid #GstRTSPTransport.
1945  *
1946  * Returns: %TRUE if @trans was added
1947  */
1948 gboolean
1949 gst_rtsp_stream_add_transport (GstRTSPStream * stream,
1950     GstRTSPStreamTransport * trans)
1951 {
1952   GstRTSPStreamPrivate *priv;
1953   gboolean res;
1954
1955   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1956   priv = stream->priv;
1957   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
1958   g_return_val_if_fail (priv->is_joined, FALSE);
1959
1960   g_mutex_lock (&priv->lock);
1961   res = update_transport (stream, trans, TRUE);
1962   g_mutex_unlock (&priv->lock);
1963
1964   return res;
1965 }
1966
1967 /**
1968  * gst_rtsp_stream_remove_transport:
1969  * @stream: a #GstRTSPStream
1970  * @trans: a #GstRTSPStreamTransport
1971  *
1972  * Remove the transport in @trans from @stream. The media of @stream will
1973  * not be sent to the values configured in @trans.
1974  *
1975  * @stream must be joined to a bin.
1976  *
1977  * @trans must contain a valid #GstRTSPTransport.
1978  *
1979  * Returns: %TRUE if @trans was removed
1980  */
1981 gboolean
1982 gst_rtsp_stream_remove_transport (GstRTSPStream * stream,
1983     GstRTSPStreamTransport * trans)
1984 {
1985   GstRTSPStreamPrivate *priv;
1986   gboolean res;
1987
1988   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1989   priv = stream->priv;
1990   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
1991   g_return_val_if_fail (priv->is_joined, FALSE);
1992
1993   g_mutex_lock (&priv->lock);
1994   res = update_transport (stream, trans, FALSE);
1995   g_mutex_unlock (&priv->lock);
1996
1997   return res;
1998 }
1999
2000 /**
2001  * gst_rtsp_stream_get_rtp_socket:
2002  * @stream: a #GstRTSPStream
2003  * @family: the socket family
2004  *
2005  * Get the RTP socket from @stream for a @family.
2006  *
2007  * @stream must be joined to a bin.
2008  *
2009  * Returns: the RTP socket or %NULL if no socket could be allocated for @family.
2010  *     Unref after usage
2011  */
2012 GSocket *
2013 gst_rtsp_stream_get_rtp_socket (GstRTSPStream * stream, GSocketFamily family)
2014 {
2015   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
2016   GSocket *socket;
2017   gchar *name;
2018
2019   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2020   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
2021       family == G_SOCKET_FAMILY_IPV6, NULL);
2022   g_return_val_if_fail (priv->udpsink[0], NULL);
2023
2024   if (family == G_SOCKET_FAMILY_IPV6)
2025     name = "socket-v6";
2026   else
2027     name = "socket";
2028
2029   g_object_get (priv->udpsink[0], name, &socket, NULL);
2030
2031   return socket;
2032 }
2033
2034 /**
2035  * gst_rtsp_stream_get_rtcp_socket:
2036  * @stream: a #GstRTSPStream
2037  * @family: the socket family
2038  *
2039  * Get the RTCP socket from @stream for a @family.
2040  *
2041  * @stream must be joined to a bin.
2042  *
2043  * Returns: the RTCP socket or %NULL if no socket could be allocated for
2044  *     @family. Unref after usage
2045  */
2046 GSocket *
2047 gst_rtsp_stream_get_rtcp_socket (GstRTSPStream * stream, GSocketFamily family)
2048 {
2049   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
2050   GSocket *socket;
2051   gchar *name;
2052
2053   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2054   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
2055       family == G_SOCKET_FAMILY_IPV6, NULL);
2056   g_return_val_if_fail (priv->udpsink[1], NULL);
2057
2058   if (family == G_SOCKET_FAMILY_IPV6)
2059     name = "socket-v6";
2060   else
2061     name = "socket";
2062
2063   g_object_get (priv->udpsink[1], name, &socket, NULL);
2064
2065   return socket;
2066 }
2067
2068 /**
2069  * gst_rtsp_stream_transport_filter:
2070  * @stream: a #GstRTSPStream
2071  * @func: (scope call) (allow-none): a callback
2072  * @user_data: user data passed to @func
2073  *
2074  * Call @func for each transport managed by @stream. The result value of @func
2075  * determines what happens to the transport. @func will be called with @stream
2076  * locked so no further actions on @stream can be performed from @func.
2077  *
2078  * If @func returns #GST_RTSP_FILTER_REMOVE, the transport will be removed from
2079  * @stream.
2080  *
2081  * If @func returns #GST_RTSP_FILTER_KEEP, the transport will remain in @stream.
2082  *
2083  * If @func returns #GST_RTSP_FILTER_REF, the transport will remain in @stream but
2084  * will also be added with an additional ref to the result #GList of this
2085  * function..
2086  *
2087  * When @func is %NULL, #GST_RTSP_FILTER_REF will be assumed for each transport.
2088  *
2089  * Returns: (element-type GstRTSPStreamTransport) (transfer full): a #GList with all
2090  * transports for which @func returned #GST_RTSP_FILTER_REF. After usage, each
2091  * element in the #GList should be unreffed before the list is freed.
2092  */
2093 GList *
2094 gst_rtsp_stream_transport_filter (GstRTSPStream * stream,
2095     GstRTSPStreamTransportFilterFunc func, gpointer user_data)
2096 {
2097   GstRTSPStreamPrivate *priv;
2098   GList *result, *walk, *next;
2099
2100   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2101
2102   priv = stream->priv;
2103
2104   result = NULL;
2105
2106   g_mutex_lock (&priv->lock);
2107   for (walk = priv->transports; walk; walk = next) {
2108     GstRTSPStreamTransport *trans = walk->data;
2109     GstRTSPFilterResult res;
2110
2111     next = g_list_next (walk);
2112
2113     if (func)
2114       res = func (stream, trans, user_data);
2115     else
2116       res = GST_RTSP_FILTER_REF;
2117
2118     switch (res) {
2119       case GST_RTSP_FILTER_REMOVE:
2120         update_transport (stream, trans, FALSE);
2121         break;
2122       case GST_RTSP_FILTER_REF:
2123         result = g_list_prepend (result, g_object_ref (trans));
2124         break;
2125       case GST_RTSP_FILTER_KEEP:
2126       default:
2127         break;
2128     }
2129   }
2130   g_mutex_unlock (&priv->lock);
2131
2132   return result;
2133 }