Add initial support for RECORD
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-stream.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  * Copyright (C) 2015 Centricular Ltd
4  *     Author: Sebastian Dröge <sebastian@centricular.com>
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19  * Boston, MA 02110-1301, USA.
20  */
21 /**
22  * SECTION:rtsp-stream
23  * @short_description: A media stream
24  * @see_also: #GstRTSPMedia
25  *
26  * The #GstRTSPStream object manages the data transport for one stream. It
27  * is created from a payloader element and a source pad that produce the RTP
28  * packets for the stream.
29  *
30  * With gst_rtsp_stream_join_bin() the streaming elements are added to the bin
31  * and rtpbin. gst_rtsp_stream_leave_bin() removes the elements again.
32  *
33  * The #GstRTSPStream will use the configured addresspool, as set with
34  * gst_rtsp_stream_set_address_pool(), to allocate multicast addresses for the
35  * stream. With gst_rtsp_stream_get_multicast_address() you can get the
36  * configured address.
37  *
38  * With gst_rtsp_stream_get_server_port () you can get the port that the server
39  * will use to receive RTCP. This is the part that the clients will use to send
40  * RTCP to.
41  *
42  * With gst_rtsp_stream_add_transport() destinations can be added where the
43  * stream should be sent to. Use gst_rtsp_stream_remove_transport() to remove
44  * the destination again.
45  *
46  * Last reviewed on 2013-07-16 (1.0.0)
47  */
48
49 #include <stdlib.h>
50 #include <stdio.h>
51 #include <string.h>
52
53 #include <gio/gio.h>
54
55 #include <gst/app/gstappsrc.h>
56 #include <gst/app/gstappsink.h>
57
58 #include "rtsp-stream.h"
59
60 #define GST_RTSP_STREAM_GET_PRIVATE(obj)  \
61      (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTSP_STREAM, GstRTSPStreamPrivate))
62
63 typedef struct
64 {
65   GstRTSPStreamTransport *transport;
66
67   /* RTP and RTCP source */
68   GstElement *udpsrc[2];
69   GstPad *selpad[2];
70 } GstRTSPMulticastTransportSource;
71
72 struct _GstRTSPStreamPrivate
73 {
74   GMutex lock;
75   guint idx;
76   /* Only one pad is ever set */
77   GstPad *srcpad, *sinkpad;
78   GstElement *payloader;
79   guint buffer_size;
80   gboolean is_joined;
81   gchar *control;
82
83   GstRTSPProfile profiles;
84   GstRTSPLowerTrans protocols;
85
86   /* pads on the rtpbin */
87   GstPad *send_rtp_sink;
88   GstPad *recv_rtp_src;
89   GstPad *recv_sink[2];
90   GstPad *send_src[2];
91
92   /* the RTPSession object */
93   GObject *session;
94
95   /* SRTP encoder/decoder */
96   GstElement *srtpenc;
97   GstElement *srtpdec;
98   GHashTable *keys;
99
100   /* sinks used for sending and receiving RTP and RTCP over ipv4, they share
101    * sockets */
102   GstElement *udpsrc_v4[2];
103
104   /* sinks used for sending and receiving RTP and RTCP over ipv6, they share
105    * sockets */
106   GstElement *udpsrc_v6[2];
107
108   GstElement *udpsink[2];
109
110   /* for TCP transport */
111   GstElement *appsrc[2];
112   GstElement *appqueue[2];
113   GstElement *appsink[2];
114
115   GstElement *tee[2];
116   GstElement *funnel[2];
117
118   /* retransmission */
119   GstElement *rtxsend;
120   guint rtx_pt;
121   GstClockTime rtx_time;
122
123   /* server ports for sending/receiving over ipv4 */
124   GstRTSPRange server_port_v4;
125   GstRTSPAddress *server_addr_v4;
126   gboolean have_ipv4;
127
128   /* server ports for sending/receiving over ipv6 */
129   GstRTSPRange server_port_v6;
130   GstRTSPAddress *server_addr_v6;
131   gboolean have_ipv6;
132
133   /* multicast addresses */
134   GstRTSPAddressPool *pool;
135   GstRTSPAddress *addr_v4;
136   GstRTSPAddress *addr_v6;
137
138   /* the caps of the stream */
139   gulong caps_sig;
140   GstCaps *caps;
141
142   /* transports we stream to */
143   guint n_active;
144   GList *transports;
145   guint transports_cookie;
146   GList *tr_cache_rtp;
147   GList *tr_cache_rtcp;
148   guint tr_cache_cookie_rtp;
149   guint tr_cache_cookie_rtcp;
150
151
152   /* UDP sources for UDP multicast transports */
153   GList *transport_sources;
154
155   gint dscp_qos;
156
157   /* stream blocking */
158   gulong blocked_id;
159   gboolean blocking;
160
161   /* pt->caps map for RECORD streams */
162   GHashTable *ptmap;
163 };
164
165 #define DEFAULT_CONTROL         NULL
166 #define DEFAULT_PROFILES        GST_RTSP_PROFILE_AVP
167 #define DEFAULT_PROTOCOLS       GST_RTSP_LOWER_TRANS_UDP | GST_RTSP_LOWER_TRANS_UDP_MCAST | \
168                                         GST_RTSP_LOWER_TRANS_TCP
169
170 enum
171 {
172   PROP_0,
173   PROP_CONTROL,
174   PROP_PROFILES,
175   PROP_PROTOCOLS,
176   PROP_LAST
177 };
178
179 enum
180 {
181   SIGNAL_NEW_RTP_ENCODER,
182   SIGNAL_NEW_RTCP_ENCODER,
183   SIGNAL_LAST
184 };
185
186 GST_DEBUG_CATEGORY_STATIC (rtsp_stream_debug);
187 #define GST_CAT_DEFAULT rtsp_stream_debug
188
189 static GQuark ssrc_stream_map_key;
190
191 static void gst_rtsp_stream_get_property (GObject * object, guint propid,
192     GValue * value, GParamSpec * pspec);
193 static void gst_rtsp_stream_set_property (GObject * object, guint propid,
194     const GValue * value, GParamSpec * pspec);
195
196 static void gst_rtsp_stream_finalize (GObject * obj);
197
198 static guint gst_rtsp_stream_signals[SIGNAL_LAST] = { 0 };
199
200 G_DEFINE_TYPE (GstRTSPStream, gst_rtsp_stream, G_TYPE_OBJECT);
201
202 static void
203 gst_rtsp_stream_class_init (GstRTSPStreamClass * klass)
204 {
205   GObjectClass *gobject_class;
206
207   g_type_class_add_private (klass, sizeof (GstRTSPStreamPrivate));
208
209   gobject_class = G_OBJECT_CLASS (klass);
210
211   gobject_class->get_property = gst_rtsp_stream_get_property;
212   gobject_class->set_property = gst_rtsp_stream_set_property;
213   gobject_class->finalize = gst_rtsp_stream_finalize;
214
215   g_object_class_install_property (gobject_class, PROP_CONTROL,
216       g_param_spec_string ("control", "Control",
217           "The control string for this stream", DEFAULT_CONTROL,
218           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
219
220   g_object_class_install_property (gobject_class, PROP_PROFILES,
221       g_param_spec_flags ("profiles", "Profiles",
222           "Allowed transfer profiles", GST_TYPE_RTSP_PROFILE,
223           DEFAULT_PROFILES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
224
225   g_object_class_install_property (gobject_class, PROP_PROTOCOLS,
226       g_param_spec_flags ("protocols", "Protocols",
227           "Allowed lower transport protocols", GST_TYPE_RTSP_LOWER_TRANS,
228           DEFAULT_PROTOCOLS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
229
230   gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER] =
231       g_signal_new ("new-rtp-encoder", G_TYPE_FROM_CLASS (klass),
232       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
233       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
234
235   gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER] =
236       g_signal_new ("new-rtcp-encoder", G_TYPE_FROM_CLASS (klass),
237       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
238       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
239
240   GST_DEBUG_CATEGORY_INIT (rtsp_stream_debug, "rtspstream", 0, "GstRTSPStream");
241
242   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
243 }
244
245 static void
246 gst_rtsp_stream_init (GstRTSPStream * stream)
247 {
248   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
249
250   GST_DEBUG ("new stream %p", stream);
251
252   stream->priv = priv;
253
254   priv->dscp_qos = -1;
255   priv->control = g_strdup (DEFAULT_CONTROL);
256   priv->profiles = DEFAULT_PROFILES;
257   priv->protocols = DEFAULT_PROTOCOLS;
258
259   g_mutex_init (&priv->lock);
260
261   priv->keys = g_hash_table_new_full (g_direct_hash, g_direct_equal,
262       NULL, (GDestroyNotify) gst_caps_unref);
263   priv->ptmap = g_hash_table_new_full (NULL, NULL, NULL,
264       (GDestroyNotify) gst_caps_unref);
265 }
266
267 static void
268 gst_rtsp_stream_finalize (GObject * obj)
269 {
270   GstRTSPStream *stream;
271   GstRTSPStreamPrivate *priv;
272
273   stream = GST_RTSP_STREAM (obj);
274   priv = stream->priv;
275
276   GST_DEBUG ("finalize stream %p", stream);
277
278   /* we really need to be unjoined now */
279   g_return_if_fail (!priv->is_joined);
280
281   if (priv->addr_v4)
282     gst_rtsp_address_free (priv->addr_v4);
283   if (priv->addr_v6)
284     gst_rtsp_address_free (priv->addr_v6);
285   if (priv->server_addr_v4)
286     gst_rtsp_address_free (priv->server_addr_v4);
287   if (priv->server_addr_v6)
288     gst_rtsp_address_free (priv->server_addr_v6);
289   if (priv->pool)
290     g_object_unref (priv->pool);
291   if (priv->rtxsend)
292     g_object_unref (priv->rtxsend);
293
294   gst_object_unref (priv->payloader);
295   if (priv->srcpad)
296     gst_object_unref (priv->srcpad);
297   if (priv->sinkpad)
298     gst_object_unref (priv->sinkpad);
299   g_free (priv->control);
300   g_mutex_clear (&priv->lock);
301
302   g_hash_table_unref (priv->keys);
303   g_hash_table_destroy (priv->ptmap);
304
305   G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
306 }
307
308 static void
309 gst_rtsp_stream_get_property (GObject * object, guint propid,
310     GValue * value, GParamSpec * pspec)
311 {
312   GstRTSPStream *stream = GST_RTSP_STREAM (object);
313
314   switch (propid) {
315     case PROP_CONTROL:
316       g_value_take_string (value, gst_rtsp_stream_get_control (stream));
317       break;
318     case PROP_PROFILES:
319       g_value_set_flags (value, gst_rtsp_stream_get_profiles (stream));
320       break;
321     case PROP_PROTOCOLS:
322       g_value_set_flags (value, gst_rtsp_stream_get_protocols (stream));
323       break;
324     default:
325       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
326   }
327 }
328
329 static void
330 gst_rtsp_stream_set_property (GObject * object, guint propid,
331     const GValue * value, GParamSpec * pspec)
332 {
333   GstRTSPStream *stream = GST_RTSP_STREAM (object);
334
335   switch (propid) {
336     case PROP_CONTROL:
337       gst_rtsp_stream_set_control (stream, g_value_get_string (value));
338       break;
339     case PROP_PROFILES:
340       gst_rtsp_stream_set_profiles (stream, g_value_get_flags (value));
341       break;
342     case PROP_PROTOCOLS:
343       gst_rtsp_stream_set_protocols (stream, g_value_get_flags (value));
344       break;
345     default:
346       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
347   }
348 }
349
350 /**
351  * gst_rtsp_stream_new:
352  * @idx: an index
353  * @pad: a #GstPad
354  * @payloader: a #GstElement
355  *
356  * Create a new media stream with index @idx that handles RTP data on
357  * @pad and has a payloader element @payloader if @pad is a source pad
358  * or a depayloader element @payloader if @pad is a sink pad.
359  *
360  * Returns: (transfer full): a new #GstRTSPStream
361  */
362 GstRTSPStream *
363 gst_rtsp_stream_new (guint idx, GstElement * payloader, GstPad * pad)
364 {
365   GstRTSPStreamPrivate *priv;
366   GstRTSPStream *stream;
367
368   g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL);
369   g_return_val_if_fail (GST_IS_PAD (pad), NULL);
370
371   stream = g_object_new (GST_TYPE_RTSP_STREAM, NULL);
372   priv = stream->priv;
373   priv->idx = idx;
374   priv->payloader = gst_object_ref (payloader);
375   if (GST_PAD_IS_SRC (pad))
376     priv->srcpad = gst_object_ref (pad);
377   else
378     priv->sinkpad = gst_object_ref (pad);
379
380   return stream;
381 }
382
383 /**
384  * gst_rtsp_stream_get_index:
385  * @stream: a #GstRTSPStream
386  *
387  * Get the stream index.
388  *
389  * Return: the stream index.
390  */
391 guint
392 gst_rtsp_stream_get_index (GstRTSPStream * stream)
393 {
394   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
395
396   return stream->priv->idx;
397 }
398
399 /**
400  * gst_rtsp_stream_get_pt:
401  * @stream: a #GstRTSPStream
402  *
403  * Get the stream payload type.
404  *
405  * Return: the stream payload type.
406  */
407 guint
408 gst_rtsp_stream_get_pt (GstRTSPStream * stream)
409 {
410   GstRTSPStreamPrivate *priv;
411   guint pt;
412
413   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
414
415   priv = stream->priv;
416
417   g_object_get (G_OBJECT (priv->payloader), "pt", &pt, NULL);
418
419   return pt;
420 }
421
422 /**
423  * gst_rtsp_stream_get_srcpad:
424  * @stream: a #GstRTSPStream
425  *
426  * Get the srcpad associated with @stream.
427  *
428  * Returns: (transfer full): the srcpad. Unref after usage.
429  */
430 GstPad *
431 gst_rtsp_stream_get_srcpad (GstRTSPStream * stream)
432 {
433   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
434
435   if (!stream->priv->srcpad)
436     return NULL;
437
438   return gst_object_ref (stream->priv->srcpad);
439 }
440
441 /**
442  * gst_rtsp_stream_get_sinkpad:
443  * @stream: a #GstRTSPStream
444  *
445  * Get the sinkpad associated with @stream.
446  *
447  * Returns: (transfer full): the sinkpad. Unref after usage.
448  */
449 GstPad *
450 gst_rtsp_stream_get_sinkpad (GstRTSPStream * stream)
451 {
452   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
453
454   if (!stream->priv->sinkpad)
455     return NULL;
456
457   return gst_object_ref (stream->priv->sinkpad);
458 }
459
460 /**
461  * gst_rtsp_stream_get_control:
462  * @stream: a #GstRTSPStream
463  *
464  * Get the control string to identify this stream.
465  *
466  * Returns: (transfer full): the control string. g_free() after usage.
467  */
468 gchar *
469 gst_rtsp_stream_get_control (GstRTSPStream * stream)
470 {
471   GstRTSPStreamPrivate *priv;
472   gchar *result;
473
474   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
475
476   priv = stream->priv;
477
478   g_mutex_lock (&priv->lock);
479   if ((result = g_strdup (priv->control)) == NULL)
480     result = g_strdup_printf ("stream=%u", priv->idx);
481   g_mutex_unlock (&priv->lock);
482
483   return result;
484 }
485
486 /**
487  * gst_rtsp_stream_set_control:
488  * @stream: a #GstRTSPStream
489  * @control: a control string
490  *
491  * Set the control string in @stream.
492  */
493 void
494 gst_rtsp_stream_set_control (GstRTSPStream * stream, const gchar * control)
495 {
496   GstRTSPStreamPrivate *priv;
497
498   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
499
500   priv = stream->priv;
501
502   g_mutex_lock (&priv->lock);
503   g_free (priv->control);
504   priv->control = g_strdup (control);
505   g_mutex_unlock (&priv->lock);
506 }
507
508 /**
509  * gst_rtsp_stream_has_control:
510  * @stream: a #GstRTSPStream
511  * @control: a control string
512  *
513  * Check if @stream has the control string @control.
514  *
515  * Returns: %TRUE is @stream has @control as the control string
516  */
517 gboolean
518 gst_rtsp_stream_has_control (GstRTSPStream * stream, const gchar * control)
519 {
520   GstRTSPStreamPrivate *priv;
521   gboolean res;
522
523   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
524
525   priv = stream->priv;
526
527   g_mutex_lock (&priv->lock);
528   if (priv->control)
529     res = (g_strcmp0 (priv->control, control) == 0);
530   else {
531     guint streamid;
532
533     if (sscanf (control, "stream=%u", &streamid) > 0)
534       res = (streamid == priv->idx);
535     else
536       res = FALSE;
537   }
538   g_mutex_unlock (&priv->lock);
539
540   return res;
541 }
542
543 /**
544  * gst_rtsp_stream_set_mtu:
545  * @stream: a #GstRTSPStream
546  * @mtu: a new MTU
547  *
548  * Configure the mtu in the payloader of @stream to @mtu.
549  */
550 void
551 gst_rtsp_stream_set_mtu (GstRTSPStream * stream, guint mtu)
552 {
553   GstRTSPStreamPrivate *priv;
554
555   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
556
557   priv = stream->priv;
558
559   GST_LOG_OBJECT (stream, "set MTU %u", mtu);
560
561   g_object_set (G_OBJECT (priv->payloader), "mtu", mtu, NULL);
562 }
563
564 /**
565  * gst_rtsp_stream_get_mtu:
566  * @stream: a #GstRTSPStream
567  *
568  * Get the configured MTU in the payloader of @stream.
569  *
570  * Returns: the MTU of the payloader.
571  */
572 guint
573 gst_rtsp_stream_get_mtu (GstRTSPStream * stream)
574 {
575   GstRTSPStreamPrivate *priv;
576   guint mtu;
577
578   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
579
580   priv = stream->priv;
581
582   g_object_get (G_OBJECT (priv->payloader), "mtu", &mtu, NULL);
583
584   return mtu;
585 }
586
587 /* Update the dscp qos property on the udp sinks */
588 static void
589 update_dscp_qos (GstRTSPStream * stream)
590 {
591   GstRTSPStreamPrivate *priv;
592
593   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
594
595   priv = stream->priv;
596
597   if (priv->udpsink[0]) {
598     g_object_set (G_OBJECT (priv->udpsink[0]), "qos-dscp", priv->dscp_qos,
599         NULL);
600   }
601
602   if (priv->udpsink[1]) {
603     g_object_set (G_OBJECT (priv->udpsink[1]), "qos-dscp", priv->dscp_qos,
604         NULL);
605   }
606 }
607
608 /**
609  * gst_rtsp_stream_set_dscp_qos:
610  * @stream: a #GstRTSPStream
611  * @dscp_qos: a new dscp qos value (0-63, or -1 to disable)
612  *
613  * Configure the dscp qos of the outgoing sockets to @dscp_qos.
614  */
615 void
616 gst_rtsp_stream_set_dscp_qos (GstRTSPStream * stream, gint dscp_qos)
617 {
618   GstRTSPStreamPrivate *priv;
619
620   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
621
622   priv = stream->priv;
623
624   GST_LOG_OBJECT (stream, "set DSCP QoS %d", dscp_qos);
625
626   if (dscp_qos < -1 || dscp_qos > 63) {
627     GST_WARNING_OBJECT (stream, "trying to set illegal dscp qos %d", dscp_qos);
628     return;
629   }
630
631   priv->dscp_qos = dscp_qos;
632
633   update_dscp_qos (stream);
634 }
635
636 /**
637  * gst_rtsp_stream_get_dscp_qos:
638  * @stream: a #GstRTSPStream
639  *
640  * Get the configured DSCP QoS in of the outgoing sockets.
641  *
642  * Returns: the DSCP QoS value of the outgoing sockets, or -1 if disbled.
643  */
644 gint
645 gst_rtsp_stream_get_dscp_qos (GstRTSPStream * stream)
646 {
647   GstRTSPStreamPrivate *priv;
648
649   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
650
651   priv = stream->priv;
652
653   return priv->dscp_qos;
654 }
655
656 /**
657  * gst_rtsp_stream_is_transport_supported:
658  * @stream: a #GstRTSPStream
659  * @transport: (transfer none): a #GstRTSPTransport
660  *
661  * Check if @transport can be handled by stream
662  *
663  * Returns: %TRUE if @transport can be handled by @stream.
664  */
665 gboolean
666 gst_rtsp_stream_is_transport_supported (GstRTSPStream * stream,
667     GstRTSPTransport * transport)
668 {
669   GstRTSPStreamPrivate *priv;
670
671   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
672
673   priv = stream->priv;
674
675   g_mutex_lock (&priv->lock);
676   if (transport->trans != GST_RTSP_TRANS_RTP)
677     goto unsupported_transmode;
678
679   if (!(transport->profile & priv->profiles))
680     goto unsupported_profile;
681
682   if (!(transport->lower_transport & priv->protocols))
683     goto unsupported_ltrans;
684
685   g_mutex_unlock (&priv->lock);
686
687   return TRUE;
688
689   /* ERRORS */
690 unsupported_transmode:
691   {
692     GST_DEBUG ("unsupported transport mode %d", transport->trans);
693     g_mutex_unlock (&priv->lock);
694     return FALSE;
695   }
696 unsupported_profile:
697   {
698     GST_DEBUG ("unsupported profile %d", transport->profile);
699     g_mutex_unlock (&priv->lock);
700     return FALSE;
701   }
702 unsupported_ltrans:
703   {
704     GST_DEBUG ("unsupported lower transport %d", transport->lower_transport);
705     g_mutex_unlock (&priv->lock);
706     return FALSE;
707   }
708 }
709
710 /**
711  * gst_rtsp_stream_set_profiles:
712  * @stream: a #GstRTSPStream
713  * @profiles: the new profiles
714  *
715  * Configure the allowed profiles for @stream.
716  */
717 void
718 gst_rtsp_stream_set_profiles (GstRTSPStream * stream, GstRTSPProfile profiles)
719 {
720   GstRTSPStreamPrivate *priv;
721
722   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
723
724   priv = stream->priv;
725
726   g_mutex_lock (&priv->lock);
727   priv->profiles = profiles;
728   g_mutex_unlock (&priv->lock);
729 }
730
731 /**
732  * gst_rtsp_stream_get_profiles:
733  * @stream: a #GstRTSPStream
734  *
735  * Get the allowed profiles of @stream.
736  *
737  * Returns: a #GstRTSPProfile
738  */
739 GstRTSPProfile
740 gst_rtsp_stream_get_profiles (GstRTSPStream * stream)
741 {
742   GstRTSPStreamPrivate *priv;
743   GstRTSPProfile res;
744
745   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_RTSP_PROFILE_UNKNOWN);
746
747   priv = stream->priv;
748
749   g_mutex_lock (&priv->lock);
750   res = priv->profiles;
751   g_mutex_unlock (&priv->lock);
752
753   return res;
754 }
755
756 /**
757  * gst_rtsp_stream_set_protocols:
758  * @stream: a #GstRTSPStream
759  * @protocols: the new flags
760  *
761  * Configure the allowed lower transport for @stream.
762  */
763 void
764 gst_rtsp_stream_set_protocols (GstRTSPStream * stream,
765     GstRTSPLowerTrans protocols)
766 {
767   GstRTSPStreamPrivate *priv;
768
769   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
770
771   priv = stream->priv;
772
773   g_mutex_lock (&priv->lock);
774   priv->protocols = protocols;
775   g_mutex_unlock (&priv->lock);
776 }
777
778 /**
779  * gst_rtsp_stream_get_protocols:
780  * @stream: a #GstRTSPStream
781  *
782  * Get the allowed protocols of @stream.
783  *
784  * Returns: a #GstRTSPLowerTrans
785  */
786 GstRTSPLowerTrans
787 gst_rtsp_stream_get_protocols (GstRTSPStream * stream)
788 {
789   GstRTSPStreamPrivate *priv;
790   GstRTSPLowerTrans res;
791
792   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream),
793       GST_RTSP_LOWER_TRANS_UNKNOWN);
794
795   priv = stream->priv;
796
797   g_mutex_lock (&priv->lock);
798   res = priv->protocols;
799   g_mutex_unlock (&priv->lock);
800
801   return res;
802 }
803
804 /**
805  * gst_rtsp_stream_set_address_pool:
806  * @stream: a #GstRTSPStream
807  * @pool: (transfer none): a #GstRTSPAddressPool
808  *
809  * configure @pool to be used as the address pool of @stream.
810  */
811 void
812 gst_rtsp_stream_set_address_pool (GstRTSPStream * stream,
813     GstRTSPAddressPool * pool)
814 {
815   GstRTSPStreamPrivate *priv;
816   GstRTSPAddressPool *old;
817
818   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
819
820   priv = stream->priv;
821
822   GST_LOG_OBJECT (stream, "set address pool %p", pool);
823
824   g_mutex_lock (&priv->lock);
825   if ((old = priv->pool) != pool)
826     priv->pool = pool ? g_object_ref (pool) : NULL;
827   else
828     old = NULL;
829   g_mutex_unlock (&priv->lock);
830
831   if (old)
832     g_object_unref (old);
833 }
834
835 /**
836  * gst_rtsp_stream_get_address_pool:
837  * @stream: a #GstRTSPStream
838  *
839  * Get the #GstRTSPAddressPool used as the address pool of @stream.
840  *
841  * Returns: (transfer full): the #GstRTSPAddressPool of @stream. g_object_unref() after
842  * usage.
843  */
844 GstRTSPAddressPool *
845 gst_rtsp_stream_get_address_pool (GstRTSPStream * stream)
846 {
847   GstRTSPStreamPrivate *priv;
848   GstRTSPAddressPool *result;
849
850   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
851
852   priv = stream->priv;
853
854   g_mutex_lock (&priv->lock);
855   if ((result = priv->pool))
856     g_object_ref (result);
857   g_mutex_unlock (&priv->lock);
858
859   return result;
860 }
861
862 /**
863  * gst_rtsp_stream_get_multicast_address:
864  * @stream: a #GstRTSPStream
865  * @family: the #GSocketFamily
866  *
867  * Get the multicast address of @stream for @family.
868  *
869  * Returns: (transfer full) (nullable): the #GstRTSPAddress of @stream
870  * or %NULL when no address could be allocated. gst_rtsp_address_free()
871  * after usage.
872  */
873 GstRTSPAddress *
874 gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream,
875     GSocketFamily family)
876 {
877   GstRTSPStreamPrivate *priv;
878   GstRTSPAddress *result;
879   GstRTSPAddress **addrp;
880   GstRTSPAddressFlags flags;
881
882   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
883
884   priv = stream->priv;
885
886   if (family == G_SOCKET_FAMILY_IPV6) {
887     flags = GST_RTSP_ADDRESS_FLAG_IPV6;
888     addrp = &priv->addr_v6;
889   } else {
890     flags = GST_RTSP_ADDRESS_FLAG_IPV4;
891     addrp = &priv->addr_v4;
892   }
893
894   g_mutex_lock (&priv->lock);
895   if (*addrp == NULL) {
896     if (priv->pool == NULL)
897       goto no_pool;
898
899     flags |= GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_MULTICAST;
900
901     *addrp = gst_rtsp_address_pool_acquire_address (priv->pool, flags, 2);
902     if (*addrp == NULL)
903       goto no_address;
904   }
905   result = gst_rtsp_address_copy (*addrp);
906   g_mutex_unlock (&priv->lock);
907
908   return result;
909
910   /* ERRORS */
911 no_pool:
912   {
913     GST_ERROR_OBJECT (stream, "no address pool specified");
914     g_mutex_unlock (&priv->lock);
915     return NULL;
916   }
917 no_address:
918   {
919     GST_ERROR_OBJECT (stream, "failed to acquire address from pool");
920     g_mutex_unlock (&priv->lock);
921     return NULL;
922   }
923 }
924
925 /**
926  * gst_rtsp_stream_reserve_address:
927  * @stream: a #GstRTSPStream
928  * @address: an address
929  * @port: a port
930  * @n_ports: n_ports
931  * @ttl: a TTL
932  *
933  * Reserve @address and @port as the address and port of @stream.
934  *
935  * Returns: (nullable): the #GstRTSPAddress of @stream or %NULL when
936  * the address could be reserved. gst_rtsp_address_free() after usage.
937  */
938 GstRTSPAddress *
939 gst_rtsp_stream_reserve_address (GstRTSPStream * stream,
940     const gchar * address, guint port, guint n_ports, guint ttl)
941 {
942   GstRTSPStreamPrivate *priv;
943   GstRTSPAddress *result;
944   GInetAddress *addr;
945   GSocketFamily family;
946   GstRTSPAddress **addrp;
947
948   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
949   g_return_val_if_fail (address != NULL, NULL);
950   g_return_val_if_fail (port > 0, NULL);
951   g_return_val_if_fail (n_ports > 0, NULL);
952   g_return_val_if_fail (ttl > 0, NULL);
953
954   priv = stream->priv;
955
956   addr = g_inet_address_new_from_string (address);
957   if (!addr) {
958     GST_ERROR ("failed to get inet addr from %s", address);
959     family = G_SOCKET_FAMILY_IPV4;
960   } else {
961     family = g_inet_address_get_family (addr);
962     g_object_unref (addr);
963   }
964
965   if (family == G_SOCKET_FAMILY_IPV6)
966     addrp = &priv->addr_v6;
967   else
968     addrp = &priv->addr_v4;
969
970   g_mutex_lock (&priv->lock);
971   if (*addrp == NULL) {
972     GstRTSPAddressPoolResult res;
973
974     if (priv->pool == NULL)
975       goto no_pool;
976
977     res = gst_rtsp_address_pool_reserve_address (priv->pool, address,
978         port, n_ports, ttl, addrp);
979     if (res != GST_RTSP_ADDRESS_POOL_OK)
980       goto no_address;
981   } else {
982     if (strcmp ((*addrp)->address, address) ||
983         (*addrp)->port != port || (*addrp)->n_ports != n_ports ||
984         (*addrp)->ttl != ttl)
985       goto different_address;
986   }
987   result = gst_rtsp_address_copy (*addrp);
988   g_mutex_unlock (&priv->lock);
989
990   return result;
991
992   /* ERRORS */
993 no_pool:
994   {
995     GST_ERROR_OBJECT (stream, "no address pool specified");
996     g_mutex_unlock (&priv->lock);
997     return NULL;
998   }
999 no_address:
1000   {
1001     GST_ERROR_OBJECT (stream, "failed to acquire address %s from pool",
1002         address);
1003     g_mutex_unlock (&priv->lock);
1004     return NULL;
1005   }
1006 different_address:
1007   {
1008     GST_ERROR_OBJECT (stream, "address %s is not the same that was already"
1009         " reserved", address);
1010     g_mutex_unlock (&priv->lock);
1011     return NULL;
1012   }
1013 }
1014
1015 static gboolean
1016 alloc_ports_one_family (GstRTSPStream * stream, GstRTSPAddressPool * pool,
1017     gint buffer_size, GSocketFamily family, GstElement * udpsrc_out[2],
1018     GstElement * udpsink_out[2], GstRTSPRange * server_port_out,
1019     GstRTSPAddress ** server_addr_out)
1020 {
1021   GstRTSPStreamPrivate *priv = stream->priv;
1022   GstStateChangeReturn ret;
1023   GstElement *udpsrc0, *udpsrc1;
1024   GstElement *udpsink0, *udpsink1;
1025   GSocket *rtp_socket = NULL;
1026   GSocket *rtcp_socket;
1027   gint tmp_rtp, tmp_rtcp;
1028   guint count;
1029   gint rtpport, rtcpport;
1030   GList *rejected_addresses = NULL;
1031   GstRTSPAddress *addr = NULL;
1032   GInetAddress *inetaddr = NULL;
1033   GSocketAddress *rtp_sockaddr = NULL;
1034   GSocketAddress *rtcp_sockaddr = NULL;
1035   const gchar *multisink_socket;
1036
1037   if (family == G_SOCKET_FAMILY_IPV6)
1038     multisink_socket = "socket-v6";
1039   else
1040     multisink_socket = "socket";
1041
1042   udpsrc0 = NULL;
1043   udpsrc1 = NULL;
1044   udpsink0 = NULL;
1045   udpsink1 = NULL;
1046   count = 0;
1047
1048   /* Start with random port */
1049   tmp_rtp = 0;
1050
1051   rtcp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1052       G_SOCKET_PROTOCOL_UDP, NULL);
1053   if (!rtcp_socket)
1054     goto no_udp_protocol;
1055
1056   if (*server_addr_out)
1057     gst_rtsp_address_free (*server_addr_out);
1058
1059   /* try to allocate 2 UDP ports, the RTP port should be an even
1060    * number and the RTCP port should be the next (uneven) port */
1061 again:
1062
1063   if (rtp_socket == NULL) {
1064     rtp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1065         G_SOCKET_PROTOCOL_UDP, NULL);
1066     if (!rtp_socket)
1067       goto no_udp_protocol;
1068   }
1069
1070   if (pool && gst_rtsp_address_pool_has_unicast_addresses (pool)) {
1071     GstRTSPAddressFlags flags;
1072
1073     if (addr)
1074       rejected_addresses = g_list_prepend (rejected_addresses, addr);
1075
1076     flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_UNICAST;
1077     if (family == G_SOCKET_FAMILY_IPV6)
1078       flags |= GST_RTSP_ADDRESS_FLAG_IPV6;
1079     else
1080       flags |= GST_RTSP_ADDRESS_FLAG_IPV4;
1081
1082     addr = gst_rtsp_address_pool_acquire_address (pool, flags, 2);
1083
1084     if (addr == NULL)
1085       goto no_ports;
1086
1087     tmp_rtp = addr->port;
1088
1089     g_clear_object (&inetaddr);
1090     inetaddr = g_inet_address_new_from_string (addr->address);
1091   } else {
1092     if (tmp_rtp != 0) {
1093       tmp_rtp += 2;
1094       if (++count > 20)
1095         goto no_ports;
1096     }
1097
1098     if (inetaddr == NULL)
1099       inetaddr = g_inet_address_new_any (family);
1100   }
1101
1102   rtp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtp);
1103   if (!g_socket_bind (rtp_socket, rtp_sockaddr, FALSE, NULL)) {
1104     g_object_unref (rtp_sockaddr);
1105     goto again;
1106   }
1107   g_object_unref (rtp_sockaddr);
1108
1109   rtp_sockaddr = g_socket_get_local_address (rtp_socket, NULL);
1110   if (rtp_sockaddr == NULL || !G_IS_INET_SOCKET_ADDRESS (rtp_sockaddr)) {
1111     g_clear_object (&rtp_sockaddr);
1112     goto socket_error;
1113   }
1114
1115   tmp_rtp =
1116       g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (rtp_sockaddr));
1117   g_object_unref (rtp_sockaddr);
1118
1119   /* check if port is even */
1120   if ((tmp_rtp & 1) != 0) {
1121     /* port not even, close and allocate another */
1122     tmp_rtp++;
1123     g_clear_object (&rtp_socket);
1124     goto again;
1125   }
1126
1127   /* set port */
1128   tmp_rtcp = tmp_rtp + 1;
1129
1130   rtcp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtcp);
1131   if (!g_socket_bind (rtcp_socket, rtcp_sockaddr, FALSE, NULL)) {
1132     g_object_unref (rtcp_sockaddr);
1133     g_clear_object (&rtp_socket);
1134     goto again;
1135   }
1136   g_object_unref (rtcp_sockaddr);
1137
1138   g_clear_object (&inetaddr);
1139
1140   udpsrc0 = gst_element_factory_make ("udpsrc", NULL);
1141   udpsrc1 = gst_element_factory_make ("udpsrc", NULL);
1142
1143   if (udpsrc0 == NULL || udpsrc1 == NULL)
1144     goto no_udp_protocol;
1145
1146   g_object_set (G_OBJECT (udpsrc0), "socket", rtp_socket, NULL);
1147   g_object_set (G_OBJECT (udpsrc1), "socket", rtcp_socket, NULL);
1148
1149   ret = gst_element_set_state (udpsrc0, GST_STATE_READY);
1150   if (ret == GST_STATE_CHANGE_FAILURE)
1151     goto element_error;
1152   ret = gst_element_set_state (udpsrc1, GST_STATE_READY);
1153   if (ret == GST_STATE_CHANGE_FAILURE)
1154     goto element_error;
1155
1156   /* all fine, do port check */
1157   g_object_get (G_OBJECT (udpsrc0), "port", &rtpport, NULL);
1158   g_object_get (G_OBJECT (udpsrc1), "port", &rtcpport, NULL);
1159
1160   /* this should not happen... */
1161   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
1162     goto port_error;
1163
1164   if (udpsink_out[0])
1165     udpsink0 = udpsink_out[0];
1166   else
1167     udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
1168
1169   if (!udpsink0)
1170     goto no_udp_protocol;
1171
1172   g_object_set (G_OBJECT (udpsink0), "close-socket", FALSE, NULL);
1173   g_object_set (G_OBJECT (udpsink0), multisink_socket, rtp_socket, NULL);
1174
1175   if (udpsink_out[1])
1176     udpsink1 = udpsink_out[1];
1177   else
1178     udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
1179
1180   if (!udpsink1)
1181     goto no_udp_protocol;
1182
1183   g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
1184   g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
1185   g_object_set (G_OBJECT (udpsink0), "buffer-size", buffer_size, NULL);
1186
1187   g_object_set (G_OBJECT (udpsink1), "close-socket", FALSE, NULL);
1188   g_object_set (G_OBJECT (udpsink1), multisink_socket, rtcp_socket, NULL);
1189   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
1190   /* Needs to be async for RECORD streams, otherwise we will never go to
1191    * PLAYING because the sinks will wait for data while the udpsrc can't
1192    * provide data with timestamps in PAUSED. */
1193   if (priv->sinkpad)
1194     g_object_set (G_OBJECT (udpsink0), "async", FALSE, NULL);
1195   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
1196   g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL);
1197   g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
1198   g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL);
1199   g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
1200
1201   /* we keep these elements, we will further configure them when the
1202    * client told us to really use the UDP ports. */
1203   udpsrc_out[0] = udpsrc0;
1204   udpsrc_out[1] = udpsrc1;
1205   udpsink_out[0] = udpsink0;
1206   udpsink_out[1] = udpsink1;
1207
1208   server_port_out->min = rtpport;
1209   server_port_out->max = rtcpport;
1210
1211   *server_addr_out = addr;
1212   g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free);
1213
1214   g_object_unref (rtp_socket);
1215   g_object_unref (rtcp_socket);
1216
1217   return TRUE;
1218
1219   /* ERRORS */
1220 no_udp_protocol:
1221   {
1222     goto cleanup;
1223   }
1224 no_ports:
1225   {
1226     goto cleanup;
1227   }
1228 port_error:
1229   {
1230     goto cleanup;
1231   }
1232 socket_error:
1233   {
1234     goto cleanup;
1235   }
1236 element_error:
1237   {
1238     goto cleanup;
1239   }
1240 cleanup:
1241   {
1242     if (udpsrc0) {
1243       gst_element_set_state (udpsrc0, GST_STATE_NULL);
1244       gst_object_unref (udpsrc0);
1245     }
1246     if (udpsrc1) {
1247       gst_element_set_state (udpsrc1, GST_STATE_NULL);
1248       gst_object_unref (udpsrc1);
1249     }
1250     if (udpsink0) {
1251       gst_element_set_state (udpsink0, GST_STATE_NULL);
1252       gst_object_unref (udpsink0);
1253     }
1254     if (inetaddr)
1255       g_object_unref (inetaddr);
1256     g_list_free_full (rejected_addresses,
1257         (GDestroyNotify) gst_rtsp_address_free);
1258     if (addr)
1259       gst_rtsp_address_free (addr);
1260     if (rtp_socket)
1261       g_object_unref (rtp_socket);
1262     if (rtcp_socket)
1263       g_object_unref (rtcp_socket);
1264     return FALSE;
1265   }
1266 }
1267
1268 /* must be called with lock */
1269 static gboolean
1270 alloc_ports (GstRTSPStream * stream)
1271 {
1272   GstRTSPStreamPrivate *priv = stream->priv;
1273
1274   priv->have_ipv4 =
1275       alloc_ports_one_family (stream, priv->pool, priv->buffer_size,
1276       G_SOCKET_FAMILY_IPV4, priv->udpsrc_v4, priv->udpsink,
1277       &priv->server_port_v4, &priv->server_addr_v4);
1278
1279   priv->have_ipv6 =
1280       alloc_ports_one_family (stream, priv->pool, priv->buffer_size,
1281       G_SOCKET_FAMILY_IPV6, priv->udpsrc_v6, priv->udpsink,
1282       &priv->server_port_v6, &priv->server_addr_v6);
1283
1284   return priv->have_ipv4 || priv->have_ipv6;
1285 }
1286
1287 /**
1288  * gst_rtsp_stream_get_server_port:
1289  * @stream: a #GstRTSPStream
1290  * @server_port: (out): result server port
1291  * @family: the port family to get
1292  *
1293  * Fill @server_port with the port pair used by the server. This function can
1294  * only be called when @stream has been joined.
1295  */
1296 void
1297 gst_rtsp_stream_get_server_port (GstRTSPStream * stream,
1298     GstRTSPRange * server_port, GSocketFamily family)
1299 {
1300   GstRTSPStreamPrivate *priv;
1301
1302   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1303   priv = stream->priv;
1304   g_return_if_fail (priv->is_joined);
1305
1306   g_mutex_lock (&priv->lock);
1307   if (family == G_SOCKET_FAMILY_IPV4) {
1308     if (server_port)
1309       *server_port = priv->server_port_v4;
1310   } else {
1311     if (server_port)
1312       *server_port = priv->server_port_v6;
1313   }
1314   g_mutex_unlock (&priv->lock);
1315 }
1316
1317 /**
1318  * gst_rtsp_stream_get_rtpsession:
1319  * @stream: a #GstRTSPStream
1320  *
1321  * Get the RTP session of this stream.
1322  *
1323  * Returns: (transfer full): The RTP session of this stream. Unref after usage.
1324  */
1325 GObject *
1326 gst_rtsp_stream_get_rtpsession (GstRTSPStream * stream)
1327 {
1328   GstRTSPStreamPrivate *priv;
1329   GObject *session;
1330
1331   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1332
1333   priv = stream->priv;
1334
1335   g_mutex_lock (&priv->lock);
1336   if ((session = priv->session))
1337     g_object_ref (session);
1338   g_mutex_unlock (&priv->lock);
1339
1340   return session;
1341 }
1342
1343 /**
1344  * gst_rtsp_stream_get_ssrc:
1345  * @stream: a #GstRTSPStream
1346  * @ssrc: (out): result ssrc
1347  *
1348  * Get the SSRC used by the RTP session of this stream. This function can only
1349  * be called when @stream has been joined.
1350  */
1351 void
1352 gst_rtsp_stream_get_ssrc (GstRTSPStream * stream, guint * ssrc)
1353 {
1354   GstRTSPStreamPrivate *priv;
1355
1356   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1357   priv = stream->priv;
1358   g_return_if_fail (priv->is_joined);
1359
1360   g_mutex_lock (&priv->lock);
1361   if (ssrc && priv->session)
1362     g_object_get (priv->session, "internal-ssrc", ssrc, NULL);
1363   g_mutex_unlock (&priv->lock);
1364 }
1365
1366 /**
1367  * gst_rtsp_stream_set_retransmission_time:
1368  * @stream: a #GstRTSPStream
1369  * @time: a #GstClockTime
1370  *
1371  * Set the amount of time to store retransmission packets.
1372  */
1373 void
1374 gst_rtsp_stream_set_retransmission_time (GstRTSPStream * stream,
1375     GstClockTime time)
1376 {
1377   GST_DEBUG_OBJECT (stream, "set retransmission time %" G_GUINT64_FORMAT, time);
1378
1379   g_mutex_lock (&stream->priv->lock);
1380   stream->priv->rtx_time = time;
1381   if (stream->priv->rtxsend)
1382     g_object_set (stream->priv->rtxsend, "max-size-time",
1383         GST_TIME_AS_MSECONDS (time), NULL);
1384   g_mutex_unlock (&stream->priv->lock);
1385 }
1386
1387 /**
1388  * gst_rtsp_media_get_retransmission_time:
1389  * @media: a #GstRTSPMedia
1390  *
1391  * Get the amount of time to store retransmission data.
1392  *
1393  * Returns: the amount of time to store retransmission data.
1394  */
1395 GstClockTime
1396 gst_rtsp_stream_get_retransmission_time (GstRTSPStream * stream)
1397 {
1398   GstClockTime ret;
1399
1400   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
1401
1402   g_mutex_lock (&stream->priv->lock);
1403   ret = stream->priv->rtx_time;
1404   g_mutex_unlock (&stream->priv->lock);
1405
1406   return ret;
1407 }
1408
1409 void
1410 gst_rtsp_stream_set_retransmission_pt (GstRTSPStream * stream, guint rtx_pt)
1411 {
1412   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1413
1414   GST_DEBUG_OBJECT (stream, "set retransmission pt %u", rtx_pt);
1415
1416   g_mutex_lock (&stream->priv->lock);
1417   stream->priv->rtx_pt = rtx_pt;
1418   if (stream->priv->rtxsend) {
1419     guint pt = gst_rtsp_stream_get_pt (stream);
1420     gchar *pt_s = g_strdup_printf ("%d", pt);
1421     GstStructure *rtx_pt_map = gst_structure_new ("application/x-rtp-pt-map",
1422         pt_s, G_TYPE_UINT, rtx_pt, NULL);
1423     g_object_set (stream->priv->rtxsend, "payload-type-map", rtx_pt_map, NULL);
1424     g_free (pt_s);
1425     gst_structure_free (rtx_pt_map);
1426   }
1427   g_mutex_unlock (&stream->priv->lock);
1428 }
1429
1430 guint
1431 gst_rtsp_stream_get_retransmission_pt (GstRTSPStream * stream)
1432 {
1433   guint rtx_pt;
1434
1435   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
1436
1437   g_mutex_lock (&stream->priv->lock);
1438   rtx_pt = stream->priv->rtx_pt;
1439   g_mutex_unlock (&stream->priv->lock);
1440
1441   return rtx_pt;
1442 }
1443
1444 /* executed from streaming thread */
1445 static void
1446 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream)
1447 {
1448   GstRTSPStreamPrivate *priv = stream->priv;
1449   GstCaps *newcaps, *oldcaps;
1450
1451   newcaps = gst_pad_get_current_caps (pad);
1452
1453   GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps,
1454       newcaps);
1455
1456   g_mutex_lock (&priv->lock);
1457   oldcaps = priv->caps;
1458   priv->caps = newcaps;
1459   g_mutex_unlock (&priv->lock);
1460
1461   if (oldcaps)
1462     gst_caps_unref (oldcaps);
1463 }
1464
1465 static void
1466 dump_structure (const GstStructure * s)
1467 {
1468   gchar *sstr;
1469
1470   sstr = gst_structure_to_string (s);
1471   GST_INFO ("structure: %s", sstr);
1472   g_free (sstr);
1473 }
1474
1475 static GstRTSPStreamTransport *
1476 find_transport (GstRTSPStream * stream, const gchar * rtcp_from)
1477 {
1478   GstRTSPStreamPrivate *priv = stream->priv;
1479   GList *walk;
1480   GstRTSPStreamTransport *result = NULL;
1481   const gchar *tmp;
1482   gchar *dest;
1483   guint port;
1484
1485   if (rtcp_from == NULL)
1486     return NULL;
1487
1488   tmp = g_strrstr (rtcp_from, ":");
1489   if (tmp == NULL)
1490     return NULL;
1491
1492   port = atoi (tmp + 1);
1493   dest = g_strndup (rtcp_from, tmp - rtcp_from);
1494
1495   g_mutex_lock (&priv->lock);
1496   GST_INFO ("finding %s:%d in %d transports", dest, port,
1497       g_list_length (priv->transports));
1498
1499   for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1500     GstRTSPStreamTransport *trans = walk->data;
1501     const GstRTSPTransport *tr;
1502     gint min, max;
1503
1504     tr = gst_rtsp_stream_transport_get_transport (trans);
1505
1506     min = tr->client_port.min;
1507     max = tr->client_port.max;
1508
1509     if ((strcmp (tr->destination, dest) == 0) && (min == port || max == port)) {
1510       result = trans;
1511       break;
1512     }
1513   }
1514   if (result)
1515     g_object_ref (result);
1516   g_mutex_unlock (&priv->lock);
1517
1518   g_free (dest);
1519
1520   return result;
1521 }
1522
1523 static GstRTSPStreamTransport *
1524 check_transport (GObject * source, GstRTSPStream * stream)
1525 {
1526   GstStructure *stats;
1527   GstRTSPStreamTransport *trans;
1528
1529   /* see if we have a stream to match with the origin of the RTCP packet */
1530   trans = g_object_get_qdata (source, ssrc_stream_map_key);
1531   if (trans == NULL) {
1532     g_object_get (source, "stats", &stats, NULL);
1533     if (stats) {
1534       const gchar *rtcp_from;
1535
1536       dump_structure (stats);
1537
1538       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
1539       if ((trans = find_transport (stream, rtcp_from))) {
1540         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
1541             source);
1542         g_object_set_qdata_full (source, ssrc_stream_map_key, trans,
1543             g_object_unref);
1544       }
1545       gst_structure_free (stats);
1546     }
1547   }
1548   return trans;
1549 }
1550
1551
1552 static void
1553 on_new_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1554 {
1555   GstRTSPStreamTransport *trans;
1556
1557   GST_INFO ("%p: new source %p", stream, source);
1558
1559   trans = check_transport (source, stream);
1560
1561   if (trans)
1562     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
1563 }
1564
1565 static void
1566 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPStream * stream)
1567 {
1568   GST_INFO ("%p: new SDES %p", stream, source);
1569 }
1570
1571 static void
1572 on_ssrc_active (GObject * session, GObject * source, GstRTSPStream * stream)
1573 {
1574   GstRTSPStreamTransport *trans;
1575
1576   trans = check_transport (source, stream);
1577
1578   if (trans) {
1579     GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
1580     gst_rtsp_stream_transport_keep_alive (trans);
1581   }
1582 #ifdef DUMP_STATS
1583   {
1584     GstStructure *stats;
1585     g_object_get (source, "stats", &stats, NULL);
1586     if (stats) {
1587       dump_structure (stats);
1588       gst_structure_free (stats);
1589     }
1590   }
1591 #endif
1592 }
1593
1594 static void
1595 on_bye_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1596 {
1597   GST_INFO ("%p: source %p bye", stream, source);
1598 }
1599
1600 static void
1601 on_bye_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1602 {
1603   GstRTSPStreamTransport *trans;
1604
1605   GST_INFO ("%p: source %p bye timeout", stream, source);
1606
1607   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1608     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1609     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1610   }
1611 }
1612
1613 static void
1614 on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1615 {
1616   GstRTSPStreamTransport *trans;
1617
1618   GST_INFO ("%p: source %p timeout", stream, source);
1619
1620   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1621     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1622     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1623   }
1624 }
1625
1626 static void
1627 clear_tr_cache (GstRTSPStreamPrivate * priv, gboolean is_rtp)
1628 {
1629   if (is_rtp) {
1630     g_list_foreach (priv->tr_cache_rtp, (GFunc) g_object_unref, NULL);
1631     g_list_free (priv->tr_cache_rtp);
1632     priv->tr_cache_rtp = NULL;
1633   } else {
1634     g_list_foreach (priv->tr_cache_rtcp, (GFunc) g_object_unref, NULL);
1635     g_list_free (priv->tr_cache_rtcp);
1636     priv->tr_cache_rtcp = NULL;
1637   }
1638 }
1639
1640 static GstFlowReturn
1641 handle_new_sample (GstAppSink * sink, gpointer user_data)
1642 {
1643   GstRTSPStreamPrivate *priv;
1644   GList *walk;
1645   GstSample *sample;
1646   GstBuffer *buffer;
1647   GstRTSPStream *stream;
1648   gboolean is_rtp;
1649
1650   sample = gst_app_sink_pull_sample (sink);
1651   if (!sample)
1652     return GST_FLOW_OK;
1653
1654   stream = (GstRTSPStream *) user_data;
1655   priv = stream->priv;
1656   buffer = gst_sample_get_buffer (sample);
1657
1658   is_rtp = GST_ELEMENT_CAST (sink) == priv->appsink[0];
1659
1660   g_mutex_lock (&priv->lock);
1661   if (is_rtp) {
1662     if (priv->tr_cache_cookie_rtp != priv->transports_cookie) {
1663       clear_tr_cache (priv, is_rtp);
1664       for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1665         GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1666         priv->tr_cache_rtp =
1667             g_list_prepend (priv->tr_cache_rtp, g_object_ref (tr));
1668       }
1669       priv->tr_cache_cookie_rtp = priv->transports_cookie;
1670     }
1671   } else {
1672     if (priv->tr_cache_cookie_rtcp != priv->transports_cookie) {
1673       clear_tr_cache (priv, is_rtp);
1674       for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1675         GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1676         priv->tr_cache_rtcp =
1677             g_list_prepend (priv->tr_cache_rtcp, g_object_ref (tr));
1678       }
1679       priv->tr_cache_cookie_rtcp = priv->transports_cookie;
1680     }
1681   }
1682   g_mutex_unlock (&priv->lock);
1683
1684   if (is_rtp) {
1685     for (walk = priv->tr_cache_rtp; walk; walk = g_list_next (walk)) {
1686       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1687       gst_rtsp_stream_transport_send_rtp (tr, buffer);
1688     }
1689   } else {
1690     for (walk = priv->tr_cache_rtcp; walk; walk = g_list_next (walk)) {
1691       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1692       gst_rtsp_stream_transport_send_rtcp (tr, buffer);
1693     }
1694   }
1695   gst_sample_unref (sample);
1696
1697   return GST_FLOW_OK;
1698 }
1699
1700 static GstAppSinkCallbacks sink_cb = {
1701   NULL,                         /* not interested in EOS */
1702   NULL,                         /* not interested in preroll samples */
1703   handle_new_sample,
1704 };
1705
1706 static GstElement *
1707 get_rtp_encoder (GstRTSPStream * stream, guint session)
1708 {
1709   GstRTSPStreamPrivate *priv = stream->priv;
1710
1711   if (priv->srtpenc == NULL) {
1712     gchar *name;
1713
1714     name = g_strdup_printf ("srtpenc_%u", session);
1715     priv->srtpenc = gst_element_factory_make ("srtpenc", name);
1716     g_free (name);
1717
1718     g_object_set (priv->srtpenc, "random-key", TRUE, NULL);
1719   }
1720   return gst_object_ref (priv->srtpenc);
1721 }
1722
1723 static GstElement *
1724 request_rtp_encoder (GstElement * rtpbin, guint session, GstRTSPStream * stream)
1725 {
1726   GstRTSPStreamPrivate *priv = stream->priv;
1727   GstElement *oldenc, *enc;
1728   GstPad *pad;
1729   gchar *name;
1730
1731   if (priv->idx != session)
1732     return NULL;
1733
1734   GST_DEBUG_OBJECT (stream, "make RTP encoder for session %u", session);
1735
1736   oldenc = priv->srtpenc;
1737   enc = get_rtp_encoder (stream, session);
1738   name = g_strdup_printf ("rtp_sink_%d", session);
1739   pad = gst_element_get_request_pad (enc, name);
1740   g_free (name);
1741   gst_object_unref (pad);
1742
1743   if (oldenc == NULL)
1744     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER], 0,
1745         enc);
1746
1747   return enc;
1748 }
1749
1750 static GstElement *
1751 request_rtcp_encoder (GstElement * rtpbin, guint session,
1752     GstRTSPStream * stream)
1753 {
1754   GstRTSPStreamPrivate *priv = stream->priv;
1755   GstElement *oldenc, *enc;
1756   GstPad *pad;
1757   gchar *name;
1758
1759   if (priv->idx != session)
1760     return NULL;
1761
1762   GST_DEBUG_OBJECT (stream, "make RTCP encoder for session %u", session);
1763
1764   oldenc = priv->srtpenc;
1765   enc = get_rtp_encoder (stream, session);
1766   name = g_strdup_printf ("rtcp_sink_%d", session);
1767   pad = gst_element_get_request_pad (enc, name);
1768   g_free (name);
1769   gst_object_unref (pad);
1770
1771   if (oldenc == NULL)
1772     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER], 0,
1773         enc);
1774
1775   return enc;
1776 }
1777
1778 static GstCaps *
1779 request_key (GstElement * srtpdec, guint ssrc, GstRTSPStream * stream)
1780 {
1781   GstRTSPStreamPrivate *priv = stream->priv;
1782   GstCaps *caps;
1783
1784   GST_DEBUG ("request key %08x", ssrc);
1785
1786   g_mutex_lock (&priv->lock);
1787   if ((caps = g_hash_table_lookup (priv->keys, GINT_TO_POINTER (ssrc))))
1788     gst_caps_ref (caps);
1789   g_mutex_unlock (&priv->lock);
1790
1791   return caps;
1792 }
1793
1794 static GstElement *
1795 request_rtp_rtcp_decoder (GstElement * rtpbin, guint session,
1796     GstRTSPStream * stream)
1797 {
1798   GstRTSPStreamPrivate *priv = stream->priv;
1799
1800   if (priv->idx != session)
1801     return NULL;
1802
1803   if (priv->srtpdec == NULL) {
1804     gchar *name;
1805
1806     name = g_strdup_printf ("srtpdec_%u", session);
1807     priv->srtpdec = gst_element_factory_make ("srtpdec", name);
1808     g_free (name);
1809
1810     g_signal_connect (priv->srtpdec, "request-key",
1811         (GCallback) request_key, stream);
1812   }
1813   return gst_object_ref (priv->srtpdec);
1814 }
1815
1816 static GstElement *
1817 request_aux_sender (GstElement * rtpbin, guint sessid, GstRTSPStream * stream)
1818 {
1819   GstElement *bin;
1820   GstPad *pad;
1821   GstStructure *pt_map;
1822   gchar *name;
1823   guint pt, rtx_pt;
1824   gchar *pt_s;
1825
1826   pt = gst_rtsp_stream_get_pt (stream);
1827   pt_s = g_strdup_printf ("%u", pt);
1828   rtx_pt = stream->priv->rtx_pt;
1829
1830   GST_INFO ("creating rtxsend with pt %u to %u", pt, rtx_pt);
1831
1832   bin = gst_bin_new (NULL);
1833   stream->priv->rtxsend = gst_element_factory_make ("rtprtxsend", NULL);
1834   pt_map = gst_structure_new ("application/x-rtp-pt-map",
1835       pt_s, G_TYPE_UINT, rtx_pt, NULL);
1836   g_object_set (stream->priv->rtxsend, "payload-type-map", pt_map,
1837       "max-size-time", GST_TIME_AS_MSECONDS (stream->priv->rtx_time), NULL);
1838   g_free (pt_s);
1839   gst_structure_free (pt_map);
1840   gst_bin_add (GST_BIN (bin), gst_object_ref (stream->priv->rtxsend));
1841
1842   pad = gst_element_get_static_pad (stream->priv->rtxsend, "src");
1843   name = g_strdup_printf ("src_%u", sessid);
1844   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
1845   g_free (name);
1846   gst_object_unref (pad);
1847
1848   pad = gst_element_get_static_pad (stream->priv->rtxsend, "sink");
1849   name = g_strdup_printf ("sink_%u", sessid);
1850   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
1851   g_free (name);
1852   gst_object_unref (pad);
1853
1854   return bin;
1855 }
1856
1857 /**
1858  * gst_rtsp_stream_set_pt_map:
1859  * @stream: a #GstRTSPStream
1860  * @pt: the pt
1861  * @caps: a #GstCaps
1862  *
1863  * Configure a pt map between @pt and @caps.
1864  */
1865 void
1866 gst_rtsp_stream_set_pt_map (GstRTSPStream * stream, guint pt, GstCaps * caps)
1867 {
1868   GstRTSPStreamPrivate *priv = stream->priv;
1869
1870   g_mutex_lock (&priv->lock);
1871   g_hash_table_insert (priv->ptmap, GINT_TO_POINTER (pt), gst_caps_ref (caps));
1872   g_mutex_unlock (&priv->lock);
1873 }
1874
1875 static GstCaps *
1876 request_pt_map (GstElement * rtpbin, guint session, guint pt,
1877     GstRTSPStream * stream)
1878 {
1879   GstRTSPStreamPrivate *priv = stream->priv;
1880   GstCaps *caps = NULL;
1881
1882   g_mutex_lock (&priv->lock);
1883
1884   if (priv->idx == session) {
1885     caps = g_hash_table_lookup (priv->ptmap, GINT_TO_POINTER (pt));
1886     if (caps) {
1887       GST_DEBUG ("Stream %p, pt %u: caps %" GST_PTR_FORMAT, stream, pt, caps);
1888       gst_caps_ref (caps);
1889     } else {
1890       GST_DEBUG ("Stream %p, pt %u: no caps", stream, pt);
1891     }
1892   }
1893
1894   g_mutex_unlock (&priv->lock);
1895
1896   return caps;
1897 }
1898
1899 static void
1900 pad_added (GstElement * rtpbin, GstPad * pad, GstRTSPStream * stream)
1901 {
1902   GstRTSPStreamPrivate *priv = stream->priv;
1903   gchar *name;
1904   GstPadLinkReturn ret;
1905   guint sessid;
1906
1907   GST_DEBUG ("Stream %p added pad %s:%s for pad %s:%s", stream,
1908       GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (priv->sinkpad));
1909
1910   name = gst_pad_get_name (pad);
1911   if (sscanf (name, "recv_rtp_src_%u", &sessid) != 1) {
1912     g_free (name);
1913     return;
1914   }
1915   g_free (name);
1916
1917   if (priv->idx != sessid)
1918     return;
1919
1920   if (gst_pad_is_linked (priv->sinkpad)) {
1921     GST_WARNING ("Stream %p: Pad %s:%s is linked already", stream,
1922         GST_DEBUG_PAD_NAME (priv->sinkpad));
1923     return;
1924   }
1925
1926   /* link the RTP pad to the session manager, it should not really fail unless
1927    * this is not really an RTP pad */
1928   ret = gst_pad_link (pad, priv->sinkpad);
1929   if (ret != GST_PAD_LINK_OK)
1930     goto link_failed;
1931   priv->recv_rtp_src = gst_object_ref (pad);
1932
1933   return;
1934
1935 /* ERRORS */
1936 link_failed:
1937   {
1938     GST_ERROR ("Stream %p: Failed to link pads %s:%s and %s:%s", stream,
1939         GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (priv->sinkpad));
1940   }
1941 }
1942
1943 static void
1944 on_npt_stop (GstElement * rtpbin, guint session, guint ssrc,
1945     GstRTSPStream * stream)
1946 {
1947   /* TODO: What to do here other than this? */
1948   GST_DEBUG ("Stream %p: Got EOS", stream);
1949   gst_pad_send_event (stream->priv->sinkpad, gst_event_new_eos ());
1950 }
1951
1952 /**
1953  * gst_rtsp_stream_join_bin:
1954  * @stream: a #GstRTSPStream
1955  * @bin: (transfer none): a #GstBin to join
1956  * @rtpbin: (transfer none): a rtpbin element in @bin
1957  * @state: the target state of the new elements
1958  *
1959  * Join the #GstBin @bin that contains the element @rtpbin.
1960  *
1961  * @stream will link to @rtpbin, which must be inside @bin. The elements
1962  * added to @bin will be set to the state given in @state.
1963  *
1964  * Returns: %TRUE on success.
1965  */
1966 gboolean
1967 gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
1968     GstElement * rtpbin, GstState state)
1969 {
1970   GstRTSPStreamPrivate *priv;
1971   gint i;
1972   guint idx;
1973   gchar *name;
1974   GstPad *pad, *sinkpad, *selpad;
1975   GstPadLinkReturn ret;
1976
1977   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1978   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
1979   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
1980
1981   priv = stream->priv;
1982
1983   g_mutex_lock (&priv->lock);
1984   if (priv->is_joined)
1985     goto was_joined;
1986
1987   /* create a session with the same index as the stream */
1988   idx = priv->idx;
1989
1990   GST_INFO ("stream %p joining bin as session %u", stream, idx);
1991
1992   if (!alloc_ports (stream))
1993     goto no_ports;
1994
1995   /* update the dscp qos field in the sinks */
1996   update_dscp_qos (stream);
1997
1998   if (priv->profiles & GST_RTSP_PROFILE_SAVP
1999       || priv->profiles & GST_RTSP_PROFILE_SAVPF) {
2000     /* For SRTP */
2001     g_signal_connect (rtpbin, "request-rtp-encoder",
2002         (GCallback) request_rtp_encoder, stream);
2003     g_signal_connect (rtpbin, "request-rtcp-encoder",
2004         (GCallback) request_rtcp_encoder, stream);
2005     g_signal_connect (rtpbin, "request-rtp-decoder",
2006         (GCallback) request_rtp_rtcp_decoder, stream);
2007     g_signal_connect (rtpbin, "request-rtcp-decoder",
2008         (GCallback) request_rtp_rtcp_decoder, stream);
2009   }
2010
2011   if (priv->rtx_time > 0 && priv->srcpad) {
2012     /* enable retransmission by setting rtprtxsend as the "aux" element of rtpbin */
2013     g_signal_connect (rtpbin, "request-aux-sender",
2014         (GCallback) request_aux_sender, stream);
2015   }
2016   if (priv->sinkpad) {
2017     g_signal_connect (rtpbin, "request-pt-map",
2018         (GCallback) request_pt_map, stream);
2019   }
2020
2021   /* get a pad for sending RTP */
2022   name = g_strdup_printf ("send_rtp_sink_%u", idx);
2023   priv->send_rtp_sink = gst_element_get_request_pad (rtpbin, name);
2024   g_free (name);
2025
2026   if (priv->srcpad) {
2027     /* link the RTP pad to the session manager, it should not really fail unless
2028      * this is not really an RTP pad */
2029     ret = gst_pad_link (priv->srcpad, priv->send_rtp_sink);
2030     if (ret != GST_PAD_LINK_OK)
2031       goto link_failed;
2032   } else {
2033     /* Need to connect our sinkpad from here */
2034     g_signal_connect (rtpbin, "pad-added", (GCallback) pad_added, stream);
2035     /* EOS */
2036     g_signal_connect (rtpbin, "on-npt-stop", (GCallback) on_npt_stop, stream);
2037   }
2038
2039   /* get pads from the RTP session element for sending and receiving
2040    * RTP/RTCP*/
2041   name = g_strdup_printf ("send_rtp_src_%u", idx);
2042   priv->send_src[0] = gst_element_get_static_pad (rtpbin, name);
2043   g_free (name);
2044   name = g_strdup_printf ("recv_rtp_sink_%u", idx);
2045   priv->recv_sink[0] = gst_element_get_request_pad (rtpbin, name);
2046   g_free (name);
2047
2048   name = g_strdup_printf ("send_rtcp_src_%u", idx);
2049   priv->send_src[1] = gst_element_get_request_pad (rtpbin, name);
2050   g_free (name);
2051   name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
2052   priv->recv_sink[1] = gst_element_get_request_pad (rtpbin, name);
2053   g_free (name);
2054
2055   /* get the session */
2056   g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &priv->session);
2057
2058   g_signal_connect (priv->session, "on-new-ssrc", (GCallback) on_new_ssrc,
2059       stream);
2060   g_signal_connect (priv->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
2061       stream);
2062   g_signal_connect (priv->session, "on-ssrc-active",
2063       (GCallback) on_ssrc_active, stream);
2064   g_signal_connect (priv->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
2065       stream);
2066   g_signal_connect (priv->session, "on-bye-timeout",
2067       (GCallback) on_bye_timeout, stream);
2068   g_signal_connect (priv->session, "on-timeout", (GCallback) on_timeout,
2069       stream);
2070
2071   for (i = 0; i < 2; i++) {
2072     GstPad *teepad, *queuepad;
2073     /* For the sender we create this bit of pipeline for both
2074      * RTP and RTCP. Sync and preroll are enabled on udpsink so
2075      * we need to add a queue before appsink to make the pipeline
2076      * not block. For the TCP case, we want to pump data to the
2077      * client as fast as possible anyway.
2078      *
2079      * .--------.      .-----.    .---------.
2080      * | rtpbin |      | tee |    | udpsink |
2081      * |       send->sink   src->sink       |
2082      * '--------'      |     |    '---------'
2083      *                 |     |    .---------.    .---------.
2084      *                 |     |    |  queue  |    | appsink |
2085      *                 |    src->sink      src->sink       |
2086      *                 '-----'    '---------'    '---------'
2087      *
2088      * When only UDP is allowed, we skip the tee, queue and appsink and link the
2089      * udpsink directly to the session.
2090      */
2091     /* add udpsink */
2092     gst_bin_add (bin, priv->udpsink[i]);
2093     sinkpad = gst_element_get_static_pad (priv->udpsink[i], "sink");
2094
2095     if (priv->protocols & GST_RTSP_LOWER_TRANS_TCP) {
2096       /* make tee for RTP/RTCP */
2097       priv->tee[i] = gst_element_factory_make ("tee", NULL);
2098       gst_bin_add (bin, priv->tee[i]);
2099
2100       /* and link to rtpbin send pad */
2101       pad = gst_element_get_static_pad (priv->tee[i], "sink");
2102       gst_pad_link (priv->send_src[i], pad);
2103       gst_object_unref (pad);
2104
2105       /* link tee to udpsink */
2106       teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
2107       gst_pad_link (teepad, sinkpad);
2108       gst_object_unref (teepad);
2109
2110       /* make queue */
2111       priv->appqueue[i] = gst_element_factory_make ("queue", NULL);
2112       gst_bin_add (bin, priv->appqueue[i]);
2113       /* and link to tee */
2114       teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
2115       pad = gst_element_get_static_pad (priv->appqueue[i], "sink");
2116       gst_pad_link (teepad, pad);
2117       gst_object_unref (pad);
2118       gst_object_unref (teepad);
2119
2120       /* make appsink */
2121       priv->appsink[i] = gst_element_factory_make ("appsink", NULL);
2122       g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
2123       g_object_set (priv->appsink[i], "emit-signals", FALSE, NULL);
2124       gst_bin_add (bin, priv->appsink[i]);
2125       gst_app_sink_set_callbacks (GST_APP_SINK_CAST (priv->appsink[i]),
2126           &sink_cb, stream, NULL);
2127       /* and link to queue */
2128       queuepad = gst_element_get_static_pad (priv->appqueue[i], "src");
2129       pad = gst_element_get_static_pad (priv->appsink[i], "sink");
2130       gst_pad_link (queuepad, pad);
2131       gst_object_unref (pad);
2132       gst_object_unref (queuepad);
2133     } else {
2134       /* else only udpsink needed, link it to the session */
2135       gst_pad_link (priv->send_src[i], sinkpad);
2136     }
2137     gst_object_unref (sinkpad);
2138
2139     /* For the receiver we create this bit of pipeline for both
2140      * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc
2141      * and it is all funneled into the rtpbin receive pad.
2142      *
2143      * .--------.     .--------.    .--------.
2144      * | udpsrc |     | funnel |    | rtpbin |
2145      * |       src->sink      src->sink      |
2146      * '--------'     |        |    '--------'
2147      * .--------.     |        |
2148      * | appsrc |     |        |
2149      * |       src->sink       |
2150      * '--------'     '--------'
2151      */
2152     /* make funnel for the RTP/RTCP receivers */
2153     priv->funnel[i] = gst_element_factory_make ("funnel", NULL);
2154     gst_bin_add (bin, priv->funnel[i]);
2155
2156     pad = gst_element_get_static_pad (priv->funnel[i], "src");
2157     gst_pad_link (pad, priv->recv_sink[i]);
2158     gst_object_unref (pad);
2159
2160     if (priv->udpsrc_v4[i]) {
2161       if (priv->srcpad) {
2162         /* we set and keep these to playing so that they don't cause NO_PREROLL return
2163          * values. This is only relevant for PLAY pipelines */
2164         gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_PLAYING);
2165         gst_element_set_locked_state (priv->udpsrc_v4[i], TRUE);
2166       }
2167       /* add udpsrc */
2168       gst_bin_add (bin, priv->udpsrc_v4[i]);
2169
2170       /* and link to the funnel v4 */
2171       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2172       pad = gst_element_get_static_pad (priv->udpsrc_v4[i], "src");
2173       gst_pad_link (pad, selpad);
2174       gst_object_unref (pad);
2175       gst_object_unref (selpad);
2176     }
2177
2178     if (priv->udpsrc_v6[i]) {
2179       if (priv->srcpad) {
2180         gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_PLAYING);
2181         gst_element_set_locked_state (priv->udpsrc_v6[i], TRUE);
2182       }
2183       gst_bin_add (bin, priv->udpsrc_v6[i]);
2184
2185       /* and link to the funnel v6 */
2186       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2187       pad = gst_element_get_static_pad (priv->udpsrc_v6[i], "src");
2188       gst_pad_link (pad, selpad);
2189       gst_object_unref (pad);
2190       gst_object_unref (selpad);
2191     }
2192
2193     if (priv->protocols & GST_RTSP_LOWER_TRANS_TCP) {
2194       /* make and add appsrc */
2195       priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
2196       g_object_set (priv->appsrc[i], "format", GST_FORMAT_TIME, NULL);
2197       gst_bin_add (bin, priv->appsrc[i]);
2198       /* and link to the funnel */
2199       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2200       pad = gst_element_get_static_pad (priv->appsrc[i], "src");
2201       gst_pad_link (pad, selpad);
2202       gst_object_unref (pad);
2203       gst_object_unref (selpad);
2204     }
2205
2206     /* check if we need to set to a special state */
2207     if (state != GST_STATE_NULL) {
2208       if (priv->udpsink[i])
2209         gst_element_set_state (priv->udpsink[i], state);
2210       if (priv->appsink[i])
2211         gst_element_set_state (priv->appsink[i], state);
2212       if (priv->appqueue[i])
2213         gst_element_set_state (priv->appqueue[i], state);
2214       if (priv->tee[i])
2215         gst_element_set_state (priv->tee[i], state);
2216       if (priv->funnel[i])
2217         gst_element_set_state (priv->funnel[i], state);
2218       if (priv->appsrc[i])
2219         gst_element_set_state (priv->appsrc[i], state);
2220     }
2221   }
2222
2223   /* be notified of caps changes */
2224   priv->caps_sig = g_signal_connect (priv->send_src[0], "notify::caps",
2225       (GCallback) caps_notify, stream);
2226
2227   priv->is_joined = TRUE;
2228   g_mutex_unlock (&priv->lock);
2229
2230   return TRUE;
2231
2232   /* ERRORS */
2233 was_joined:
2234   {
2235     g_mutex_unlock (&priv->lock);
2236     return TRUE;
2237   }
2238 no_ports:
2239   {
2240     g_mutex_unlock (&priv->lock);
2241     GST_WARNING ("failed to allocate ports %u", idx);
2242     return FALSE;
2243   }
2244 link_failed:
2245   {
2246     GST_WARNING ("failed to link stream %u", idx);
2247     gst_object_unref (priv->send_rtp_sink);
2248     priv->send_rtp_sink = NULL;
2249     g_mutex_unlock (&priv->lock);
2250     return FALSE;
2251   }
2252 }
2253
2254 /**
2255  * gst_rtsp_stream_leave_bin:
2256  * @stream: a #GstRTSPStream
2257  * @bin: (transfer none): a #GstBin
2258  * @rtpbin: (transfer none): a rtpbin #GstElement
2259  *
2260  * Remove the elements of @stream from @bin.
2261  *
2262  * Return: %TRUE on success.
2263  */
2264 gboolean
2265 gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
2266     GstElement * rtpbin)
2267 {
2268   GstRTSPStreamPrivate *priv;
2269   gint i;
2270   GList *l;
2271
2272   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2273   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
2274   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
2275
2276   priv = stream->priv;
2277
2278   g_mutex_lock (&priv->lock);
2279   if (!priv->is_joined)
2280     goto was_not_joined;
2281
2282   /* all transports must be removed by now */
2283   if (priv->transports != NULL)
2284     goto transports_not_removed;
2285
2286   clear_tr_cache (priv, TRUE);
2287   clear_tr_cache (priv, FALSE);
2288
2289   GST_INFO ("stream %p leaving bin", stream);
2290
2291   if (priv->srcpad) {
2292     gst_pad_unlink (priv->srcpad, priv->send_rtp_sink);
2293   } else if (priv->recv_rtp_src) {
2294     gst_pad_unlink (priv->recv_rtp_src, priv->sinkpad);
2295     gst_object_unref (priv->recv_rtp_src);
2296     priv->recv_rtp_src = NULL;
2297   }
2298   g_signal_handler_disconnect (priv->send_src[0], priv->caps_sig);
2299   gst_element_release_request_pad (rtpbin, priv->send_rtp_sink);
2300   gst_object_unref (priv->send_rtp_sink);
2301   priv->send_rtp_sink = NULL;
2302
2303   for (i = 0; i < 2; i++) {
2304     if (priv->udpsink[i])
2305       gst_element_set_state (priv->udpsink[i], GST_STATE_NULL);
2306     if (priv->appsink[i])
2307       gst_element_set_state (priv->appsink[i], GST_STATE_NULL);
2308     if (priv->appqueue[i])
2309       gst_element_set_state (priv->appqueue[i], GST_STATE_NULL);
2310     if (priv->tee[i])
2311       gst_element_set_state (priv->tee[i], GST_STATE_NULL);
2312     if (priv->funnel[i])
2313       gst_element_set_state (priv->funnel[i], GST_STATE_NULL);
2314     if (priv->appsrc[i])
2315       gst_element_set_state (priv->appsrc[i], GST_STATE_NULL);
2316     if (priv->udpsrc_v4[i]) {
2317       /* and set udpsrc to NULL now before removing */
2318       gst_element_set_locked_state (priv->udpsrc_v4[i], FALSE);
2319       gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_NULL);
2320       /* removing them should also nicely release the request
2321        * pads when they finalize */
2322       gst_bin_remove (bin, priv->udpsrc_v4[i]);
2323     }
2324     if (priv->udpsrc_v6[i]) {
2325       gst_element_set_locked_state (priv->udpsrc_v6[i], FALSE);
2326       gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_NULL);
2327       gst_bin_remove (bin, priv->udpsrc_v6[i]);
2328     }
2329
2330     for (l = priv->transport_sources; l; l = l->next) {
2331       GstRTSPMulticastTransportSource *s = l->data;
2332
2333       if (!s->udpsrc[i])
2334         continue;
2335
2336       gst_element_set_locked_state (s->udpsrc[i], FALSE);
2337       gst_element_set_state (s->udpsrc[i], GST_STATE_NULL);
2338       gst_bin_remove (bin, s->udpsrc[i]);
2339     }
2340
2341     if (priv->udpsink[i])
2342       gst_bin_remove (bin, priv->udpsink[i]);
2343     if (priv->appsrc[i])
2344       gst_bin_remove (bin, priv->appsrc[i]);
2345     if (priv->appsink[i])
2346       gst_bin_remove (bin, priv->appsink[i]);
2347     if (priv->appqueue[i])
2348       gst_bin_remove (bin, priv->appqueue[i]);
2349     if (priv->tee[i])
2350       gst_bin_remove (bin, priv->tee[i]);
2351     if (priv->funnel[i])
2352       gst_bin_remove (bin, priv->funnel[i]);
2353
2354     gst_element_release_request_pad (rtpbin, priv->recv_sink[i]);
2355     gst_object_unref (priv->recv_sink[i]);
2356     priv->recv_sink[i] = NULL;
2357
2358     priv->udpsrc_v4[i] = NULL;
2359     priv->udpsrc_v6[i] = NULL;
2360     priv->udpsink[i] = NULL;
2361     priv->appsrc[i] = NULL;
2362     priv->appsink[i] = NULL;
2363     priv->appqueue[i] = NULL;
2364     priv->tee[i] = NULL;
2365     priv->funnel[i] = NULL;
2366   }
2367
2368   for (l = priv->transport_sources; l; l = l->next) {
2369     GstRTSPMulticastTransportSource *s = l->data;
2370     g_slice_free (GstRTSPMulticastTransportSource, s);
2371   }
2372   g_list_free (priv->transport_sources);
2373   priv->transport_sources = NULL;
2374
2375   gst_object_unref (priv->send_src[0]);
2376   priv->send_src[0] = NULL;
2377
2378   gst_element_release_request_pad (rtpbin, priv->send_src[1]);
2379   gst_object_unref (priv->send_src[1]);
2380   priv->send_src[1] = NULL;
2381
2382   g_object_unref (priv->session);
2383   priv->session = NULL;
2384   if (priv->caps)
2385     gst_caps_unref (priv->caps);
2386   priv->caps = NULL;
2387
2388   if (priv->srtpenc)
2389     gst_object_unref (priv->srtpenc);
2390   if (priv->srtpdec)
2391     gst_object_unref (priv->srtpdec);
2392
2393   priv->is_joined = FALSE;
2394   g_mutex_unlock (&priv->lock);
2395
2396   return TRUE;
2397
2398 was_not_joined:
2399   {
2400     g_mutex_unlock (&priv->lock);
2401     return TRUE;
2402   }
2403 transports_not_removed:
2404   {
2405     GST_ERROR_OBJECT (stream, "can't leave bin (transports not removed)");
2406     g_mutex_unlock (&priv->lock);
2407     return FALSE;
2408   }
2409 }
2410
2411 /**
2412  * gst_rtsp_stream_get_rtpinfo:
2413  * @stream: a #GstRTSPStream
2414  * @rtptime: (allow-none): result RTP timestamp
2415  * @seq: (allow-none): result RTP seqnum
2416  * @clock_rate: (allow-none): the clock rate
2417  * @running_time: (allow-none): result running-time
2418  *
2419  * Retrieve the current rtptime, seq and running-time. This is used to
2420  * construct a RTPInfo reply header.
2421  *
2422  * Returns: %TRUE when rtptime, seq and running-time could be determined.
2423  */
2424 gboolean
2425 gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream,
2426     guint * rtptime, guint * seq, guint * clock_rate,
2427     GstClockTime * running_time)
2428 {
2429   GstRTSPStreamPrivate *priv;
2430   GstStructure *stats;
2431   GObjectClass *payobjclass;
2432
2433   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2434
2435   priv = stream->priv;
2436
2437   payobjclass = G_OBJECT_GET_CLASS (priv->payloader);
2438
2439   g_mutex_lock (&priv->lock);
2440
2441   if (g_object_class_find_property (payobjclass, "stats")) {
2442     g_object_get (priv->payloader, "stats", &stats, NULL);
2443     if (stats == NULL)
2444       goto no_stats;
2445
2446     if (seq)
2447       gst_structure_get_uint (stats, "seqnum", seq);
2448
2449     if (rtptime)
2450       gst_structure_get_uint (stats, "timestamp", rtptime);
2451
2452     if (running_time)
2453       gst_structure_get_clock_time (stats, "running-time", running_time);
2454
2455     if (clock_rate) {
2456       gst_structure_get_uint (stats, "clock-rate", clock_rate);
2457       if (*clock_rate == 0 && running_time)
2458         *running_time = GST_CLOCK_TIME_NONE;
2459     }
2460     gst_structure_free (stats);
2461   } else {
2462     if (!g_object_class_find_property (payobjclass, "seqnum") ||
2463         !g_object_class_find_property (payobjclass, "timestamp"))
2464       goto no_stats;
2465
2466     if (seq)
2467       g_object_get (priv->payloader, "seqnum", seq, NULL);
2468
2469     if (rtptime)
2470       g_object_get (priv->payloader, "timestamp", rtptime, NULL);
2471
2472     if (running_time)
2473       *running_time = GST_CLOCK_TIME_NONE;
2474   }
2475   g_mutex_unlock (&priv->lock);
2476
2477   return TRUE;
2478
2479   /* ERRORS */
2480 no_stats:
2481   {
2482     GST_WARNING ("Could not get payloader stats");
2483     g_mutex_unlock (&priv->lock);
2484     return FALSE;
2485   }
2486 }
2487
2488 /**
2489  * gst_rtsp_stream_get_caps:
2490  * @stream: a #GstRTSPStream
2491  *
2492  * Retrieve the current caps of @stream.
2493  *
2494  * Returns: (transfer full): the #GstCaps of @stream. use gst_caps_unref()
2495  * after usage.
2496  */
2497 GstCaps *
2498 gst_rtsp_stream_get_caps (GstRTSPStream * stream)
2499 {
2500   GstRTSPStreamPrivate *priv;
2501   GstCaps *result;
2502
2503   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2504
2505   priv = stream->priv;
2506
2507   g_mutex_lock (&priv->lock);
2508   if ((result = priv->caps))
2509     gst_caps_ref (result);
2510   g_mutex_unlock (&priv->lock);
2511
2512   return result;
2513 }
2514
2515 /**
2516  * gst_rtsp_stream_recv_rtp:
2517  * @stream: a #GstRTSPStream
2518  * @buffer: (transfer full): a #GstBuffer
2519  *
2520  * Handle an RTP buffer for the stream. This method is usually called when a
2521  * message has been received from a client using the TCP transport.
2522  *
2523  * This function takes ownership of @buffer.
2524  *
2525  * Returns: a GstFlowReturn.
2526  */
2527 GstFlowReturn
2528 gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer)
2529 {
2530   GstRTSPStreamPrivate *priv;
2531   GstFlowReturn ret;
2532   GstElement *element;
2533
2534   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
2535   priv = stream->priv;
2536   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
2537   g_return_val_if_fail (priv->is_joined, FALSE);
2538
2539   g_mutex_lock (&priv->lock);
2540   if (priv->appsrc[0])
2541     element = gst_object_ref (priv->appsrc[0]);
2542   else
2543     element = NULL;
2544   g_mutex_unlock (&priv->lock);
2545
2546   if (element) {
2547     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
2548     gst_object_unref (element);
2549   } else {
2550     ret = GST_FLOW_OK;
2551   }
2552   return ret;
2553 }
2554
2555 /**
2556  * gst_rtsp_stream_recv_rtcp:
2557  * @stream: a #GstRTSPStream
2558  * @buffer: (transfer full): a #GstBuffer
2559  *
2560  * Handle an RTCP buffer for the stream. This method is usually called when a
2561  * message has been received from a client using the TCP transport.
2562  *
2563  * This function takes ownership of @buffer.
2564  *
2565  * Returns: a GstFlowReturn.
2566  */
2567 GstFlowReturn
2568 gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
2569 {
2570   GstRTSPStreamPrivate *priv;
2571   GstFlowReturn ret;
2572   GstElement *element;
2573
2574   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
2575   priv = stream->priv;
2576   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
2577
2578   if (!priv->is_joined) {
2579     gst_buffer_unref (buffer);
2580     return GST_FLOW_NOT_LINKED;
2581   }
2582   g_mutex_lock (&priv->lock);
2583   if (priv->appsrc[1])
2584     element = gst_object_ref (priv->appsrc[1]);
2585   else
2586     element = NULL;
2587   g_mutex_unlock (&priv->lock);
2588
2589   if (element) {
2590     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
2591     gst_object_unref (element);
2592   } else {
2593     ret = GST_FLOW_OK;
2594     gst_buffer_unref (buffer);
2595   }
2596   return ret;
2597 }
2598
2599 /* must be called with lock */
2600 static gboolean
2601 update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
2602     gboolean add)
2603 {
2604   GstRTSPStreamPrivate *priv = stream->priv;
2605   const GstRTSPTransport *tr;
2606
2607   tr = gst_rtsp_stream_transport_get_transport (trans);
2608
2609   switch (tr->lower_transport) {
2610     case GST_RTSP_LOWER_TRANS_UDP_MCAST:
2611     {
2612       GstRTSPMulticastTransportSource *source;
2613       GstBin *bin;
2614
2615       bin = GST_BIN (gst_object_get_parent (GST_OBJECT (priv->funnel[0])));
2616
2617       if (add) {
2618         gchar *host;
2619         gint i;
2620         GstPad *selpad, *pad;
2621
2622         source = g_slice_new0 (GstRTSPMulticastTransportSource);
2623         source->transport = trans;
2624
2625         for (i = 0; i < 2; i++) {
2626           host =
2627               g_strdup_printf ("udp://%s:%d", tr->destination,
2628               (i == 0) ? tr->port.min : tr->port.max);
2629           source->udpsrc[i] =
2630               gst_element_make_from_uri (GST_URI_SRC, host, NULL, NULL);
2631           g_free (host);
2632
2633           if (priv->srcpad) {
2634             /* we set and keep these to playing so that they don't cause NO_PREROLL return
2635              * values. This is only relevant for PLAY pipelines */
2636             gst_element_set_state (source->udpsrc[i], GST_STATE_PLAYING);
2637             gst_element_set_locked_state (source->udpsrc[i], TRUE);
2638           }
2639           /* add udpsrc */
2640           gst_bin_add (bin, source->udpsrc[i]);
2641
2642           /* and link to the funnel v4 */
2643           source->selpad[i] = selpad =
2644               gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2645           pad = gst_element_get_static_pad (source->udpsrc[i], "src");
2646           gst_pad_link (pad, selpad);
2647           gst_object_unref (pad);
2648           gst_object_unref (selpad);
2649         }
2650         gst_object_unref (bin);
2651
2652         priv->transport_sources =
2653             g_list_prepend (priv->transport_sources, source);
2654       } else {
2655         GList *l;
2656
2657         for (l = priv->transport_sources; l; l = l->next) {
2658           source = l->data;
2659
2660           if (source->transport == trans) {
2661             priv->transport_sources =
2662                 g_list_delete_link (priv->transport_sources, l);
2663             break;
2664           }
2665         }
2666
2667         if (l != NULL) {
2668           gint i;
2669
2670           for (i = 0; i < 2; i++) {
2671             /* Will automatically unlink everything */
2672             gst_bin_remove (bin,
2673                 GST_ELEMENT (gst_object_ref (source->udpsrc[i])));
2674
2675             gst_element_set_state (source->udpsrc[i], GST_STATE_NULL);
2676             gst_object_unref (source->udpsrc[i]);
2677
2678             gst_element_release_request_pad (priv->funnel[i],
2679                 source->selpad[i]);
2680           }
2681
2682           g_slice_free (GstRTSPMulticastTransportSource, source);
2683         }
2684       }
2685
2686       /* fall through for the generic case */
2687     }
2688     case GST_RTSP_LOWER_TRANS_UDP:
2689     {
2690       gchar *dest;
2691       gint min, max;
2692       guint ttl = 0;
2693
2694       dest = tr->destination;
2695       if (tr->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
2696         min = tr->port.min;
2697         max = tr->port.max;
2698         ttl = tr->ttl;
2699       } else {
2700         min = tr->client_port.min;
2701         max = tr->client_port.max;
2702       }
2703
2704       if (add) {
2705         if (ttl > 0) {
2706           GST_INFO ("setting ttl-mc %d", ttl);
2707           g_object_set (G_OBJECT (priv->udpsink[0]), "ttl-mc", ttl, NULL);
2708           g_object_set (G_OBJECT (priv->udpsink[1]), "ttl-mc", ttl, NULL);
2709         }
2710         GST_INFO ("adding %s:%d-%d", dest, min, max);
2711         g_signal_emit_by_name (priv->udpsink[0], "add", dest, min, NULL);
2712         g_signal_emit_by_name (priv->udpsink[1], "add", dest, max, NULL);
2713         priv->transports = g_list_prepend (priv->transports, trans);
2714       } else {
2715         GST_INFO ("removing %s:%d-%d", dest, min, max);
2716         g_signal_emit_by_name (priv->udpsink[0], "remove", dest, min, NULL);
2717         g_signal_emit_by_name (priv->udpsink[1], "remove", dest, max, NULL);
2718         priv->transports = g_list_remove (priv->transports, trans);
2719       }
2720       priv->transports_cookie++;
2721       break;
2722     }
2723     case GST_RTSP_LOWER_TRANS_TCP:
2724       if (add) {
2725         GST_INFO ("adding TCP %s", tr->destination);
2726         priv->transports = g_list_prepend (priv->transports, trans);
2727       } else {
2728         GST_INFO ("removing TCP %s", tr->destination);
2729         priv->transports = g_list_remove (priv->transports, trans);
2730       }
2731       priv->transports_cookie++;
2732       break;
2733     default:
2734       goto unknown_transport;
2735   }
2736   return TRUE;
2737
2738   /* ERRORS */
2739 unknown_transport:
2740   {
2741     GST_INFO ("Unknown transport %d", tr->lower_transport);
2742     return FALSE;
2743   }
2744 }
2745
2746
2747 /**
2748  * gst_rtsp_stream_add_transport:
2749  * @stream: a #GstRTSPStream
2750  * @trans: (transfer none): a #GstRTSPStreamTransport
2751  *
2752  * Add the transport in @trans to @stream. The media of @stream will
2753  * then also be send to the values configured in @trans.
2754  *
2755  * @stream must be joined to a bin.
2756  *
2757  * @trans must contain a valid #GstRTSPTransport.
2758  *
2759  * Returns: %TRUE if @trans was added
2760  */
2761 gboolean
2762 gst_rtsp_stream_add_transport (GstRTSPStream * stream,
2763     GstRTSPStreamTransport * trans)
2764 {
2765   GstRTSPStreamPrivate *priv;
2766   gboolean res;
2767
2768   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2769   priv = stream->priv;
2770   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
2771   g_return_val_if_fail (priv->is_joined, FALSE);
2772
2773   g_mutex_lock (&priv->lock);
2774   res = update_transport (stream, trans, TRUE);
2775   g_mutex_unlock (&priv->lock);
2776
2777   return res;
2778 }
2779
2780 /**
2781  * gst_rtsp_stream_remove_transport:
2782  * @stream: a #GstRTSPStream
2783  * @trans: (transfer none): a #GstRTSPStreamTransport
2784  *
2785  * Remove the transport in @trans from @stream. The media of @stream will
2786  * not be sent to the values configured in @trans.
2787  *
2788  * @stream must be joined to a bin.
2789  *
2790  * @trans must contain a valid #GstRTSPTransport.
2791  *
2792  * Returns: %TRUE if @trans was removed
2793  */
2794 gboolean
2795 gst_rtsp_stream_remove_transport (GstRTSPStream * stream,
2796     GstRTSPStreamTransport * trans)
2797 {
2798   GstRTSPStreamPrivate *priv;
2799   gboolean res;
2800
2801   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2802   priv = stream->priv;
2803   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
2804   g_return_val_if_fail (priv->is_joined, FALSE);
2805
2806   g_mutex_lock (&priv->lock);
2807   res = update_transport (stream, trans, FALSE);
2808   g_mutex_unlock (&priv->lock);
2809
2810   return res;
2811 }
2812
2813 /**
2814  * gst_rtsp_stream_update_crypto:
2815  * @stream: a #GstRTSPStream
2816  * @ssrc: the SSRC
2817  * @crypto: (transfer none) (allow-none): a #GstCaps with crypto info
2818  *
2819  * Update the new crypto information for @ssrc in @stream. If information
2820  * for @ssrc did not exist, it will be added. If information
2821  * for @ssrc existed, it will be replaced. If @crypto is %NULL, it will
2822  * be removed from @stream.
2823  *
2824  * Returns: %TRUE if @crypto could be updated
2825  */
2826 gboolean
2827 gst_rtsp_stream_update_crypto (GstRTSPStream * stream,
2828     guint ssrc, GstCaps * crypto)
2829 {
2830   GstRTSPStreamPrivate *priv;
2831
2832   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2833   g_return_val_if_fail (crypto == NULL || GST_IS_CAPS (crypto), FALSE);
2834
2835   priv = stream->priv;
2836
2837   GST_DEBUG_OBJECT (stream, "update key for %08x", ssrc);
2838
2839   g_mutex_lock (&priv->lock);
2840   if (crypto)
2841     g_hash_table_insert (priv->keys, GINT_TO_POINTER (ssrc),
2842         gst_caps_ref (crypto));
2843   else
2844     g_hash_table_remove (priv->keys, GINT_TO_POINTER (ssrc));
2845   g_mutex_unlock (&priv->lock);
2846
2847   return TRUE;
2848 }
2849
2850 /**
2851  * gst_rtsp_stream_get_rtp_socket:
2852  * @stream: a #GstRTSPStream
2853  * @family: the socket family
2854  *
2855  * Get the RTP socket from @stream for a @family.
2856  *
2857  * @stream must be joined to a bin.
2858  *
2859  * Returns: (transfer full) (nullable): the RTP socket or %NULL if no
2860  * socket could be allocated for @family. Unref after usage
2861  */
2862 GSocket *
2863 gst_rtsp_stream_get_rtp_socket (GstRTSPStream * stream, GSocketFamily family)
2864 {
2865   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
2866   GSocket *socket;
2867   const gchar *name;
2868
2869   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2870   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
2871       family == G_SOCKET_FAMILY_IPV6, NULL);
2872   g_return_val_if_fail (priv->udpsink[0], NULL);
2873
2874   if (family == G_SOCKET_FAMILY_IPV6)
2875     name = "socket-v6";
2876   else
2877     name = "socket";
2878
2879   g_object_get (priv->udpsink[0], name, &socket, NULL);
2880
2881   return socket;
2882 }
2883
2884 /**
2885  * gst_rtsp_stream_get_rtcp_socket:
2886  * @stream: a #GstRTSPStream
2887  * @family: the socket family
2888  *
2889  * Get the RTCP socket from @stream for a @family.
2890  *
2891  * @stream must be joined to a bin.
2892  *
2893  * Returns: (transfer full) (nullable): the RTCP socket or %NULL if no
2894  * socket could be allocated for @family. Unref after usage
2895  */
2896 GSocket *
2897 gst_rtsp_stream_get_rtcp_socket (GstRTSPStream * stream, GSocketFamily family)
2898 {
2899   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
2900   GSocket *socket;
2901   const gchar *name;
2902
2903   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2904   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
2905       family == G_SOCKET_FAMILY_IPV6, NULL);
2906   g_return_val_if_fail (priv->udpsink[1], NULL);
2907
2908   if (family == G_SOCKET_FAMILY_IPV6)
2909     name = "socket-v6";
2910   else
2911     name = "socket";
2912
2913   g_object_get (priv->udpsink[1], name, &socket, NULL);
2914
2915   return socket;
2916 }
2917
2918 /**
2919  * gst_rtsp_stream_set_seqnum:
2920  * @stream: a #GstRTSPStream
2921  * @seqnum: a new sequence number
2922  *
2923  * Configure the sequence number in the payloader of @stream to @seqnum.
2924  */
2925 void
2926 gst_rtsp_stream_set_seqnum_offset (GstRTSPStream * stream, guint16 seqnum)
2927 {
2928   GstRTSPStreamPrivate *priv;
2929
2930   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
2931
2932   priv = stream->priv;
2933
2934   g_object_set (G_OBJECT (priv->payloader), "seqnum-offset", seqnum, NULL);
2935 }
2936
2937 /**
2938  * gst_rtsp_stream_get_seqnum:
2939  * @stream: a #GstRTSPStream
2940  *
2941  * Get the configured sequence number in the payloader of @stream.
2942  *
2943  * Returns: the sequence number of the payloader.
2944  */
2945 guint16
2946 gst_rtsp_stream_get_current_seqnum (GstRTSPStream * stream)
2947 {
2948   GstRTSPStreamPrivate *priv;
2949   guint seqnum;
2950
2951   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
2952
2953   priv = stream->priv;
2954
2955   g_object_get (G_OBJECT (priv->payloader), "seqnum", &seqnum, NULL);
2956
2957   return seqnum;
2958 }
2959
2960 /**
2961  * gst_rtsp_stream_transport_filter:
2962  * @stream: a #GstRTSPStream
2963  * @func: (scope call) (allow-none): a callback
2964  * @user_data: (closure): user data passed to @func
2965  *
2966  * Call @func for each transport managed by @stream. The result value of @func
2967  * determines what happens to the transport. @func will be called with @stream
2968  * locked so no further actions on @stream can be performed from @func.
2969  *
2970  * If @func returns #GST_RTSP_FILTER_REMOVE, the transport will be removed from
2971  * @stream.
2972  *
2973  * If @func returns #GST_RTSP_FILTER_KEEP, the transport will remain in @stream.
2974  *
2975  * If @func returns #GST_RTSP_FILTER_REF, the transport will remain in @stream but
2976  * will also be added with an additional ref to the result #GList of this
2977  * function..
2978  *
2979  * When @func is %NULL, #GST_RTSP_FILTER_REF will be assumed for each transport.
2980  *
2981  * Returns: (element-type GstRTSPStreamTransport) (transfer full): a #GList with all
2982  * transports for which @func returned #GST_RTSP_FILTER_REF. After usage, each
2983  * element in the #GList should be unreffed before the list is freed.
2984  */
2985 GList *
2986 gst_rtsp_stream_transport_filter (GstRTSPStream * stream,
2987     GstRTSPStreamTransportFilterFunc func, gpointer user_data)
2988 {
2989   GstRTSPStreamPrivate *priv;
2990   GList *result, *walk, *next;
2991   GHashTable *visited = NULL;
2992   guint cookie;
2993
2994   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2995
2996   priv = stream->priv;
2997
2998   result = NULL;
2999   if (func)
3000     visited = g_hash_table_new_full (NULL, NULL, g_object_unref, NULL);
3001
3002   g_mutex_lock (&priv->lock);
3003 restart:
3004   cookie = priv->transports_cookie;
3005   for (walk = priv->transports; walk; walk = next) {
3006     GstRTSPStreamTransport *trans = walk->data;
3007     GstRTSPFilterResult res;
3008     gboolean changed;
3009
3010     next = g_list_next (walk);
3011
3012     if (func) {
3013       /* only visit each transport once */
3014       if (g_hash_table_contains (visited, trans))
3015         continue;
3016
3017       g_hash_table_add (visited, g_object_ref (trans));
3018       g_mutex_unlock (&priv->lock);
3019
3020       res = func (stream, trans, user_data);
3021
3022       g_mutex_lock (&priv->lock);
3023     } else
3024       res = GST_RTSP_FILTER_REF;
3025
3026     changed = (cookie != priv->transports_cookie);
3027
3028     switch (res) {
3029       case GST_RTSP_FILTER_REMOVE:
3030         update_transport (stream, trans, FALSE);
3031         break;
3032       case GST_RTSP_FILTER_REF:
3033         result = g_list_prepend (result, g_object_ref (trans));
3034         break;
3035       case GST_RTSP_FILTER_KEEP:
3036       default:
3037         break;
3038     }
3039     if (changed)
3040       goto restart;
3041   }
3042   g_mutex_unlock (&priv->lock);
3043
3044   if (func)
3045     g_hash_table_unref (visited);
3046
3047   return result;
3048 }
3049
3050 static GstPadProbeReturn
3051 pad_blocking (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
3052 {
3053   GstRTSPStreamPrivate *priv;
3054   GstRTSPStream *stream;
3055
3056   stream = user_data;
3057   priv = stream->priv;
3058
3059   GST_DEBUG_OBJECT (pad, "now blocking");
3060
3061   g_mutex_lock (&priv->lock);
3062   priv->blocking = TRUE;
3063   g_mutex_unlock (&priv->lock);
3064
3065   gst_element_post_message (priv->payloader,
3066       gst_message_new_element (GST_OBJECT_CAST (priv->payloader),
3067           gst_structure_new_empty ("GstRTSPStreamBlocking")));
3068
3069   return GST_PAD_PROBE_OK;
3070 }
3071
3072 /**
3073  * gst_rtsp_stream_set_blocked:
3074  * @stream: a #GstRTSPStream
3075  * @blocked: boolean indicating we should block or unblock
3076  *
3077  * Blocks or unblocks the dataflow on @stream.
3078  *
3079  * Returns: %TRUE on success
3080  */
3081 gboolean
3082 gst_rtsp_stream_set_blocked (GstRTSPStream * stream, gboolean blocked)
3083 {
3084   GstRTSPStreamPrivate *priv;
3085
3086   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3087
3088   priv = stream->priv;
3089
3090   g_mutex_lock (&priv->lock);
3091   if (blocked) {
3092     priv->blocking = FALSE;
3093     if (priv->blocked_id == 0) {
3094       priv->blocked_id = gst_pad_add_probe (priv->srcpad,
3095           GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER |
3096           GST_PAD_PROBE_TYPE_BUFFER_LIST, pad_blocking,
3097           g_object_ref (stream), g_object_unref);
3098     }
3099   } else {
3100     if (priv->blocked_id != 0) {
3101       gst_pad_remove_probe (priv->srcpad, priv->blocked_id);
3102       priv->blocked_id = 0;
3103       priv->blocking = FALSE;
3104     }
3105   }
3106   g_mutex_unlock (&priv->lock);
3107
3108   return TRUE;
3109 }
3110
3111 /**
3112  * gst_rtsp_stream_is_blocking:
3113  * @stream: a #GstRTSPStream
3114  *
3115  * Check if @stream is blocking on a #GstBuffer.
3116  *
3117  * Returns: %TRUE if @stream is blocking
3118  */
3119 gboolean
3120 gst_rtsp_stream_is_blocking (GstRTSPStream * stream)
3121 {
3122   GstRTSPStreamPrivate *priv;
3123   gboolean result;
3124
3125   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3126
3127   priv = stream->priv;
3128
3129   g_mutex_lock (&priv->lock);
3130   result = priv->blocking;
3131   g_mutex_unlock (&priv->lock);
3132
3133   return result;
3134 }
3135
3136 /**
3137  * gst_rtsp_stream_query_position:
3138  * @stream: a #GstRTSPStream
3139  *
3140  * Query the position of the stream in %GST_FORMAT_TIME. This only considers
3141  * the RTP parts of the pipeline and not the RTCP parts.
3142  *
3143  * Returns: %TRUE if the position could be queried
3144  */
3145 gboolean
3146 gst_rtsp_stream_query_position (GstRTSPStream * stream, gint64 * position)
3147 {
3148   GstRTSPStreamPrivate *priv;
3149   GstElement *sink;
3150   gboolean ret;
3151
3152   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3153
3154   priv = stream->priv;
3155
3156   g_mutex_lock (&priv->lock);
3157   if ((sink = priv->udpsink[0]))
3158     gst_object_ref (sink);
3159   g_mutex_unlock (&priv->lock);
3160
3161   if (!sink)
3162     return FALSE;
3163
3164   ret = gst_element_query_position (sink, GST_FORMAT_TIME, position);
3165   gst_object_unref (sink);
3166
3167   return ret;
3168 }
3169
3170 /**
3171  * gst_rtsp_stream_query_stop:
3172  * @stream: a #GstRTSPStream
3173  *
3174  * Query the stop of the stream in %GST_FORMAT_TIME. This only considers
3175  * the RTP parts of the pipeline and not the RTCP parts.
3176  *
3177  * Returns: %TRUE if the stop could be queried
3178  */
3179 gboolean
3180 gst_rtsp_stream_query_stop (GstRTSPStream * stream, gint64 * stop)
3181 {
3182   GstRTSPStreamPrivate *priv;
3183   GstElement *sink;
3184   GstQuery *query;
3185   gboolean ret;
3186
3187   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3188
3189   priv = stream->priv;
3190
3191   g_mutex_lock (&priv->lock);
3192   if ((sink = priv->udpsink[0]))
3193     gst_object_ref (sink);
3194   g_mutex_unlock (&priv->lock);
3195
3196   if (!sink)
3197     return FALSE;
3198
3199   query = gst_query_new_segment (GST_FORMAT_TIME);
3200   if ((ret = gst_element_query (sink, query))) {
3201     GstFormat format;
3202
3203     gst_query_parse_segment (query, NULL, &format, NULL, stop);
3204     if (format != GST_FORMAT_TIME)
3205       *stop = -1;
3206   }
3207   gst_query_unref (query);
3208   gst_object_unref (sink);
3209
3210   return ret;
3211
3212 }