stream: add method to check supported transport
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-stream.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19 /**
20  * SECTION:rtsp-stream
21  * @short_description: A media stream
22  * @see_also: #GstRTSPMedia
23  *
24  * The #GstRTSPStream object manages the data transport for one stream. It
25  * is created from a payloader element and a source pad that produce the RTP
26  * packets for the stream.
27  *
28  * With gst_rtsp_stream_join_bin() the streaming elements are added to the bin
29  * and rtpbin. gst_rtsp_stream_leave_bin() removes the elements again.
30  *
31  * The #GstRTSPStream will use the configured addresspool, as set with
32  * gst_rtsp_stream_set_address_pool(), to allocate multicast addresses for the
33  * stream. With gst_rtsp_stream_get_multicast_address() you can get the
34  * configured address.
35  *
36  * With gst_rtsp_stream_get_server_port () you can get the port that the server
37  * will use to receive RTCP. This is the part that the clients will use to send
38  * RTCP to.
39  *
40  * With gst_rtsp_stream_add_transport() destinations can be added where the
41  * stream should be sent to. Use gst_rtsp_stream_remove_transport() to remove
42  * the destination again.
43  *
44  * Last reviewed on 2013-07-16 (1.0.0)
45  */
46
47 #include <stdlib.h>
48 #include <stdio.h>
49 #include <string.h>
50
51 #include <gio/gio.h>
52
53 #include <gst/app/gstappsrc.h>
54 #include <gst/app/gstappsink.h>
55
56 #include "rtsp-stream.h"
57
58 #define GST_RTSP_STREAM_GET_PRIVATE(obj)  \
59      (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTSP_STREAM, GstRTSPStreamPrivate))
60
61 struct _GstRTSPStreamPrivate
62 {
63   GMutex lock;
64   guint idx;
65   GstPad *srcpad;
66   GstElement *payloader;
67   guint buffer_size;
68   gboolean is_joined;
69   gchar *control;
70
71   GstRTSPLowerTrans protocols;
72
73   /* pads on the rtpbin */
74   GstPad *send_rtp_sink;
75   GstPad *recv_sink[2];
76   GstPad *send_src[2];
77
78   /* the RTPSession object */
79   GObject *session;
80
81   /* sinks used for sending and receiving RTP and RTCP over ipv4, they share
82    * sockets */
83   GstElement *udpsrc_v4[2];
84
85   /* sinks used for sending and receiving RTP and RTCP over ipv6, they share
86    * sockets */
87   GstElement *udpsrc_v6[2];
88
89   GstElement *udpsink[2];
90
91   /* for TCP transport */
92   GstElement *appsrc[2];
93   GstElement *appqueue[2];
94   GstElement *appsink[2];
95
96   GstElement *tee[2];
97   GstElement *funnel[2];
98
99   /* server ports for sending/receiving over ipv4 */
100   GstRTSPRange server_port_v4;
101   GstRTSPAddress *server_addr_v4;
102   gboolean have_ipv4;
103
104   /* server ports for sending/receiving over ipv6 */
105   GstRTSPRange server_port_v6;
106   GstRTSPAddress *server_addr_v6;
107   gboolean have_ipv6;
108
109   /* multicast addresses */
110   GstRTSPAddressPool *pool;
111   GstRTSPAddress *addr_v4;
112   GstRTSPAddress *addr_v6;
113
114   /* the caps of the stream */
115   gulong caps_sig;
116   GstCaps *caps;
117
118   /* transports we stream to */
119   guint n_active;
120   GList *transports;
121
122   gint dscp_qos;
123
124   /* stream blocking */
125   gulong blocked_id;
126   gboolean blocking;
127 };
128
129 #define DEFAULT_CONTROL         NULL
130 #define DEFAULT_PROTOCOLS       GST_RTSP_LOWER_TRANS_UDP | GST_RTSP_LOWER_TRANS_UDP_MCAST | \
131                                         GST_RTSP_LOWER_TRANS_TCP
132
133 enum
134 {
135   PROP_0,
136   PROP_CONTROL,
137   PROP_PROTOCOLS,
138   PROP_LAST
139 };
140
141 GST_DEBUG_CATEGORY_STATIC (rtsp_stream_debug);
142 #define GST_CAT_DEFAULT rtsp_stream_debug
143
144 static GQuark ssrc_stream_map_key;
145
146 static void gst_rtsp_stream_get_property (GObject * object, guint propid,
147     GValue * value, GParamSpec * pspec);
148 static void gst_rtsp_stream_set_property (GObject * object, guint propid,
149     const GValue * value, GParamSpec * pspec);
150
151 static void gst_rtsp_stream_finalize (GObject * obj);
152
153 G_DEFINE_TYPE (GstRTSPStream, gst_rtsp_stream, G_TYPE_OBJECT);
154
155 static void
156 gst_rtsp_stream_class_init (GstRTSPStreamClass * klass)
157 {
158   GObjectClass *gobject_class;
159
160   g_type_class_add_private (klass, sizeof (GstRTSPStreamPrivate));
161
162   gobject_class = G_OBJECT_CLASS (klass);
163
164   gobject_class->get_property = gst_rtsp_stream_get_property;
165   gobject_class->set_property = gst_rtsp_stream_set_property;
166   gobject_class->finalize = gst_rtsp_stream_finalize;
167
168   g_object_class_install_property (gobject_class, PROP_CONTROL,
169       g_param_spec_string ("control", "Control",
170           "The control string for this stream", DEFAULT_CONTROL,
171           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
172
173   g_object_class_install_property (gobject_class, PROP_PROTOCOLS,
174       g_param_spec_flags ("protocols", "Protocols",
175           "Allowed lower transport protocols", GST_TYPE_RTSP_LOWER_TRANS,
176           DEFAULT_PROTOCOLS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
177
178   GST_DEBUG_CATEGORY_INIT (rtsp_stream_debug, "rtspstream", 0, "GstRTSPStream");
179
180   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
181 }
182
183 static void
184 gst_rtsp_stream_init (GstRTSPStream * stream)
185 {
186   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
187
188   GST_DEBUG ("new stream %p", stream);
189
190   stream->priv = priv;
191
192   priv->dscp_qos = -1;
193   priv->control = g_strdup (DEFAULT_CONTROL);
194   priv->protocols = DEFAULT_PROTOCOLS;
195
196   g_mutex_init (&priv->lock);
197 }
198
199 static void
200 gst_rtsp_stream_finalize (GObject * obj)
201 {
202   GstRTSPStream *stream;
203   GstRTSPStreamPrivate *priv;
204
205   stream = GST_RTSP_STREAM (obj);
206   priv = stream->priv;
207
208   GST_DEBUG ("finalize stream %p", stream);
209
210   /* we really need to be unjoined now */
211   g_return_if_fail (!priv->is_joined);
212
213   if (priv->addr_v4)
214     gst_rtsp_address_free (priv->addr_v4);
215   if (priv->addr_v6)
216     gst_rtsp_address_free (priv->addr_v6);
217   if (priv->server_addr_v4)
218     gst_rtsp_address_free (priv->server_addr_v4);
219   if (priv->server_addr_v6)
220     gst_rtsp_address_free (priv->server_addr_v6);
221   if (priv->pool)
222     g_object_unref (priv->pool);
223   gst_object_unref (priv->payloader);
224   gst_object_unref (priv->srcpad);
225   g_free (priv->control);
226   g_mutex_clear (&priv->lock);
227
228   G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
229 }
230
231 static void
232 gst_rtsp_stream_get_property (GObject * object, guint propid,
233     GValue * value, GParamSpec * pspec)
234 {
235   GstRTSPStream *stream = GST_RTSP_STREAM (object);
236
237   switch (propid) {
238     case PROP_CONTROL:
239       g_value_take_string (value, gst_rtsp_stream_get_control (stream));
240       break;
241     case PROP_PROTOCOLS:
242       g_value_set_flags (value, gst_rtsp_stream_get_protocols (stream));
243       break;
244     default:
245       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
246   }
247 }
248
249 static void
250 gst_rtsp_stream_set_property (GObject * object, guint propid,
251     const GValue * value, GParamSpec * pspec)
252 {
253   GstRTSPStream *stream = GST_RTSP_STREAM (object);
254
255   switch (propid) {
256     case PROP_CONTROL:
257       gst_rtsp_stream_set_control (stream, g_value_get_string (value));
258       break;
259     case PROP_PROTOCOLS:
260       gst_rtsp_stream_set_protocols (stream, g_value_get_flags (value));
261       break;
262     default:
263       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
264   }
265 }
266
267 /**
268  * gst_rtsp_stream_new:
269  * @idx: an index
270  * @srcpad: a #GstPad
271  * @payloader: a #GstElement
272  *
273  * Create a new media stream with index @idx that handles RTP data on
274  * @srcpad and has a payloader element @payloader.
275  *
276  * Returns: a new #GstRTSPStream
277  */
278 GstRTSPStream *
279 gst_rtsp_stream_new (guint idx, GstElement * payloader, GstPad * srcpad)
280 {
281   GstRTSPStreamPrivate *priv;
282   GstRTSPStream *stream;
283
284   g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL);
285   g_return_val_if_fail (GST_IS_PAD (srcpad), NULL);
286   g_return_val_if_fail (GST_PAD_IS_SRC (srcpad), NULL);
287
288   stream = g_object_new (GST_TYPE_RTSP_STREAM, NULL);
289   priv = stream->priv;
290   priv->idx = idx;
291   priv->payloader = gst_object_ref (payloader);
292   priv->srcpad = gst_object_ref (srcpad);
293
294   return stream;
295 }
296
297 /**
298  * gst_rtsp_stream_get_index:
299  * @stream: a #GstRTSPStream
300  *
301  * Get the stream index.
302  *
303  * Return: the stream index.
304  */
305 guint
306 gst_rtsp_stream_get_index (GstRTSPStream * stream)
307 {
308   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
309
310   return stream->priv->idx;
311 }
312
313 /**
314  * gst_rtsp_stream_get_pt:
315  * @stream: a #GstRTSPStream
316  *
317  * Get the stream payload type.
318  *
319  * Return: the stream payload type.
320  */
321 guint
322 gst_rtsp_stream_get_pt (GstRTSPStream * stream)
323 {
324   GstRTSPStreamPrivate *priv;
325   guint pt;
326
327   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
328
329   priv = stream->priv;
330
331   g_object_get (G_OBJECT (priv->payloader), "pt", &pt, NULL);
332
333   return pt;
334 }
335
336 /**
337  * gst_rtsp_stream_get_srcpad:
338  * @stream: a #GstRTSPStream
339  *
340  * Get the srcpad associated with @stream.
341  *
342  * Returns: (transfer full): the srcpad. Unref after usage.
343  */
344 GstPad *
345 gst_rtsp_stream_get_srcpad (GstRTSPStream * stream)
346 {
347   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
348
349   return gst_object_ref (stream->priv->srcpad);
350 }
351
352 /**
353  * gst_rtsp_stream_get_control:
354  * @stream: a #GstRTSPStream
355  *
356  * Get the control string to identify this stream.
357  *
358  * Returns: (transfer full): the control string. free after usage.
359  */
360 gchar *
361 gst_rtsp_stream_get_control (GstRTSPStream * stream)
362 {
363   GstRTSPStreamPrivate *priv;
364   gchar *result;
365
366   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
367
368   priv = stream->priv;
369
370   g_mutex_lock (&priv->lock);
371   if ((result = g_strdup (priv->control)) == NULL)
372     result = g_strdup_printf ("stream=%u", priv->idx);
373   g_mutex_unlock (&priv->lock);
374
375   return result;
376 }
377
378 /**
379  * gst_rtsp_stream_set_control:
380  * @stream: a #GstRTSPStream
381  * @control: a control string
382  *
383  * Set the control string in @stream.
384  */
385 void
386 gst_rtsp_stream_set_control (GstRTSPStream * stream, const gchar * control)
387 {
388   GstRTSPStreamPrivate *priv;
389
390   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
391
392   priv = stream->priv;
393
394   g_mutex_lock (&priv->lock);
395   g_free (priv->control);
396   priv->control = g_strdup (control);
397   g_mutex_unlock (&priv->lock);
398 }
399
400 /**
401  * gst_rtsp_stream_has_control:
402  * @stream: a #GstRTSPStream
403  * @control: a control string
404  *
405  * Check if @stream has the control string @control.
406  *
407  * Returns: %TRUE is @stream has @control as the control string
408  */
409 gboolean
410 gst_rtsp_stream_has_control (GstRTSPStream * stream, const gchar * control)
411 {
412   GstRTSPStreamPrivate *priv;
413   gboolean res;
414
415   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
416
417   priv = stream->priv;
418
419   g_mutex_lock (&priv->lock);
420   if (priv->control)
421     res = (g_strcmp0 (priv->control, control) == 0);
422   else {
423     guint streamid;
424
425     if (sscanf (control, "stream=%u", &streamid) > 0)
426       res = (streamid == priv->idx);
427     else
428       res = FALSE;
429   }
430   g_mutex_unlock (&priv->lock);
431
432   return res;
433 }
434
435 /**
436  * gst_rtsp_stream_set_mtu:
437  * @stream: a #GstRTSPStream
438  * @mtu: a new MTU
439  *
440  * Configure the mtu in the payloader of @stream to @mtu.
441  */
442 void
443 gst_rtsp_stream_set_mtu (GstRTSPStream * stream, guint mtu)
444 {
445   GstRTSPStreamPrivate *priv;
446
447   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
448
449   priv = stream->priv;
450
451   GST_LOG_OBJECT (stream, "set MTU %u", mtu);
452
453   g_object_set (G_OBJECT (priv->payloader), "mtu", mtu, NULL);
454 }
455
456 /**
457  * gst_rtsp_stream_get_mtu:
458  * @stream: a #GstRTSPStream
459  *
460  * Get the configured MTU in the payloader of @stream.
461  *
462  * Returns: the MTU of the payloader.
463  */
464 guint
465 gst_rtsp_stream_get_mtu (GstRTSPStream * stream)
466 {
467   GstRTSPStreamPrivate *priv;
468   guint mtu;
469
470   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
471
472   priv = stream->priv;
473
474   g_object_get (G_OBJECT (priv->payloader), "mtu", &mtu, NULL);
475
476   return mtu;
477 }
478
479 /* Update the dscp qos property on the udp sinks */
480 static void
481 update_dscp_qos (GstRTSPStream * stream)
482 {
483   GstRTSPStreamPrivate *priv;
484
485   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
486
487   priv = stream->priv;
488
489   if (priv->udpsink[0]) {
490     g_object_set (G_OBJECT (priv->udpsink[0]), "qos-dscp", priv->dscp_qos,
491         NULL);
492   }
493
494   if (priv->udpsink[1]) {
495     g_object_set (G_OBJECT (priv->udpsink[1]), "qos-dscp", priv->dscp_qos,
496         NULL);
497   }
498 }
499
500 /**
501  * gst_rtsp_stream_set_dscp_qos:
502  * @stream: a #GstRTSPStream
503  * @dscp_qos: a new dscp qos value (0-63, or -1 to disable)
504  *
505  * Configure the dscp qos of the outgoing sockets to @dscp_qos.
506  */
507 void
508 gst_rtsp_stream_set_dscp_qos (GstRTSPStream * stream, gint dscp_qos)
509 {
510   GstRTSPStreamPrivate *priv;
511
512   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
513
514   priv = stream->priv;
515
516   GST_LOG_OBJECT (stream, "set DSCP QoS %d", dscp_qos);
517
518   if (dscp_qos < -1 || dscp_qos > 63) {
519     GST_WARNING_OBJECT (stream, "trying to set illegal dscp qos %d", dscp_qos);
520     return;
521   }
522
523   priv->dscp_qos = dscp_qos;
524
525   update_dscp_qos (stream);
526 }
527
528 /**
529  * gst_rtsp_stream_get_dscp_qos:
530  * @stream: a #GstRTSPStream
531  *
532  * Get the configured DSCP QoS in of the outgoing sockets.
533  *
534  * Returns: the DSCP QoS value of the outgoing sockets, or -1 if disbled.
535  */
536 gint
537 gst_rtsp_stream_get_dscp_qos (GstRTSPStream * stream)
538 {
539   GstRTSPStreamPrivate *priv;
540
541   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
542
543   priv = stream->priv;
544
545   return priv->dscp_qos;
546 }
547
548 /**
549  * gst_rtsp_stream_is_transport_supported:
550  * @stream: a #GstRTSPStream
551  * @transport: a #GstRTSPTransport
552  *
553  * Check if @transport can be handled by stream
554  *
555  * Returns: %TRUE if @transport can be handled by @stream.
556  */
557 gboolean
558 gst_rtsp_stream_is_transport_supported (GstRTSPStream * stream,
559     GstRTSPTransport * transport)
560 {
561   GstRTSPStreamPrivate *priv;
562
563   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
564
565   priv = stream->priv;
566
567   g_mutex_lock (&priv->lock);
568   if (transport->trans != GST_RTSP_TRANS_RTP)
569     goto unsupported_transmode;
570
571   if (transport->profile != GST_RTSP_PROFILE_AVP)
572     goto unsupported_profile;
573
574   if (!(transport->lower_transport & priv->protocols))
575     goto unsupported_ltrans;
576
577   g_mutex_unlock (&priv->lock);
578
579   return TRUE;
580
581   /* ERRORS */
582 unsupported_transmode:
583   {
584     GST_DEBUG ("unsupported transport mode %d", transport->trans);
585     return FALSE;
586   }
587 unsupported_profile:
588   {
589     GST_DEBUG ("unsupported profile %d", transport->profile);
590     return FALSE;
591   }
592 unsupported_ltrans:
593   {
594     GST_DEBUG ("unsupported lower transport %d", transport->lower_transport);
595     return FALSE;
596   }
597 }
598
599 /**
600  * gst_rtsp_stream_set_protocols:
601  * @stream: a #GstRTSPStream
602  * @protocols: the new flags
603  *
604  * Configure the allowed lower transport for @stream.
605  */
606 void
607 gst_rtsp_stream_set_protocols (GstRTSPStream * stream,
608     GstRTSPLowerTrans protocols)
609 {
610   GstRTSPStreamPrivate *priv;
611
612   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
613
614   priv = stream->priv;
615
616   g_mutex_lock (&priv->lock);
617   priv->protocols = protocols;
618   g_mutex_unlock (&priv->lock);
619 }
620
621 /**
622  * gst_rtsp_stream_get_protocols:
623  * @stream: a #GstRTSPStream
624  *
625  * Get the allowed protocols of @stream.
626  *
627  * Returns: a #GstRTSPLowerTrans
628  */
629 GstRTSPLowerTrans
630 gst_rtsp_stream_get_protocols (GstRTSPStream * stream)
631 {
632   GstRTSPStreamPrivate *priv;
633   GstRTSPLowerTrans res;
634
635   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream),
636       GST_RTSP_LOWER_TRANS_UNKNOWN);
637
638   priv = stream->priv;
639
640   g_mutex_lock (&priv->lock);
641   res = priv->protocols;
642   g_mutex_unlock (&priv->lock);
643
644   return res;
645 }
646
647 /**
648  * gst_rtsp_stream_set_address_pool:
649  * @stream: a #GstRTSPStream
650  * @pool: a #GstRTSPAddressPool
651  *
652  * configure @pool to be used as the address pool of @stream.
653  */
654 void
655 gst_rtsp_stream_set_address_pool (GstRTSPStream * stream,
656     GstRTSPAddressPool * pool)
657 {
658   GstRTSPStreamPrivate *priv;
659   GstRTSPAddressPool *old;
660
661   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
662
663   priv = stream->priv;
664
665   GST_LOG_OBJECT (stream, "set address pool %p", pool);
666
667   g_mutex_lock (&priv->lock);
668   if ((old = priv->pool) != pool)
669     priv->pool = pool ? g_object_ref (pool) : NULL;
670   else
671     old = NULL;
672   g_mutex_unlock (&priv->lock);
673
674   if (old)
675     g_object_unref (old);
676 }
677
678 /**
679  * gst_rtsp_stream_get_address_pool:
680  * @stream: a #GstRTSPStream
681  *
682  * Get the #GstRTSPAddressPool used as the address pool of @stream.
683  *
684  * Returns: (transfer full): the #GstRTSPAddressPool of @stream. g_object_unref() after
685  * usage.
686  */
687 GstRTSPAddressPool *
688 gst_rtsp_stream_get_address_pool (GstRTSPStream * stream)
689 {
690   GstRTSPStreamPrivate *priv;
691   GstRTSPAddressPool *result;
692
693   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
694
695   priv = stream->priv;
696
697   g_mutex_lock (&priv->lock);
698   if ((result = priv->pool))
699     g_object_ref (result);
700   g_mutex_unlock (&priv->lock);
701
702   return result;
703 }
704
705 /**
706  * gst_rtsp_stream_get_multicast_address:
707  * @stream: a #GstRTSPStream
708  * @family: the #GSocketFamily
709  *
710  * Get the multicast address of @stream for @family.
711  *
712  * Returns: the #GstRTSPAddress of @stream or %NULL when no address could be
713  * allocated. gst_rtsp_address_free() after usage.
714  */
715 GstRTSPAddress *
716 gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream,
717     GSocketFamily family)
718 {
719   GstRTSPStreamPrivate *priv;
720   GstRTSPAddress *result;
721   GstRTSPAddress **addrp;
722   GstRTSPAddressFlags flags;
723
724   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
725
726   priv = stream->priv;
727
728   if (family == G_SOCKET_FAMILY_IPV6) {
729     flags = GST_RTSP_ADDRESS_FLAG_IPV6;
730     addrp = &priv->addr_v4;
731   } else {
732     flags = GST_RTSP_ADDRESS_FLAG_IPV4;
733     addrp = &priv->addr_v6;
734   }
735
736   g_mutex_lock (&priv->lock);
737   if (*addrp == NULL) {
738     if (priv->pool == NULL)
739       goto no_pool;
740
741     flags |= GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_MULTICAST;
742
743     *addrp = gst_rtsp_address_pool_acquire_address (priv->pool, flags, 2);
744     if (*addrp == NULL)
745       goto no_address;
746   }
747   result = gst_rtsp_address_copy (*addrp);
748   g_mutex_unlock (&priv->lock);
749
750   return result;
751
752   /* ERRORS */
753 no_pool:
754   {
755     GST_ERROR_OBJECT (stream, "no address pool specified");
756     g_mutex_unlock (&priv->lock);
757     return NULL;
758   }
759 no_address:
760   {
761     GST_ERROR_OBJECT (stream, "failed to acquire address from pool");
762     g_mutex_unlock (&priv->lock);
763     return NULL;
764   }
765 }
766
767 /**
768  * gst_rtsp_stream_reserve_address:
769  * @stream: a #GstRTSPStream
770  * @address: an address
771  * @port: a port
772  * @n_ports: n_ports
773  * @ttl: a TTL
774  *
775  * Reserve @address and @port as the address and port of @stream.
776  *
777  * Returns: the #GstRTSPAddress of @stream or %NULL when the address could be
778  * reserved. gst_rtsp_address_free() after usage.
779  */
780 GstRTSPAddress *
781 gst_rtsp_stream_reserve_address (GstRTSPStream * stream,
782     const gchar * address, guint port, guint n_ports, guint ttl)
783 {
784   GstRTSPStreamPrivate *priv;
785   GstRTSPAddress *result;
786   GInetAddress *addr;
787   GSocketFamily family;
788   GstRTSPAddress **addrp;
789
790   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
791   g_return_val_if_fail (address != NULL, NULL);
792   g_return_val_if_fail (port > 0, NULL);
793   g_return_val_if_fail (n_ports > 0, NULL);
794   g_return_val_if_fail (ttl > 0, NULL);
795
796   priv = stream->priv;
797
798   addr = g_inet_address_new_from_string (address);
799   if (!addr) {
800     GST_ERROR ("failed to get inet addr from %s", address);
801     family = G_SOCKET_FAMILY_IPV4;
802   } else {
803     family = g_inet_address_get_family (addr);
804     g_object_unref (addr);
805   }
806
807   if (family == G_SOCKET_FAMILY_IPV6)
808     addrp = &priv->addr_v4;
809   else
810     addrp = &priv->addr_v6;
811
812   g_mutex_lock (&priv->lock);
813   if (*addrp == NULL) {
814     GstRTSPAddressPoolResult res;
815
816     if (priv->pool == NULL)
817       goto no_pool;
818
819     res = gst_rtsp_address_pool_reserve_address (priv->pool, address,
820         port, n_ports, ttl, addrp);
821     if (res != GST_RTSP_ADDRESS_POOL_OK)
822       goto no_address;
823   } else {
824     if (strcmp ((*addrp)->address, address) ||
825         (*addrp)->port != port || (*addrp)->n_ports != n_ports ||
826         (*addrp)->ttl != ttl)
827       goto different_address;
828   }
829   result = gst_rtsp_address_copy (*addrp);
830   g_mutex_unlock (&priv->lock);
831
832   return result;
833
834   /* ERRORS */
835 no_pool:
836   {
837     GST_ERROR_OBJECT (stream, "no address pool specified");
838     g_mutex_unlock (&priv->lock);
839     return NULL;
840   }
841 no_address:
842   {
843     GST_ERROR_OBJECT (stream, "failed to acquire address %s from pool",
844         address);
845     g_mutex_unlock (&priv->lock);
846     return NULL;
847   }
848 different_address:
849   {
850     GST_ERROR_OBJECT (stream, "address %s is not the same that was already"
851         " reserved", address);
852     g_mutex_unlock (&priv->lock);
853     return NULL;
854   }
855 }
856
857 static gboolean
858 alloc_ports_one_family (GstRTSPAddressPool * pool, gint buffer_size,
859     GSocketFamily family, GstElement * udpsrc_out[2],
860     GstElement * udpsink_out[2], GstRTSPRange * server_port_out,
861     GstRTSPAddress ** server_addr_out)
862 {
863   GstStateChangeReturn ret;
864   GstElement *udpsrc0, *udpsrc1;
865   GstElement *udpsink0, *udpsink1;
866   GSocket *rtp_socket = NULL;
867   GSocket *rtcp_socket;
868   gint tmp_rtp, tmp_rtcp;
869   guint count;
870   gint rtpport, rtcpport;
871   GList *rejected_addresses = NULL;
872   GstRTSPAddress *addr = NULL;
873   GInetAddress *inetaddr = NULL;
874   GSocketAddress *rtp_sockaddr = NULL;
875   GSocketAddress *rtcp_sockaddr = NULL;
876   const gchar *multisink_socket;
877
878   if (family == G_SOCKET_FAMILY_IPV6)
879     multisink_socket = "socket-v6";
880   else
881     multisink_socket = "socket";
882
883   udpsrc0 = NULL;
884   udpsrc1 = NULL;
885   udpsink0 = NULL;
886   udpsink1 = NULL;
887   count = 0;
888
889   /* Start with random port */
890   tmp_rtp = 0;
891
892   rtcp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
893       G_SOCKET_PROTOCOL_UDP, NULL);
894   if (!rtcp_socket)
895     goto no_udp_protocol;
896
897   if (*server_addr_out)
898     gst_rtsp_address_free (*server_addr_out);
899
900   /* try to allocate 2 UDP ports, the RTP port should be an even
901    * number and the RTCP port should be the next (uneven) port */
902 again:
903
904   if (rtp_socket == NULL) {
905     rtp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
906         G_SOCKET_PROTOCOL_UDP, NULL);
907     if (!rtp_socket)
908       goto no_udp_protocol;
909   }
910
911   if (pool && gst_rtsp_address_pool_has_unicast_addresses (pool)) {
912     GstRTSPAddressFlags flags;
913
914     if (addr)
915       rejected_addresses = g_list_prepend (rejected_addresses, addr);
916
917     flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_UNICAST;
918     if (family == G_SOCKET_FAMILY_IPV6)
919       flags |= GST_RTSP_ADDRESS_FLAG_IPV6;
920     else
921       flags |= GST_RTSP_ADDRESS_FLAG_IPV4;
922
923     addr = gst_rtsp_address_pool_acquire_address (pool, flags, 2);
924
925     if (addr == NULL)
926       goto no_ports;
927
928     tmp_rtp = addr->port;
929
930     g_clear_object (&inetaddr);
931     inetaddr = g_inet_address_new_from_string (addr->address);
932   } else {
933     if (tmp_rtp != 0) {
934       tmp_rtp += 2;
935       if (++count > 20)
936         goto no_ports;
937     }
938
939     if (inetaddr == NULL)
940       inetaddr = g_inet_address_new_any (family);
941   }
942
943   rtp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtp);
944   if (!g_socket_bind (rtp_socket, rtp_sockaddr, FALSE, NULL)) {
945     g_object_unref (rtp_sockaddr);
946     goto again;
947   }
948   g_object_unref (rtp_sockaddr);
949
950   rtp_sockaddr = g_socket_get_local_address (rtp_socket, NULL);
951   if (rtp_sockaddr == NULL || !G_IS_INET_SOCKET_ADDRESS (rtp_sockaddr)) {
952     g_clear_object (&rtp_sockaddr);
953     goto socket_error;
954   }
955
956   tmp_rtp =
957       g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (rtp_sockaddr));
958   g_object_unref (rtp_sockaddr);
959
960   /* check if port is even */
961   if ((tmp_rtp & 1) != 0) {
962     /* port not even, close and allocate another */
963     tmp_rtp++;
964     g_clear_object (&rtp_socket);
965     goto again;
966   }
967
968   /* set port */
969   tmp_rtcp = tmp_rtp + 1;
970
971   rtcp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtcp);
972   if (!g_socket_bind (rtcp_socket, rtcp_sockaddr, FALSE, NULL)) {
973     g_object_unref (rtcp_sockaddr);
974     g_clear_object (&rtp_socket);
975     goto again;
976   }
977   g_object_unref (rtcp_sockaddr);
978
979   g_clear_object (&inetaddr);
980
981   udpsrc0 = gst_element_factory_make ("udpsrc", NULL);
982   udpsrc1 = gst_element_factory_make ("udpsrc", NULL);
983
984   if (udpsrc0 == NULL || udpsrc1 == NULL)
985     goto no_udp_protocol;
986
987   g_object_set (G_OBJECT (udpsrc0), "socket", rtp_socket, NULL);
988   g_object_set (G_OBJECT (udpsrc1), "socket", rtcp_socket, NULL);
989
990   ret = gst_element_set_state (udpsrc0, GST_STATE_PAUSED);
991   if (ret == GST_STATE_CHANGE_FAILURE)
992     goto element_error;
993   ret = gst_element_set_state (udpsrc1, GST_STATE_PAUSED);
994   if (ret == GST_STATE_CHANGE_FAILURE)
995     goto element_error;
996
997   /* all fine, do port check */
998   g_object_get (G_OBJECT (udpsrc0), "port", &rtpport, NULL);
999   g_object_get (G_OBJECT (udpsrc1), "port", &rtcpport, NULL);
1000
1001   /* this should not happen... */
1002   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
1003     goto port_error;
1004
1005   if (udpsink_out[0])
1006     udpsink0 = udpsink_out[0];
1007   else
1008     udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
1009
1010   if (!udpsink0)
1011     goto no_udp_protocol;
1012
1013   g_object_set (G_OBJECT (udpsink0), "close-socket", FALSE, NULL);
1014   g_object_set (G_OBJECT (udpsink0), multisink_socket, rtp_socket, NULL);
1015
1016   if (udpsink_out[1])
1017     udpsink1 = udpsink_out[1];
1018   else
1019     udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
1020
1021   if (!udpsink1)
1022     goto no_udp_protocol;
1023
1024   g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
1025   g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
1026   g_object_set (G_OBJECT (udpsink0), "buffer-size", buffer_size, NULL);
1027
1028   g_object_set (G_OBJECT (udpsink1), "close-socket", FALSE, NULL);
1029   g_object_set (G_OBJECT (udpsink1), multisink_socket, rtcp_socket, NULL);
1030   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
1031   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
1032   g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL);
1033   g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
1034   g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL);
1035   g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
1036
1037   /* we keep these elements, we will further configure them when the
1038    * client told us to really use the UDP ports. */
1039   udpsrc_out[0] = udpsrc0;
1040   udpsrc_out[1] = udpsrc1;
1041   udpsink_out[0] = udpsink0;
1042   udpsink_out[1] = udpsink1;
1043   server_port_out->min = rtpport;
1044   server_port_out->max = rtcpport;
1045
1046   *server_addr_out = addr;
1047   g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free);
1048
1049   g_object_unref (rtp_socket);
1050   g_object_unref (rtcp_socket);
1051
1052   return TRUE;
1053
1054   /* ERRORS */
1055 no_udp_protocol:
1056   {
1057     goto cleanup;
1058   }
1059 no_ports:
1060   {
1061     goto cleanup;
1062   }
1063 port_error:
1064   {
1065     goto cleanup;
1066   }
1067 socket_error:
1068   {
1069     goto cleanup;
1070   }
1071 element_error:
1072   {
1073     goto cleanup;
1074   }
1075 cleanup:
1076   {
1077     if (udpsrc0) {
1078       gst_element_set_state (udpsrc0, GST_STATE_NULL);
1079       gst_object_unref (udpsrc0);
1080     }
1081     if (udpsrc1) {
1082       gst_element_set_state (udpsrc1, GST_STATE_NULL);
1083       gst_object_unref (udpsrc1);
1084     }
1085     if (udpsink0) {
1086       gst_element_set_state (udpsink0, GST_STATE_NULL);
1087       gst_object_unref (udpsink0);
1088     }
1089     if (inetaddr)
1090       g_object_unref (inetaddr);
1091     g_list_free_full (rejected_addresses,
1092         (GDestroyNotify) gst_rtsp_address_free);
1093     if (addr)
1094       gst_rtsp_address_free (addr);
1095     if (rtp_socket)
1096       g_object_unref (rtp_socket);
1097     if (rtcp_socket)
1098       g_object_unref (rtcp_socket);
1099     return FALSE;
1100   }
1101 }
1102
1103 /* must be called with lock */
1104 static gboolean
1105 alloc_ports (GstRTSPStream * stream)
1106 {
1107   GstRTSPStreamPrivate *priv = stream->priv;
1108
1109   priv->have_ipv4 = alloc_ports_one_family (priv->pool, priv->buffer_size,
1110       G_SOCKET_FAMILY_IPV4, priv->udpsrc_v4, priv->udpsink,
1111       &priv->server_port_v4, &priv->server_addr_v4);
1112
1113   priv->have_ipv6 = alloc_ports_one_family (priv->pool, priv->buffer_size,
1114       G_SOCKET_FAMILY_IPV6, priv->udpsrc_v6, priv->udpsink,
1115       &priv->server_port_v6, &priv->server_addr_v6);
1116
1117   return priv->have_ipv4 || priv->have_ipv6;
1118 }
1119
1120 /**
1121  * gst_rtsp_stream_get_server_port:
1122  * @stream: a #GstRTSPStream
1123  * @server_port: (out): result server port
1124  * @family: the port family to get
1125  *
1126  * Fill @server_port with the port pair used by the server. This function can
1127  * only be called when @stream has been joined.
1128  */
1129 void
1130 gst_rtsp_stream_get_server_port (GstRTSPStream * stream,
1131     GstRTSPRange * server_port, GSocketFamily family)
1132 {
1133   GstRTSPStreamPrivate *priv;
1134
1135   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1136   priv = stream->priv;
1137   g_return_if_fail (priv->is_joined);
1138
1139   g_mutex_lock (&priv->lock);
1140   if (family == G_SOCKET_FAMILY_IPV4) {
1141     if (server_port)
1142       *server_port = priv->server_port_v4;
1143   } else {
1144     if (server_port)
1145       *server_port = priv->server_port_v6;
1146   }
1147   g_mutex_unlock (&priv->lock);
1148 }
1149
1150 /**
1151  * gst_rtsp_stream_get_rtpsession:
1152  * @stream: a #GstRTSPStream
1153  *
1154  * Get the RTP session of this stream.
1155  *
1156  * Returns: (transfer full): The RTP session of this stream. Unref after usage.
1157  */
1158 GObject *
1159 gst_rtsp_stream_get_rtpsession (GstRTSPStream * stream)
1160 {
1161   GstRTSPStreamPrivate *priv;
1162   GObject *session;
1163
1164   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1165
1166   priv = stream->priv;
1167
1168   g_mutex_lock (&priv->lock);
1169   if ((session = priv->session))
1170     g_object_ref (session);
1171   g_mutex_unlock (&priv->lock);
1172
1173   return session;
1174 }
1175
1176 /**
1177  * gst_rtsp_stream_get_ssrc:
1178  * @stream: a #GstRTSPStream
1179  * @ssrc: (out): result ssrc
1180  *
1181  * Get the SSRC used by the RTP session of this stream. This function can only
1182  * be called when @stream has been joined.
1183  */
1184 void
1185 gst_rtsp_stream_get_ssrc (GstRTSPStream * stream, guint * ssrc)
1186 {
1187   GstRTSPStreamPrivate *priv;
1188
1189   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1190   priv = stream->priv;
1191   g_return_if_fail (priv->is_joined);
1192
1193   g_mutex_lock (&priv->lock);
1194   if (ssrc && priv->session)
1195     g_object_get (priv->session, "internal-ssrc", ssrc, NULL);
1196   g_mutex_unlock (&priv->lock);
1197 }
1198
1199 /* executed from streaming thread */
1200 static void
1201 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream)
1202 {
1203   GstRTSPStreamPrivate *priv = stream->priv;
1204   GstCaps *newcaps, *oldcaps;
1205
1206   newcaps = gst_pad_get_current_caps (pad);
1207
1208   GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps,
1209       newcaps);
1210
1211   g_mutex_lock (&priv->lock);
1212   oldcaps = priv->caps;
1213   priv->caps = newcaps;
1214   g_mutex_unlock (&priv->lock);
1215
1216   if (oldcaps)
1217     gst_caps_unref (oldcaps);
1218 }
1219
1220 static void
1221 dump_structure (const GstStructure * s)
1222 {
1223   gchar *sstr;
1224
1225   sstr = gst_structure_to_string (s);
1226   GST_INFO ("structure: %s", sstr);
1227   g_free (sstr);
1228 }
1229
1230 static GstRTSPStreamTransport *
1231 find_transport (GstRTSPStream * stream, const gchar * rtcp_from)
1232 {
1233   GstRTSPStreamPrivate *priv = stream->priv;
1234   GList *walk;
1235   GstRTSPStreamTransport *result = NULL;
1236   const gchar *tmp;
1237   gchar *dest;
1238   guint port;
1239
1240   if (rtcp_from == NULL)
1241     return NULL;
1242
1243   tmp = g_strrstr (rtcp_from, ":");
1244   if (tmp == NULL)
1245     return NULL;
1246
1247   port = atoi (tmp + 1);
1248   dest = g_strndup (rtcp_from, tmp - rtcp_from);
1249
1250   g_mutex_lock (&priv->lock);
1251   GST_INFO ("finding %s:%d in %d transports", dest, port,
1252       g_list_length (priv->transports));
1253
1254   for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1255     GstRTSPStreamTransport *trans = walk->data;
1256     const GstRTSPTransport *tr;
1257     gint min, max;
1258
1259     tr = gst_rtsp_stream_transport_get_transport (trans);
1260
1261     min = tr->client_port.min;
1262     max = tr->client_port.max;
1263
1264     if ((strcmp (tr->destination, dest) == 0) && (min == port || max == port)) {
1265       result = trans;
1266       break;
1267     }
1268   }
1269   if (result)
1270     g_object_ref (result);
1271   g_mutex_unlock (&priv->lock);
1272
1273   g_free (dest);
1274
1275   return result;
1276 }
1277
1278 static GstRTSPStreamTransport *
1279 check_transport (GObject * source, GstRTSPStream * stream)
1280 {
1281   GstStructure *stats;
1282   GstRTSPStreamTransport *trans;
1283
1284   /* see if we have a stream to match with the origin of the RTCP packet */
1285   trans = g_object_get_qdata (source, ssrc_stream_map_key);
1286   if (trans == NULL) {
1287     g_object_get (source, "stats", &stats, NULL);
1288     if (stats) {
1289       const gchar *rtcp_from;
1290
1291       dump_structure (stats);
1292
1293       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
1294       if ((trans = find_transport (stream, rtcp_from))) {
1295         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
1296             source);
1297         g_object_set_qdata_full (source, ssrc_stream_map_key, trans,
1298             g_object_unref);
1299       }
1300       gst_structure_free (stats);
1301     }
1302   }
1303   return trans;
1304 }
1305
1306
1307 static void
1308 on_new_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1309 {
1310   GstRTSPStreamTransport *trans;
1311
1312   GST_INFO ("%p: new source %p", stream, source);
1313
1314   trans = check_transport (source, stream);
1315
1316   if (trans)
1317     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
1318 }
1319
1320 static void
1321 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPStream * stream)
1322 {
1323   GST_INFO ("%p: new SDES %p", stream, source);
1324 }
1325
1326 static void
1327 on_ssrc_active (GObject * session, GObject * source, GstRTSPStream * stream)
1328 {
1329   GstRTSPStreamTransport *trans;
1330
1331   trans = check_transport (source, stream);
1332
1333   if (trans) {
1334     GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
1335     gst_rtsp_stream_transport_keep_alive (trans);
1336   }
1337 #ifdef DUMP_STATS
1338   {
1339     GstStructure *stats;
1340     g_object_get (source, "stats", &stats, NULL);
1341     if (stats) {
1342       dump_structure (stats);
1343       gst_structure_free (stats);
1344     }
1345   }
1346 #endif
1347 }
1348
1349 static void
1350 on_bye_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1351 {
1352   GST_INFO ("%p: source %p bye", stream, source);
1353 }
1354
1355 static void
1356 on_bye_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1357 {
1358   GstRTSPStreamTransport *trans;
1359
1360   GST_INFO ("%p: source %p bye timeout", stream, source);
1361
1362   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1363     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1364     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1365   }
1366 }
1367
1368 static void
1369 on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1370 {
1371   GstRTSPStreamTransport *trans;
1372
1373   GST_INFO ("%p: source %p timeout", stream, source);
1374
1375   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1376     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1377     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1378   }
1379 }
1380
1381 static GstFlowReturn
1382 handle_new_sample (GstAppSink * sink, gpointer user_data)
1383 {
1384   GstRTSPStreamPrivate *priv;
1385   GList *walk;
1386   GstSample *sample;
1387   GstBuffer *buffer;
1388   GstRTSPStream *stream;
1389
1390   sample = gst_app_sink_pull_sample (sink);
1391   if (!sample)
1392     return GST_FLOW_OK;
1393
1394   stream = (GstRTSPStream *) user_data;
1395   priv = stream->priv;
1396   buffer = gst_sample_get_buffer (sample);
1397
1398   g_mutex_lock (&priv->lock);
1399   for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1400     GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1401
1402     if (GST_ELEMENT_CAST (sink) == priv->appsink[0]) {
1403       gst_rtsp_stream_transport_send_rtp (tr, buffer);
1404     } else {
1405       gst_rtsp_stream_transport_send_rtcp (tr, buffer);
1406     }
1407   }
1408   g_mutex_unlock (&priv->lock);
1409
1410   gst_sample_unref (sample);
1411
1412   return GST_FLOW_OK;
1413 }
1414
1415 static GstAppSinkCallbacks sink_cb = {
1416   NULL,                         /* not interested in EOS */
1417   NULL,                         /* not interested in preroll samples */
1418   handle_new_sample,
1419 };
1420
1421 /**
1422  * gst_rtsp_stream_join_bin:
1423  * @stream: a #GstRTSPStream
1424  * @bin: a #GstBin to join
1425  * @rtpbin: a rtpbin element in @bin
1426  * @state: the target state of the new elements
1427  *
1428  * Join the #GstBin @bin that contains the element @rtpbin.
1429  *
1430  * @stream will link to @rtpbin, which must be inside @bin. The elements
1431  * added to @bin will be set to the state given in @state.
1432  *
1433  * Returns: %TRUE on success.
1434  */
1435 gboolean
1436 gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
1437     GstElement * rtpbin, GstState state)
1438 {
1439   GstRTSPStreamPrivate *priv;
1440   gint i;
1441   guint idx;
1442   gchar *name;
1443   GstPad *pad, *sinkpad, *selpad;
1444   GstPadLinkReturn ret;
1445
1446   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1447   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
1448   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
1449
1450   priv = stream->priv;
1451
1452   g_mutex_lock (&priv->lock);
1453   if (priv->is_joined)
1454     goto was_joined;
1455
1456   /* create a session with the same index as the stream */
1457   idx = priv->idx;
1458
1459   GST_INFO ("stream %p joining bin as session %u", stream, idx);
1460
1461   if (!alloc_ports (stream))
1462     goto no_ports;
1463
1464   /* update the dscp qos field in the sinks */
1465   update_dscp_qos (stream);
1466
1467   /* get a pad for sending RTP */
1468   name = g_strdup_printf ("send_rtp_sink_%u", idx);
1469   priv->send_rtp_sink = gst_element_get_request_pad (rtpbin, name);
1470   g_free (name);
1471   /* link the RTP pad to the session manager, it should not really fail unless
1472    * this is not really an RTP pad */
1473   ret = gst_pad_link (priv->srcpad, priv->send_rtp_sink);
1474   if (ret != GST_PAD_LINK_OK)
1475     goto link_failed;
1476
1477   /* get pads from the RTP session element for sending and receiving
1478    * RTP/RTCP*/
1479   name = g_strdup_printf ("send_rtp_src_%u", idx);
1480   priv->send_src[0] = gst_element_get_static_pad (rtpbin, name);
1481   g_free (name);
1482   name = g_strdup_printf ("send_rtcp_src_%u", idx);
1483   priv->send_src[1] = gst_element_get_request_pad (rtpbin, name);
1484   g_free (name);
1485   name = g_strdup_printf ("recv_rtp_sink_%u", idx);
1486   priv->recv_sink[0] = gst_element_get_request_pad (rtpbin, name);
1487   g_free (name);
1488   name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
1489   priv->recv_sink[1] = gst_element_get_request_pad (rtpbin, name);
1490   g_free (name);
1491
1492   /* get the session */
1493   g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &priv->session);
1494
1495   g_signal_connect (priv->session, "on-new-ssrc", (GCallback) on_new_ssrc,
1496       stream);
1497   g_signal_connect (priv->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
1498       stream);
1499   g_signal_connect (priv->session, "on-ssrc-active",
1500       (GCallback) on_ssrc_active, stream);
1501   g_signal_connect (priv->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
1502       stream);
1503   g_signal_connect (priv->session, "on-bye-timeout",
1504       (GCallback) on_bye_timeout, stream);
1505   g_signal_connect (priv->session, "on-timeout", (GCallback) on_timeout,
1506       stream);
1507
1508   for (i = 0; i < 2; i++) {
1509     GstPad *teepad, *queuepad;
1510     /* For the sender we create this bit of pipeline for both
1511      * RTP and RTCP. Sync and preroll are enabled on udpsink so
1512      * we need to add a queue before appsink to make the pipeline
1513      * not block. For the TCP case, we want to pump data to the
1514      * client as fast as possible anyway.
1515      *
1516      * .--------.      .-----.    .---------.
1517      * | rtpbin |      | tee |    | udpsink |
1518      * |       send->sink   src->sink       |
1519      * '--------'      |     |    '---------'
1520      *                 |     |    .---------.    .---------.
1521      *                 |     |    |  queue  |    | appsink |
1522      *                 |    src->sink      src->sink       |
1523      *                 '-----'    '---------'    '---------'
1524      *
1525      * When only UDP is allowed, we skip the tee, queue and appsink and link the
1526      * udpsink directly to the session.
1527      */
1528     /* add udpsink */
1529     gst_bin_add (bin, priv->udpsink[i]);
1530     sinkpad = gst_element_get_static_pad (priv->udpsink[i], "sink");
1531
1532     if (priv->protocols & GST_RTSP_LOWER_TRANS_TCP) {
1533       /* make tee for RTP/RTCP */
1534       priv->tee[i] = gst_element_factory_make ("tee", NULL);
1535       gst_bin_add (bin, priv->tee[i]);
1536
1537       /* and link to rtpbin send pad */
1538       pad = gst_element_get_static_pad (priv->tee[i], "sink");
1539       gst_pad_link (priv->send_src[i], pad);
1540       gst_object_unref (pad);
1541
1542       /* link tee to udpsink */
1543       teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
1544       gst_pad_link (teepad, sinkpad);
1545       gst_object_unref (teepad);
1546
1547       /* make queue */
1548       priv->appqueue[i] = gst_element_factory_make ("queue", NULL);
1549       gst_bin_add (bin, priv->appqueue[i]);
1550       /* and link to tee */
1551       teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
1552       pad = gst_element_get_static_pad (priv->appqueue[i], "sink");
1553       gst_pad_link (teepad, pad);
1554       gst_object_unref (pad);
1555       gst_object_unref (teepad);
1556
1557       /* make appsink */
1558       priv->appsink[i] = gst_element_factory_make ("appsink", NULL);
1559       g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
1560       g_object_set (priv->appsink[i], "emit-signals", FALSE, NULL);
1561       gst_bin_add (bin, priv->appsink[i]);
1562       gst_app_sink_set_callbacks (GST_APP_SINK_CAST (priv->appsink[i]),
1563           &sink_cb, stream, NULL);
1564       /* and link to queue */
1565       queuepad = gst_element_get_static_pad (priv->appqueue[i], "src");
1566       pad = gst_element_get_static_pad (priv->appsink[i], "sink");
1567       gst_pad_link (queuepad, pad);
1568       gst_object_unref (pad);
1569       gst_object_unref (queuepad);
1570     } else {
1571       /* else only udpsink needed, link it to the session */
1572       gst_pad_link (priv->send_src[i], sinkpad);
1573     }
1574     gst_object_unref (sinkpad);
1575
1576     /* For the receiver we create this bit of pipeline for both
1577      * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc
1578      * and it is all funneled into the rtpbin receive pad.
1579      *
1580      * .--------.     .--------.    .--------.
1581      * | udpsrc |     | funnel |    | rtpbin |
1582      * |       src->sink      src->sink      |
1583      * '--------'     |        |    '--------'
1584      * .--------.     |        |
1585      * | appsrc |     |        |
1586      * |       src->sink       |
1587      * '--------'     '--------'
1588      */
1589     /* make funnel for the RTP/RTCP receivers */
1590     priv->funnel[i] = gst_element_factory_make ("funnel", NULL);
1591     gst_bin_add (bin, priv->funnel[i]);
1592
1593     pad = gst_element_get_static_pad (priv->funnel[i], "src");
1594     gst_pad_link (pad, priv->recv_sink[i]);
1595     gst_object_unref (pad);
1596
1597     if (priv->udpsrc_v4[i]) {
1598       /* we set and keep these to playing so that they don't cause NO_PREROLL return
1599        * values */
1600       gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_PLAYING);
1601       gst_element_set_locked_state (priv->udpsrc_v4[i], TRUE);
1602       /* add udpsrc */
1603       gst_bin_add (bin, priv->udpsrc_v4[i]);
1604
1605       /* and link to the funnel v4 */
1606       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1607       pad = gst_element_get_static_pad (priv->udpsrc_v4[i], "src");
1608       gst_pad_link (pad, selpad);
1609       gst_object_unref (pad);
1610       gst_object_unref (selpad);
1611     }
1612
1613     if (priv->udpsrc_v6[i]) {
1614       gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_PLAYING);
1615       gst_element_set_locked_state (priv->udpsrc_v6[i], TRUE);
1616       gst_bin_add (bin, priv->udpsrc_v6[i]);
1617
1618       /* and link to the funnel v6 */
1619       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1620       pad = gst_element_get_static_pad (priv->udpsrc_v6[i], "src");
1621       gst_pad_link (pad, selpad);
1622       gst_object_unref (pad);
1623       gst_object_unref (selpad);
1624     }
1625
1626     if (priv->protocols & GST_RTSP_LOWER_TRANS_TCP) {
1627       /* make and add appsrc */
1628       priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
1629       gst_bin_add (bin, priv->appsrc[i]);
1630       /* and link to the funnel */
1631       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1632       pad = gst_element_get_static_pad (priv->appsrc[i], "src");
1633       gst_pad_link (pad, selpad);
1634       gst_object_unref (pad);
1635       gst_object_unref (selpad);
1636     }
1637
1638     /* check if we need to set to a special state */
1639     if (state != GST_STATE_NULL) {
1640       if (priv->udpsink[i])
1641         gst_element_set_state (priv->udpsink[i], state);
1642       if (priv->appsink[i])
1643         gst_element_set_state (priv->appsink[i], state);
1644       if (priv->appqueue[i])
1645         gst_element_set_state (priv->appqueue[i], state);
1646       if (priv->tee[i])
1647         gst_element_set_state (priv->tee[i], state);
1648       if (priv->funnel[i])
1649         gst_element_set_state (priv->funnel[i], state);
1650       if (priv->appsrc[i])
1651         gst_element_set_state (priv->appsrc[i], state);
1652     }
1653   }
1654
1655   /* be notified of caps changes */
1656   priv->caps_sig = g_signal_connect (priv->send_rtp_sink, "notify::caps",
1657       (GCallback) caps_notify, stream);
1658
1659   priv->is_joined = TRUE;
1660   g_mutex_unlock (&priv->lock);
1661
1662   return TRUE;
1663
1664   /* ERRORS */
1665 was_joined:
1666   {
1667     g_mutex_unlock (&priv->lock);
1668     return TRUE;
1669   }
1670 no_ports:
1671   {
1672     g_mutex_unlock (&priv->lock);
1673     GST_WARNING ("failed to allocate ports %u", idx);
1674     return FALSE;
1675   }
1676 link_failed:
1677   {
1678     GST_WARNING ("failed to link stream %u", idx);
1679     gst_object_unref (priv->send_rtp_sink);
1680     priv->send_rtp_sink = NULL;
1681     g_mutex_unlock (&priv->lock);
1682     return FALSE;
1683   }
1684 }
1685
1686 /**
1687  * gst_rtsp_stream_leave_bin:
1688  * @stream: a #GstRTSPStream
1689  * @bin: a #GstBin
1690  * @rtpbin: a rtpbin #GstElement
1691  *
1692  * Remove the elements of @stream from @bin.
1693  *
1694  * Return: %TRUE on success.
1695  */
1696 gboolean
1697 gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
1698     GstElement * rtpbin)
1699 {
1700   GstRTSPStreamPrivate *priv;
1701   gint i;
1702
1703   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1704   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
1705   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
1706
1707   priv = stream->priv;
1708
1709   g_mutex_lock (&priv->lock);
1710   if (!priv->is_joined)
1711     goto was_not_joined;
1712
1713   /* all transports must be removed by now */
1714   g_return_val_if_fail (priv->transports == NULL, FALSE);
1715
1716   GST_INFO ("stream %p leaving bin", stream);
1717
1718   gst_pad_unlink (priv->srcpad, priv->send_rtp_sink);
1719   g_signal_handler_disconnect (priv->send_rtp_sink, priv->caps_sig);
1720   gst_element_release_request_pad (rtpbin, priv->send_rtp_sink);
1721   gst_object_unref (priv->send_rtp_sink);
1722   priv->send_rtp_sink = NULL;
1723
1724   for (i = 0; i < 2; i++) {
1725     if (priv->udpsink[i])
1726       gst_element_set_state (priv->udpsink[i], GST_STATE_NULL);
1727     if (priv->appsink[i])
1728       gst_element_set_state (priv->appsink[i], GST_STATE_NULL);
1729     if (priv->appqueue[i])
1730       gst_element_set_state (priv->appqueue[i], GST_STATE_NULL);
1731     if (priv->tee[i])
1732       gst_element_set_state (priv->tee[i], GST_STATE_NULL);
1733     if (priv->funnel[i])
1734       gst_element_set_state (priv->funnel[i], GST_STATE_NULL);
1735     if (priv->appsrc[i])
1736       gst_element_set_state (priv->appsrc[i], GST_STATE_NULL);
1737     if (priv->udpsrc_v4[i]) {
1738       /* and set udpsrc to NULL now before removing */
1739       gst_element_set_locked_state (priv->udpsrc_v4[i], FALSE);
1740       gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_NULL);
1741       /* removing them should also nicely release the request
1742        * pads when they finalize */
1743       gst_bin_remove (bin, priv->udpsrc_v4[i]);
1744     }
1745     if (priv->udpsrc_v6[i]) {
1746       gst_element_set_locked_state (priv->udpsrc_v6[i], FALSE);
1747       gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_NULL);
1748       gst_bin_remove (bin, priv->udpsrc_v6[i]);
1749     }
1750     if (priv->udpsink[i])
1751       gst_bin_remove (bin, priv->udpsink[i]);
1752     if (priv->appsrc[i])
1753       gst_bin_remove (bin, priv->appsrc[i]);
1754     if (priv->appsink[i])
1755       gst_bin_remove (bin, priv->appsink[i]);
1756     if (priv->appqueue[i])
1757       gst_bin_remove (bin, priv->appqueue[i]);
1758     if (priv->tee[i])
1759       gst_bin_remove (bin, priv->tee[i]);
1760     if (priv->funnel[i])
1761       gst_bin_remove (bin, priv->funnel[i]);
1762
1763     gst_element_release_request_pad (rtpbin, priv->recv_sink[i]);
1764     gst_object_unref (priv->recv_sink[i]);
1765     priv->recv_sink[i] = NULL;
1766
1767     priv->udpsrc_v4[i] = NULL;
1768     priv->udpsrc_v6[i] = NULL;
1769     priv->udpsink[i] = NULL;
1770     priv->appsrc[i] = NULL;
1771     priv->appsink[i] = NULL;
1772     priv->appqueue[i] = NULL;
1773     priv->tee[i] = NULL;
1774     priv->funnel[i] = NULL;
1775   }
1776   gst_object_unref (priv->send_src[0]);
1777   priv->send_src[0] = NULL;
1778
1779   gst_element_release_request_pad (rtpbin, priv->send_src[1]);
1780   gst_object_unref (priv->send_src[1]);
1781   priv->send_src[1] = NULL;
1782
1783   g_object_unref (priv->session);
1784   priv->session = NULL;
1785   if (priv->caps)
1786     gst_caps_unref (priv->caps);
1787   priv->caps = NULL;
1788
1789   priv->is_joined = FALSE;
1790   g_mutex_unlock (&priv->lock);
1791
1792   return TRUE;
1793
1794 was_not_joined:
1795   {
1796     return TRUE;
1797   }
1798 }
1799
1800 /**
1801  * gst_rtsp_stream_get_rtpinfo:
1802  * @stream: a #GstRTSPStream
1803  * @rtptime: (allow-none): result RTP timestamp
1804  * @seq: (allow-none): result RTP seqnum
1805  * @clock_rate: the clock rate
1806  * @running_time: (allow-none): result running-time
1807  *
1808  * Retrieve the current rtptime, seq and running-time. This is used to
1809  * construct a RTPInfo reply header.
1810  *
1811  * Returns: %TRUE when rtptime, seq and running-time could be determined.
1812  */
1813 gboolean
1814 gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream,
1815     guint * rtptime, guint * seq, guint * clock_rate,
1816     GstClockTime * running_time)
1817 {
1818   GstRTSPStreamPrivate *priv;
1819   GObjectClass *payobjclass;
1820
1821   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1822
1823   priv = stream->priv;
1824
1825   payobjclass = G_OBJECT_GET_CLASS (priv->payloader);
1826
1827   g_mutex_lock (&priv->lock);
1828   if (seq && g_object_class_find_property (payobjclass, "seqnum"))
1829     g_object_get (priv->payloader, "seqnum", seq, NULL);
1830
1831   if (rtptime && g_object_class_find_property (payobjclass, "timestamp"))
1832     g_object_get (priv->payloader, "timestamp", rtptime, NULL);
1833
1834   if (running_time
1835       && g_object_class_find_property (payobjclass, "running-time"))
1836     g_object_get (priv->payloader, "running-time", running_time, NULL);
1837
1838   if (clock_rate && priv->caps) {
1839     GstStructure *s;
1840
1841     s = gst_caps_get_structure (priv->caps, 0);
1842     if (!gst_structure_get_int (s, "clock-rate", (gint *) clock_rate))
1843       if (running_time)
1844         *running_time = GST_CLOCK_TIME_NONE;
1845   }
1846   g_mutex_unlock (&priv->lock);
1847
1848   return TRUE;
1849 }
1850
1851 /**
1852  * gst_rtsp_stream_get_caps:
1853  * @stream: a #GstRTSPStream
1854  *
1855  * Retrieve the current caps of @stream.
1856  *
1857  * Returns: (transfer full): the #GstCaps of @stream. use gst_caps_unref()
1858  *    after usage.
1859  */
1860 GstCaps *
1861 gst_rtsp_stream_get_caps (GstRTSPStream * stream)
1862 {
1863   GstRTSPStreamPrivate *priv;
1864   GstCaps *result;
1865
1866   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1867
1868   priv = stream->priv;
1869
1870   g_mutex_lock (&priv->lock);
1871   if ((result = priv->caps))
1872     gst_caps_ref (result);
1873   g_mutex_unlock (&priv->lock);
1874
1875   return result;
1876 }
1877
1878 /**
1879  * gst_rtsp_stream_recv_rtp:
1880  * @stream: a #GstRTSPStream
1881  * @buffer: (transfer full): a #GstBuffer
1882  *
1883  * Handle an RTP buffer for the stream. This method is usually called when a
1884  * message has been received from a client using the TCP transport.
1885  *
1886  * This function takes ownership of @buffer.
1887  *
1888  * Returns: a GstFlowReturn.
1889  */
1890 GstFlowReturn
1891 gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer)
1892 {
1893   GstRTSPStreamPrivate *priv;
1894   GstFlowReturn ret;
1895   GstElement *element;
1896
1897   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
1898   priv = stream->priv;
1899   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1900   g_return_val_if_fail (priv->is_joined, FALSE);
1901
1902   g_mutex_lock (&priv->lock);
1903   if (priv->appsrc[0])
1904     element = gst_object_ref (priv->appsrc[0]);
1905   else
1906     element = NULL;
1907   g_mutex_unlock (&priv->lock);
1908
1909   if (element) {
1910     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
1911     gst_object_unref (element);
1912   } else {
1913     ret = GST_FLOW_OK;
1914   }
1915   return ret;
1916 }
1917
1918 /**
1919  * gst_rtsp_stream_recv_rtcp:
1920  * @stream: a #GstRTSPStream
1921  * @buffer: (transfer full): a #GstBuffer
1922  *
1923  * Handle an RTCP buffer for the stream. This method is usually called when a
1924  * message has been received from a client using the TCP transport.
1925  *
1926  * This function takes ownership of @buffer.
1927  *
1928  * Returns: a GstFlowReturn.
1929  */
1930 GstFlowReturn
1931 gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
1932 {
1933   GstRTSPStreamPrivate *priv;
1934   GstFlowReturn ret;
1935   GstElement *element;
1936
1937   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
1938   priv = stream->priv;
1939   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1940   g_return_val_if_fail (priv->is_joined, FALSE);
1941
1942   g_mutex_lock (&priv->lock);
1943   if (priv->appsrc[1])
1944     element = gst_object_ref (priv->appsrc[1]);
1945   else
1946     element = NULL;
1947   g_mutex_unlock (&priv->lock);
1948
1949   if (element) {
1950     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
1951     gst_object_unref (element);
1952   } else {
1953     ret = GST_FLOW_OK;
1954   }
1955   return ret;
1956 }
1957
1958 /* must be called with lock */
1959 static gboolean
1960 update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
1961     gboolean add)
1962 {
1963   GstRTSPStreamPrivate *priv = stream->priv;
1964   const GstRTSPTransport *tr;
1965
1966   tr = gst_rtsp_stream_transport_get_transport (trans);
1967
1968   switch (tr->lower_transport) {
1969     case GST_RTSP_LOWER_TRANS_UDP:
1970     case GST_RTSP_LOWER_TRANS_UDP_MCAST:
1971     {
1972       gchar *dest;
1973       gint min, max;
1974       guint ttl = 0;
1975
1976       dest = tr->destination;
1977       if (tr->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
1978         min = tr->port.min;
1979         max = tr->port.max;
1980         ttl = tr->ttl;
1981       } else {
1982         min = tr->client_port.min;
1983         max = tr->client_port.max;
1984       }
1985
1986       if (add) {
1987         GST_INFO ("adding %s:%d-%d", dest, min, max);
1988         g_signal_emit_by_name (priv->udpsink[0], "add", dest, min, NULL);
1989         g_signal_emit_by_name (priv->udpsink[1], "add", dest, max, NULL);
1990         if (ttl > 0) {
1991           GST_INFO ("setting ttl-mc %d", ttl);
1992           g_object_set (G_OBJECT (priv->udpsink[0]), "ttl-mc", ttl, NULL);
1993           g_object_set (G_OBJECT (priv->udpsink[1]), "ttl-mc", ttl, NULL);
1994         }
1995         priv->transports = g_list_prepend (priv->transports, trans);
1996       } else {
1997         GST_INFO ("removing %s:%d-%d", dest, min, max);
1998         g_signal_emit_by_name (priv->udpsink[0], "remove", dest, min, NULL);
1999         g_signal_emit_by_name (priv->udpsink[1], "remove", dest, max, NULL);
2000         priv->transports = g_list_remove (priv->transports, trans);
2001       }
2002       break;
2003     }
2004     case GST_RTSP_LOWER_TRANS_TCP:
2005       if (add) {
2006         GST_INFO ("adding TCP %s", tr->destination);
2007         priv->transports = g_list_prepend (priv->transports, trans);
2008       } else {
2009         GST_INFO ("removing TCP %s", tr->destination);
2010         priv->transports = g_list_remove (priv->transports, trans);
2011       }
2012       break;
2013     default:
2014       goto unknown_transport;
2015   }
2016   return TRUE;
2017
2018   /* ERRORS */
2019 unknown_transport:
2020   {
2021     GST_INFO ("Unknown transport %d", tr->lower_transport);
2022     return FALSE;
2023   }
2024 }
2025
2026
2027 /**
2028  * gst_rtsp_stream_add_transport:
2029  * @stream: a #GstRTSPStream
2030  * @trans: a #GstRTSPStreamTransport
2031  *
2032  * Add the transport in @trans to @stream. The media of @stream will
2033  * then also be send to the values configured in @trans.
2034  *
2035  * @stream must be joined to a bin.
2036  *
2037  * @trans must contain a valid #GstRTSPTransport.
2038  *
2039  * Returns: %TRUE if @trans was added
2040  */
2041 gboolean
2042 gst_rtsp_stream_add_transport (GstRTSPStream * stream,
2043     GstRTSPStreamTransport * trans)
2044 {
2045   GstRTSPStreamPrivate *priv;
2046   gboolean res;
2047
2048   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2049   priv = stream->priv;
2050   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
2051   g_return_val_if_fail (priv->is_joined, FALSE);
2052
2053   g_mutex_lock (&priv->lock);
2054   res = update_transport (stream, trans, TRUE);
2055   g_mutex_unlock (&priv->lock);
2056
2057   return res;
2058 }
2059
2060 /**
2061  * gst_rtsp_stream_remove_transport:
2062  * @stream: a #GstRTSPStream
2063  * @trans: a #GstRTSPStreamTransport
2064  *
2065  * Remove the transport in @trans from @stream. The media of @stream will
2066  * not be sent to the values configured in @trans.
2067  *
2068  * @stream must be joined to a bin.
2069  *
2070  * @trans must contain a valid #GstRTSPTransport.
2071  *
2072  * Returns: %TRUE if @trans was removed
2073  */
2074 gboolean
2075 gst_rtsp_stream_remove_transport (GstRTSPStream * stream,
2076     GstRTSPStreamTransport * trans)
2077 {
2078   GstRTSPStreamPrivate *priv;
2079   gboolean res;
2080
2081   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2082   priv = stream->priv;
2083   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
2084   g_return_val_if_fail (priv->is_joined, FALSE);
2085
2086   g_mutex_lock (&priv->lock);
2087   res = update_transport (stream, trans, FALSE);
2088   g_mutex_unlock (&priv->lock);
2089
2090   return res;
2091 }
2092
2093 /**
2094  * gst_rtsp_stream_get_rtp_socket:
2095  * @stream: a #GstRTSPStream
2096  * @family: the socket family
2097  *
2098  * Get the RTP socket from @stream for a @family.
2099  *
2100  * @stream must be joined to a bin.
2101  *
2102  * Returns: the RTP socket or %NULL if no socket could be allocated for @family.
2103  *     Unref after usage
2104  */
2105 GSocket *
2106 gst_rtsp_stream_get_rtp_socket (GstRTSPStream * stream, GSocketFamily family)
2107 {
2108   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
2109   GSocket *socket;
2110   gchar *name;
2111
2112   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2113   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
2114       family == G_SOCKET_FAMILY_IPV6, NULL);
2115   g_return_val_if_fail (priv->udpsink[0], NULL);
2116
2117   if (family == G_SOCKET_FAMILY_IPV6)
2118     name = "socket-v6";
2119   else
2120     name = "socket";
2121
2122   g_object_get (priv->udpsink[0], name, &socket, NULL);
2123
2124   return socket;
2125 }
2126
2127 /**
2128  * gst_rtsp_stream_get_rtcp_socket:
2129  * @stream: a #GstRTSPStream
2130  * @family: the socket family
2131  *
2132  * Get the RTCP socket from @stream for a @family.
2133  *
2134  * @stream must be joined to a bin.
2135  *
2136  * Returns: the RTCP socket or %NULL if no socket could be allocated for
2137  *     @family. Unref after usage
2138  */
2139 GSocket *
2140 gst_rtsp_stream_get_rtcp_socket (GstRTSPStream * stream, GSocketFamily family)
2141 {
2142   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
2143   GSocket *socket;
2144   gchar *name;
2145
2146   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2147   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
2148       family == G_SOCKET_FAMILY_IPV6, NULL);
2149   g_return_val_if_fail (priv->udpsink[1], NULL);
2150
2151   if (family == G_SOCKET_FAMILY_IPV6)
2152     name = "socket-v6";
2153   else
2154     name = "socket";
2155
2156   g_object_get (priv->udpsink[1], name, &socket, NULL);
2157
2158   return socket;
2159 }
2160
2161 /**
2162  * gst_rtsp_stream_transport_filter:
2163  * @stream: a #GstRTSPStream
2164  * @func: (scope call) (allow-none): a callback
2165  * @user_data: user data passed to @func
2166  *
2167  * Call @func for each transport managed by @stream. The result value of @func
2168  * determines what happens to the transport. @func will be called with @stream
2169  * locked so no further actions on @stream can be performed from @func.
2170  *
2171  * If @func returns #GST_RTSP_FILTER_REMOVE, the transport will be removed from
2172  * @stream.
2173  *
2174  * If @func returns #GST_RTSP_FILTER_KEEP, the transport will remain in @stream.
2175  *
2176  * If @func returns #GST_RTSP_FILTER_REF, the transport will remain in @stream but
2177  * will also be added with an additional ref to the result #GList of this
2178  * function..
2179  *
2180  * When @func is %NULL, #GST_RTSP_FILTER_REF will be assumed for each transport.
2181  *
2182  * Returns: (element-type GstRTSPStreamTransport) (transfer full): a #GList with all
2183  * transports for which @func returned #GST_RTSP_FILTER_REF. After usage, each
2184  * element in the #GList should be unreffed before the list is freed.
2185  */
2186 GList *
2187 gst_rtsp_stream_transport_filter (GstRTSPStream * stream,
2188     GstRTSPStreamTransportFilterFunc func, gpointer user_data)
2189 {
2190   GstRTSPStreamPrivate *priv;
2191   GList *result, *walk, *next;
2192
2193   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2194
2195   priv = stream->priv;
2196
2197   result = NULL;
2198
2199   g_mutex_lock (&priv->lock);
2200   for (walk = priv->transports; walk; walk = next) {
2201     GstRTSPStreamTransport *trans = walk->data;
2202     GstRTSPFilterResult res;
2203
2204     next = g_list_next (walk);
2205
2206     if (func)
2207       res = func (stream, trans, user_data);
2208     else
2209       res = GST_RTSP_FILTER_REF;
2210
2211     switch (res) {
2212       case GST_RTSP_FILTER_REMOVE:
2213         update_transport (stream, trans, FALSE);
2214         break;
2215       case GST_RTSP_FILTER_REF:
2216         result = g_list_prepend (result, g_object_ref (trans));
2217         break;
2218       case GST_RTSP_FILTER_KEEP:
2219       default:
2220         break;
2221     }
2222   }
2223   g_mutex_unlock (&priv->lock);
2224
2225   return result;
2226 }
2227
2228 static GstPadProbeReturn
2229 pad_blocking (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
2230 {
2231   GstRTSPStreamPrivate *priv;
2232   GstRTSPStream *stream;
2233
2234   stream = user_data;
2235   priv = stream->priv;
2236
2237   GST_DEBUG_OBJECT (pad, "now blocking");
2238
2239   g_mutex_lock (&priv->lock);
2240   priv->blocking = TRUE;
2241   g_mutex_unlock (&priv->lock);
2242
2243   gst_element_post_message (priv->payloader,
2244       gst_message_new_element (GST_OBJECT_CAST (priv->payloader),
2245           gst_structure_new_empty ("GstRTSPStreamBlocking")));
2246
2247   return GST_PAD_PROBE_OK;
2248 }
2249
2250 /**
2251  * gst_rtsp_stream_set_blocked:
2252  * @stream: a #GstRTSPStream
2253  * @blocked: boolean indicating we should block or unblock
2254  *
2255  * Blocks or unblocks the dataflow on @stream.
2256  *
2257  * Returns: %TRUE on success
2258  */
2259 gboolean
2260 gst_rtsp_stream_set_blocked (GstRTSPStream * stream, gboolean blocked)
2261 {
2262   GstRTSPStreamPrivate *priv;
2263
2264   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2265
2266   priv = stream->priv;
2267
2268   g_mutex_lock (&priv->lock);
2269   if (blocked) {
2270     priv->blocking = FALSE;
2271     if (priv->blocked_id == 0) {
2272       priv->blocked_id = gst_pad_add_probe (priv->srcpad,
2273           GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER |
2274           GST_PAD_PROBE_TYPE_BUFFER_LIST, pad_blocking,
2275           g_object_ref (stream), g_object_unref);
2276     }
2277   } else {
2278     if (priv->blocked_id != 0) {
2279       gst_pad_remove_probe (priv->srcpad, priv->blocked_id);
2280       priv->blocked_id = 0;
2281       priv->blocking = FALSE;
2282     }
2283   }
2284   g_mutex_unlock (&priv->lock);
2285
2286   return TRUE;
2287 }
2288
2289 /**
2290  * gst_rtsp_stream_is_blocking:
2291  * @stream: a #GstRTSPStream
2292  *
2293  * Check if @stream is blocking on a #GstBuffer.
2294  *
2295  * Returns: %TRUE if @stream is blocking
2296  */
2297 gboolean
2298 gst_rtsp_stream_is_blocking (GstRTSPStream * stream)
2299 {
2300   GstRTSPStreamPrivate *priv;
2301   gboolean result;
2302
2303   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2304
2305   priv = stream->priv;
2306
2307   g_mutex_lock (&priv->lock);
2308   result = priv->blocking;
2309   g_mutex_unlock (&priv->lock);
2310
2311   return result;
2312 }