rtsp-stream: Fix some minor memory leaks
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-stream.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19 /**
20  * SECTION:rtsp-stream
21  * @short_description: A media stream
22  * @see_also: #GstRTSPMedia
23  *
24  * The #GstRTSPStream object manages the data transport for one stream. It
25  * is created from a payloader element and a source pad that produce the RTP
26  * packets for the stream.
27  *
28  * With gst_rtsp_stream_join_bin() the streaming elements are added to the bin
29  * and rtpbin. gst_rtsp_stream_leave_bin() removes the elements again.
30  *
31  * The #GstRTSPStream will use the configured addresspool, as set with
32  * gst_rtsp_stream_set_address_pool(), to allocate multicast addresses for the
33  * stream. With gst_rtsp_stream_get_multicast_address() you can get the
34  * configured address.
35  *
36  * With gst_rtsp_stream_get_server_port () you can get the port that the server
37  * will use to receive RTCP. This is the part that the clients will use to send
38  * RTCP to.
39  *
40  * With gst_rtsp_stream_add_transport() destinations can be added where the
41  * stream should be sent to. Use gst_rtsp_stream_remove_transport() to remove
42  * the destination again.
43  *
44  * Last reviewed on 2013-07-16 (1.0.0)
45  */
46
47 #include <stdlib.h>
48 #include <stdio.h>
49 #include <string.h>
50
51 #include <gio/gio.h>
52
53 #include <gst/app/gstappsrc.h>
54 #include <gst/app/gstappsink.h>
55
56 #include "rtsp-stream.h"
57
58 #define GST_RTSP_STREAM_GET_PRIVATE(obj)  \
59      (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTSP_STREAM, GstRTSPStreamPrivate))
60
61 typedef struct
62 {
63   GstRTSPStreamTransport *transport;
64
65   /* RTP and RTCP source */
66   GstElement *udpsrc[2];
67   GstPad *selpad[2];
68 } GstRTSPMulticastTransportSource;
69
70 struct _GstRTSPStreamPrivate
71 {
72   GMutex lock;
73   guint idx;
74   GstPad *srcpad;
75   GstElement *payloader;
76   guint buffer_size;
77   gboolean is_joined;
78   gchar *control;
79
80   GstRTSPProfile profiles;
81   GstRTSPLowerTrans protocols;
82
83   /* pads on the rtpbin */
84   GstPad *send_rtp_sink;
85   GstPad *recv_sink[2];
86   GstPad *send_src[2];
87
88   /* the RTPSession object */
89   GObject *session;
90
91   /* SRTP encoder/decoder */
92   GstElement *srtpenc;
93   GstElement *srtpdec;
94   GHashTable *keys;
95
96   /* sinks used for sending and receiving RTP and RTCP over ipv4, they share
97    * sockets */
98   GstElement *udpsrc_v4[2];
99
100   /* sinks used for sending and receiving RTP and RTCP over ipv6, they share
101    * sockets */
102   GstElement *udpsrc_v6[2];
103
104   GstElement *udpsink[2];
105
106   /* for TCP transport */
107   GstElement *appsrc[2];
108   GstElement *appqueue[2];
109   GstElement *appsink[2];
110
111   GstElement *tee[2];
112   GstElement *funnel[2];
113
114   /* retransmission */
115   GstElement *rtxsend;
116   guint rtx_pt;
117   GstClockTime rtx_time;
118
119   /* server ports for sending/receiving over ipv4 */
120   GstRTSPRange server_port_v4;
121   GstRTSPAddress *server_addr_v4;
122   gboolean have_ipv4;
123
124   /* server ports for sending/receiving over ipv6 */
125   GstRTSPRange server_port_v6;
126   GstRTSPAddress *server_addr_v6;
127   gboolean have_ipv6;
128
129   /* multicast addresses */
130   GstRTSPAddressPool *pool;
131   GstRTSPAddress *addr_v4;
132   GstRTSPAddress *addr_v6;
133
134   /* the caps of the stream */
135   gulong caps_sig;
136   GstCaps *caps;
137
138   /* transports we stream to */
139   guint n_active;
140   GList *transports;
141   guint transports_cookie;
142   GList *tr_cache;
143   guint tr_cache_cookie;
144
145   /* UDP sources for UDP multicast transports */
146   GList *transport_sources;
147
148   gint dscp_qos;
149
150   /* stream blocking */
151   gulong blocked_id;
152   gboolean blocking;
153 };
154
155 #define DEFAULT_CONTROL         NULL
156 #define DEFAULT_PROFILES        GST_RTSP_PROFILE_AVP
157 #define DEFAULT_PROTOCOLS       GST_RTSP_LOWER_TRANS_UDP | GST_RTSP_LOWER_TRANS_UDP_MCAST | \
158                                         GST_RTSP_LOWER_TRANS_TCP
159
160 enum
161 {
162   PROP_0,
163   PROP_CONTROL,
164   PROP_PROFILES,
165   PROP_PROTOCOLS,
166   PROP_LAST
167 };
168
169 enum
170 {
171   SIGNAL_NEW_RTP_ENCODER,
172   SIGNAL_NEW_RTCP_ENCODER,
173   SIGNAL_LAST
174 };
175
176 GST_DEBUG_CATEGORY_STATIC (rtsp_stream_debug);
177 #define GST_CAT_DEFAULT rtsp_stream_debug
178
179 static GQuark ssrc_stream_map_key;
180
181 static void gst_rtsp_stream_get_property (GObject * object, guint propid,
182     GValue * value, GParamSpec * pspec);
183 static void gst_rtsp_stream_set_property (GObject * object, guint propid,
184     const GValue * value, GParamSpec * pspec);
185
186 static void gst_rtsp_stream_finalize (GObject * obj);
187
188 static guint gst_rtsp_stream_signals[SIGNAL_LAST] = { 0 };
189
190 G_DEFINE_TYPE (GstRTSPStream, gst_rtsp_stream, G_TYPE_OBJECT);
191
192 static void
193 gst_rtsp_stream_class_init (GstRTSPStreamClass * klass)
194 {
195   GObjectClass *gobject_class;
196
197   g_type_class_add_private (klass, sizeof (GstRTSPStreamPrivate));
198
199   gobject_class = G_OBJECT_CLASS (klass);
200
201   gobject_class->get_property = gst_rtsp_stream_get_property;
202   gobject_class->set_property = gst_rtsp_stream_set_property;
203   gobject_class->finalize = gst_rtsp_stream_finalize;
204
205   g_object_class_install_property (gobject_class, PROP_CONTROL,
206       g_param_spec_string ("control", "Control",
207           "The control string for this stream", DEFAULT_CONTROL,
208           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
209
210   g_object_class_install_property (gobject_class, PROP_PROFILES,
211       g_param_spec_flags ("profiles", "Profiles",
212           "Allowed transfer profiles", GST_TYPE_RTSP_PROFILE,
213           DEFAULT_PROFILES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
214
215   g_object_class_install_property (gobject_class, PROP_PROTOCOLS,
216       g_param_spec_flags ("protocols", "Protocols",
217           "Allowed lower transport protocols", GST_TYPE_RTSP_LOWER_TRANS,
218           DEFAULT_PROTOCOLS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
219
220   gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER] =
221       g_signal_new ("new-rtp-encoder", G_TYPE_FROM_CLASS (klass),
222       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
223       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
224
225   gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER] =
226       g_signal_new ("new-rtcp-encoder", G_TYPE_FROM_CLASS (klass),
227       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
228       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
229
230   GST_DEBUG_CATEGORY_INIT (rtsp_stream_debug, "rtspstream", 0, "GstRTSPStream");
231
232   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
233 }
234
235 static void
236 gst_rtsp_stream_init (GstRTSPStream * stream)
237 {
238   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
239
240   GST_DEBUG ("new stream %p", stream);
241
242   stream->priv = priv;
243
244   priv->dscp_qos = -1;
245   priv->control = g_strdup (DEFAULT_CONTROL);
246   priv->profiles = DEFAULT_PROFILES;
247   priv->protocols = DEFAULT_PROTOCOLS;
248
249   g_mutex_init (&priv->lock);
250
251   priv->keys = g_hash_table_new_full (g_direct_hash, g_direct_equal,
252       NULL, (GDestroyNotify) gst_caps_unref);
253 }
254
255 static void
256 gst_rtsp_stream_finalize (GObject * obj)
257 {
258   GstRTSPStream *stream;
259   GstRTSPStreamPrivate *priv;
260
261   stream = GST_RTSP_STREAM (obj);
262   priv = stream->priv;
263
264   GST_DEBUG ("finalize stream %p", stream);
265
266   /* we really need to be unjoined now */
267   g_return_if_fail (!priv->is_joined);
268
269   if (priv->addr_v4)
270     gst_rtsp_address_free (priv->addr_v4);
271   if (priv->addr_v6)
272     gst_rtsp_address_free (priv->addr_v6);
273   if (priv->server_addr_v4)
274     gst_rtsp_address_free (priv->server_addr_v4);
275   if (priv->server_addr_v6)
276     gst_rtsp_address_free (priv->server_addr_v6);
277   if (priv->pool)
278     g_object_unref (priv->pool);
279   if (priv->rtxsend)
280     g_object_unref (priv->rtxsend);
281
282   gst_object_unref (priv->payloader);
283   gst_object_unref (priv->srcpad);
284   g_free (priv->control);
285   g_mutex_clear (&priv->lock);
286
287   g_hash_table_unref (priv->keys);
288
289   G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
290 }
291
292 static void
293 gst_rtsp_stream_get_property (GObject * object, guint propid,
294     GValue * value, GParamSpec * pspec)
295 {
296   GstRTSPStream *stream = GST_RTSP_STREAM (object);
297
298   switch (propid) {
299     case PROP_CONTROL:
300       g_value_take_string (value, gst_rtsp_stream_get_control (stream));
301       break;
302     case PROP_PROFILES:
303       g_value_set_flags (value, gst_rtsp_stream_get_profiles (stream));
304       break;
305     case PROP_PROTOCOLS:
306       g_value_set_flags (value, gst_rtsp_stream_get_protocols (stream));
307       break;
308     default:
309       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
310   }
311 }
312
313 static void
314 gst_rtsp_stream_set_property (GObject * object, guint propid,
315     const GValue * value, GParamSpec * pspec)
316 {
317   GstRTSPStream *stream = GST_RTSP_STREAM (object);
318
319   switch (propid) {
320     case PROP_CONTROL:
321       gst_rtsp_stream_set_control (stream, g_value_get_string (value));
322       break;
323     case PROP_PROFILES:
324       gst_rtsp_stream_set_profiles (stream, g_value_get_flags (value));
325       break;
326     case PROP_PROTOCOLS:
327       gst_rtsp_stream_set_protocols (stream, g_value_get_flags (value));
328       break;
329     default:
330       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
331   }
332 }
333
334 /**
335  * gst_rtsp_stream_new:
336  * @idx: an index
337  * @srcpad: a #GstPad
338  * @payloader: a #GstElement
339  *
340  * Create a new media stream with index @idx that handles RTP data on
341  * @srcpad and has a payloader element @payloader.
342  *
343  * Returns: (transfer full): a new #GstRTSPStream
344  */
345 GstRTSPStream *
346 gst_rtsp_stream_new (guint idx, GstElement * payloader, GstPad * srcpad)
347 {
348   GstRTSPStreamPrivate *priv;
349   GstRTSPStream *stream;
350
351   g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL);
352   g_return_val_if_fail (GST_IS_PAD (srcpad), NULL);
353   g_return_val_if_fail (GST_PAD_IS_SRC (srcpad), NULL);
354
355   stream = g_object_new (GST_TYPE_RTSP_STREAM, NULL);
356   priv = stream->priv;
357   priv->idx = idx;
358   priv->payloader = gst_object_ref (payloader);
359   priv->srcpad = gst_object_ref (srcpad);
360
361   return stream;
362 }
363
364 /**
365  * gst_rtsp_stream_get_index:
366  * @stream: a #GstRTSPStream
367  *
368  * Get the stream index.
369  *
370  * Return: the stream index.
371  */
372 guint
373 gst_rtsp_stream_get_index (GstRTSPStream * stream)
374 {
375   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
376
377   return stream->priv->idx;
378 }
379
380 /**
381  * gst_rtsp_stream_get_pt:
382  * @stream: a #GstRTSPStream
383  *
384  * Get the stream payload type.
385  *
386  * Return: the stream payload type.
387  */
388 guint
389 gst_rtsp_stream_get_pt (GstRTSPStream * stream)
390 {
391   GstRTSPStreamPrivate *priv;
392   guint pt;
393
394   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
395
396   priv = stream->priv;
397
398   g_object_get (G_OBJECT (priv->payloader), "pt", &pt, NULL);
399
400   return pt;
401 }
402
403 /**
404  * gst_rtsp_stream_get_srcpad:
405  * @stream: a #GstRTSPStream
406  *
407  * Get the srcpad associated with @stream.
408  *
409  * Returns: (transfer full): the srcpad. Unref after usage.
410  */
411 GstPad *
412 gst_rtsp_stream_get_srcpad (GstRTSPStream * stream)
413 {
414   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
415
416   return gst_object_ref (stream->priv->srcpad);
417 }
418
419 /**
420  * gst_rtsp_stream_get_control:
421  * @stream: a #GstRTSPStream
422  *
423  * Get the control string to identify this stream.
424  *
425  * Returns: (transfer full): the control string. g_free() after usage.
426  */
427 gchar *
428 gst_rtsp_stream_get_control (GstRTSPStream * stream)
429 {
430   GstRTSPStreamPrivate *priv;
431   gchar *result;
432
433   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
434
435   priv = stream->priv;
436
437   g_mutex_lock (&priv->lock);
438   if ((result = g_strdup (priv->control)) == NULL)
439     result = g_strdup_printf ("stream=%u", priv->idx);
440   g_mutex_unlock (&priv->lock);
441
442   return result;
443 }
444
445 /**
446  * gst_rtsp_stream_set_control:
447  * @stream: a #GstRTSPStream
448  * @control: a control string
449  *
450  * Set the control string in @stream.
451  */
452 void
453 gst_rtsp_stream_set_control (GstRTSPStream * stream, const gchar * control)
454 {
455   GstRTSPStreamPrivate *priv;
456
457   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
458
459   priv = stream->priv;
460
461   g_mutex_lock (&priv->lock);
462   g_free (priv->control);
463   priv->control = g_strdup (control);
464   g_mutex_unlock (&priv->lock);
465 }
466
467 /**
468  * gst_rtsp_stream_has_control:
469  * @stream: a #GstRTSPStream
470  * @control: a control string
471  *
472  * Check if @stream has the control string @control.
473  *
474  * Returns: %TRUE is @stream has @control as the control string
475  */
476 gboolean
477 gst_rtsp_stream_has_control (GstRTSPStream * stream, const gchar * control)
478 {
479   GstRTSPStreamPrivate *priv;
480   gboolean res;
481
482   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
483
484   priv = stream->priv;
485
486   g_mutex_lock (&priv->lock);
487   if (priv->control)
488     res = (g_strcmp0 (priv->control, control) == 0);
489   else {
490     guint streamid;
491
492     if (sscanf (control, "stream=%u", &streamid) > 0)
493       res = (streamid == priv->idx);
494     else
495       res = FALSE;
496   }
497   g_mutex_unlock (&priv->lock);
498
499   return res;
500 }
501
502 /**
503  * gst_rtsp_stream_set_mtu:
504  * @stream: a #GstRTSPStream
505  * @mtu: a new MTU
506  *
507  * Configure the mtu in the payloader of @stream to @mtu.
508  */
509 void
510 gst_rtsp_stream_set_mtu (GstRTSPStream * stream, guint mtu)
511 {
512   GstRTSPStreamPrivate *priv;
513
514   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
515
516   priv = stream->priv;
517
518   GST_LOG_OBJECT (stream, "set MTU %u", mtu);
519
520   g_object_set (G_OBJECT (priv->payloader), "mtu", mtu, NULL);
521 }
522
523 /**
524  * gst_rtsp_stream_get_mtu:
525  * @stream: a #GstRTSPStream
526  *
527  * Get the configured MTU in the payloader of @stream.
528  *
529  * Returns: the MTU of the payloader.
530  */
531 guint
532 gst_rtsp_stream_get_mtu (GstRTSPStream * stream)
533 {
534   GstRTSPStreamPrivate *priv;
535   guint mtu;
536
537   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
538
539   priv = stream->priv;
540
541   g_object_get (G_OBJECT (priv->payloader), "mtu", &mtu, NULL);
542
543   return mtu;
544 }
545
546 /* Update the dscp qos property on the udp sinks */
547 static void
548 update_dscp_qos (GstRTSPStream * stream)
549 {
550   GstRTSPStreamPrivate *priv;
551
552   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
553
554   priv = stream->priv;
555
556   if (priv->udpsink[0]) {
557     g_object_set (G_OBJECT (priv->udpsink[0]), "qos-dscp", priv->dscp_qos,
558         NULL);
559   }
560
561   if (priv->udpsink[1]) {
562     g_object_set (G_OBJECT (priv->udpsink[1]), "qos-dscp", priv->dscp_qos,
563         NULL);
564   }
565 }
566
567 /**
568  * gst_rtsp_stream_set_dscp_qos:
569  * @stream: a #GstRTSPStream
570  * @dscp_qos: a new dscp qos value (0-63, or -1 to disable)
571  *
572  * Configure the dscp qos of the outgoing sockets to @dscp_qos.
573  */
574 void
575 gst_rtsp_stream_set_dscp_qos (GstRTSPStream * stream, gint dscp_qos)
576 {
577   GstRTSPStreamPrivate *priv;
578
579   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
580
581   priv = stream->priv;
582
583   GST_LOG_OBJECT (stream, "set DSCP QoS %d", dscp_qos);
584
585   if (dscp_qos < -1 || dscp_qos > 63) {
586     GST_WARNING_OBJECT (stream, "trying to set illegal dscp qos %d", dscp_qos);
587     return;
588   }
589
590   priv->dscp_qos = dscp_qos;
591
592   update_dscp_qos (stream);
593 }
594
595 /**
596  * gst_rtsp_stream_get_dscp_qos:
597  * @stream: a #GstRTSPStream
598  *
599  * Get the configured DSCP QoS in of the outgoing sockets.
600  *
601  * Returns: the DSCP QoS value of the outgoing sockets, or -1 if disbled.
602  */
603 gint
604 gst_rtsp_stream_get_dscp_qos (GstRTSPStream * stream)
605 {
606   GstRTSPStreamPrivate *priv;
607
608   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
609
610   priv = stream->priv;
611
612   return priv->dscp_qos;
613 }
614
615 /**
616  * gst_rtsp_stream_is_transport_supported:
617  * @stream: a #GstRTSPStream
618  * @transport: (transfer none): a #GstRTSPTransport
619  *
620  * Check if @transport can be handled by stream
621  *
622  * Returns: %TRUE if @transport can be handled by @stream.
623  */
624 gboolean
625 gst_rtsp_stream_is_transport_supported (GstRTSPStream * stream,
626     GstRTSPTransport * transport)
627 {
628   GstRTSPStreamPrivate *priv;
629
630   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
631
632   priv = stream->priv;
633
634   g_mutex_lock (&priv->lock);
635   if (transport->trans != GST_RTSP_TRANS_RTP)
636     goto unsupported_transmode;
637
638   if (!(transport->profile & priv->profiles))
639     goto unsupported_profile;
640
641   if (!(transport->lower_transport & priv->protocols))
642     goto unsupported_ltrans;
643
644   g_mutex_unlock (&priv->lock);
645
646   return TRUE;
647
648   /* ERRORS */
649 unsupported_transmode:
650   {
651     GST_DEBUG ("unsupported transport mode %d", transport->trans);
652     g_mutex_unlock (&priv->lock);
653     return FALSE;
654   }
655 unsupported_profile:
656   {
657     GST_DEBUG ("unsupported profile %d", transport->profile);
658     g_mutex_unlock (&priv->lock);
659     return FALSE;
660   }
661 unsupported_ltrans:
662   {
663     GST_DEBUG ("unsupported lower transport %d", transport->lower_transport);
664     g_mutex_unlock (&priv->lock);
665     return FALSE;
666   }
667 }
668
669 /**
670  * gst_rtsp_stream_set_profiles:
671  * @stream: a #GstRTSPStream
672  * @profiles: the new profiles
673  *
674  * Configure the allowed profiles for @stream.
675  */
676 void
677 gst_rtsp_stream_set_profiles (GstRTSPStream * stream, GstRTSPProfile profiles)
678 {
679   GstRTSPStreamPrivate *priv;
680
681   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
682
683   priv = stream->priv;
684
685   g_mutex_lock (&priv->lock);
686   priv->profiles = profiles;
687   g_mutex_unlock (&priv->lock);
688 }
689
690 /**
691  * gst_rtsp_stream_get_profiles:
692  * @stream: a #GstRTSPStream
693  *
694  * Get the allowed profiles of @stream.
695  *
696  * Returns: a #GstRTSPProfile
697  */
698 GstRTSPProfile
699 gst_rtsp_stream_get_profiles (GstRTSPStream * stream)
700 {
701   GstRTSPStreamPrivate *priv;
702   GstRTSPProfile res;
703
704   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_RTSP_PROFILE_UNKNOWN);
705
706   priv = stream->priv;
707
708   g_mutex_lock (&priv->lock);
709   res = priv->profiles;
710   g_mutex_unlock (&priv->lock);
711
712   return res;
713 }
714
715 /**
716  * gst_rtsp_stream_set_protocols:
717  * @stream: a #GstRTSPStream
718  * @protocols: the new flags
719  *
720  * Configure the allowed lower transport for @stream.
721  */
722 void
723 gst_rtsp_stream_set_protocols (GstRTSPStream * stream,
724     GstRTSPLowerTrans protocols)
725 {
726   GstRTSPStreamPrivate *priv;
727
728   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
729
730   priv = stream->priv;
731
732   g_mutex_lock (&priv->lock);
733   priv->protocols = protocols;
734   g_mutex_unlock (&priv->lock);
735 }
736
737 /**
738  * gst_rtsp_stream_get_protocols:
739  * @stream: a #GstRTSPStream
740  *
741  * Get the allowed protocols of @stream.
742  *
743  * Returns: a #GstRTSPLowerTrans
744  */
745 GstRTSPLowerTrans
746 gst_rtsp_stream_get_protocols (GstRTSPStream * stream)
747 {
748   GstRTSPStreamPrivate *priv;
749   GstRTSPLowerTrans res;
750
751   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream),
752       GST_RTSP_LOWER_TRANS_UNKNOWN);
753
754   priv = stream->priv;
755
756   g_mutex_lock (&priv->lock);
757   res = priv->protocols;
758   g_mutex_unlock (&priv->lock);
759
760   return res;
761 }
762
763 /**
764  * gst_rtsp_stream_set_address_pool:
765  * @stream: a #GstRTSPStream
766  * @pool: (transfer none): a #GstRTSPAddressPool
767  *
768  * configure @pool to be used as the address pool of @stream.
769  */
770 void
771 gst_rtsp_stream_set_address_pool (GstRTSPStream * stream,
772     GstRTSPAddressPool * pool)
773 {
774   GstRTSPStreamPrivate *priv;
775   GstRTSPAddressPool *old;
776
777   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
778
779   priv = stream->priv;
780
781   GST_LOG_OBJECT (stream, "set address pool %p", pool);
782
783   g_mutex_lock (&priv->lock);
784   if ((old = priv->pool) != pool)
785     priv->pool = pool ? g_object_ref (pool) : NULL;
786   else
787     old = NULL;
788   g_mutex_unlock (&priv->lock);
789
790   if (old)
791     g_object_unref (old);
792 }
793
794 /**
795  * gst_rtsp_stream_get_address_pool:
796  * @stream: a #GstRTSPStream
797  *
798  * Get the #GstRTSPAddressPool used as the address pool of @stream.
799  *
800  * Returns: (transfer full): the #GstRTSPAddressPool of @stream. g_object_unref() after
801  * usage.
802  */
803 GstRTSPAddressPool *
804 gst_rtsp_stream_get_address_pool (GstRTSPStream * stream)
805 {
806   GstRTSPStreamPrivate *priv;
807   GstRTSPAddressPool *result;
808
809   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
810
811   priv = stream->priv;
812
813   g_mutex_lock (&priv->lock);
814   if ((result = priv->pool))
815     g_object_ref (result);
816   g_mutex_unlock (&priv->lock);
817
818   return result;
819 }
820
821 /**
822  * gst_rtsp_stream_get_multicast_address:
823  * @stream: a #GstRTSPStream
824  * @family: the #GSocketFamily
825  *
826  * Get the multicast address of @stream for @family.
827  *
828  * Returns: (transfer full) (nullable): the #GstRTSPAddress of @stream
829  * or %NULL when no address could be allocated. gst_rtsp_address_free()
830  * after usage.
831  */
832 GstRTSPAddress *
833 gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream,
834     GSocketFamily family)
835 {
836   GstRTSPStreamPrivate *priv;
837   GstRTSPAddress *result;
838   GstRTSPAddress **addrp;
839   GstRTSPAddressFlags flags;
840
841   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
842
843   priv = stream->priv;
844
845   if (family == G_SOCKET_FAMILY_IPV6) {
846     flags = GST_RTSP_ADDRESS_FLAG_IPV6;
847     addrp = &priv->addr_v6;
848   } else {
849     flags = GST_RTSP_ADDRESS_FLAG_IPV4;
850     addrp = &priv->addr_v4;
851   }
852
853   g_mutex_lock (&priv->lock);
854   if (*addrp == NULL) {
855     if (priv->pool == NULL)
856       goto no_pool;
857
858     flags |= GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_MULTICAST;
859
860     *addrp = gst_rtsp_address_pool_acquire_address (priv->pool, flags, 2);
861     if (*addrp == NULL)
862       goto no_address;
863   }
864   result = gst_rtsp_address_copy (*addrp);
865   g_mutex_unlock (&priv->lock);
866
867   return result;
868
869   /* ERRORS */
870 no_pool:
871   {
872     GST_ERROR_OBJECT (stream, "no address pool specified");
873     g_mutex_unlock (&priv->lock);
874     return NULL;
875   }
876 no_address:
877   {
878     GST_ERROR_OBJECT (stream, "failed to acquire address from pool");
879     g_mutex_unlock (&priv->lock);
880     return NULL;
881   }
882 }
883
884 /**
885  * gst_rtsp_stream_reserve_address:
886  * @stream: a #GstRTSPStream
887  * @address: an address
888  * @port: a port
889  * @n_ports: n_ports
890  * @ttl: a TTL
891  *
892  * Reserve @address and @port as the address and port of @stream.
893  *
894  * Returns: (nullable): the #GstRTSPAddress of @stream or %NULL when
895  * the address could be reserved. gst_rtsp_address_free() after usage.
896  */
897 GstRTSPAddress *
898 gst_rtsp_stream_reserve_address (GstRTSPStream * stream,
899     const gchar * address, guint port, guint n_ports, guint ttl)
900 {
901   GstRTSPStreamPrivate *priv;
902   GstRTSPAddress *result;
903   GInetAddress *addr;
904   GSocketFamily family;
905   GstRTSPAddress **addrp;
906
907   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
908   g_return_val_if_fail (address != NULL, NULL);
909   g_return_val_if_fail (port > 0, NULL);
910   g_return_val_if_fail (n_ports > 0, NULL);
911   g_return_val_if_fail (ttl > 0, NULL);
912
913   priv = stream->priv;
914
915   addr = g_inet_address_new_from_string (address);
916   if (!addr) {
917     GST_ERROR ("failed to get inet addr from %s", address);
918     family = G_SOCKET_FAMILY_IPV4;
919   } else {
920     family = g_inet_address_get_family (addr);
921     g_object_unref (addr);
922   }
923
924   if (family == G_SOCKET_FAMILY_IPV6)
925     addrp = &priv->addr_v6;
926   else
927     addrp = &priv->addr_v4;
928
929   g_mutex_lock (&priv->lock);
930   if (*addrp == NULL) {
931     GstRTSPAddressPoolResult res;
932
933     if (priv->pool == NULL)
934       goto no_pool;
935
936     res = gst_rtsp_address_pool_reserve_address (priv->pool, address,
937         port, n_ports, ttl, addrp);
938     if (res != GST_RTSP_ADDRESS_POOL_OK)
939       goto no_address;
940   } else {
941     if (strcmp ((*addrp)->address, address) ||
942         (*addrp)->port != port || (*addrp)->n_ports != n_ports ||
943         (*addrp)->ttl != ttl)
944       goto different_address;
945   }
946   result = gst_rtsp_address_copy (*addrp);
947   g_mutex_unlock (&priv->lock);
948
949   return result;
950
951   /* ERRORS */
952 no_pool:
953   {
954     GST_ERROR_OBJECT (stream, "no address pool specified");
955     g_mutex_unlock (&priv->lock);
956     return NULL;
957   }
958 no_address:
959   {
960     GST_ERROR_OBJECT (stream, "failed to acquire address %s from pool",
961         address);
962     g_mutex_unlock (&priv->lock);
963     return NULL;
964   }
965 different_address:
966   {
967     GST_ERROR_OBJECT (stream, "address %s is not the same that was already"
968         " reserved", address);
969     g_mutex_unlock (&priv->lock);
970     return NULL;
971   }
972 }
973
974 static gboolean
975 alloc_ports_one_family (GstRTSPAddressPool * pool, gint buffer_size,
976     GSocketFamily family, GstElement * udpsrc_out[2],
977     GstElement * udpsink_out[2], GstRTSPRange * server_port_out,
978     GstRTSPAddress ** server_addr_out)
979 {
980   GstStateChangeReturn ret;
981   GstElement *udpsrc0, *udpsrc1;
982   GstElement *udpsink0, *udpsink1;
983   GSocket *rtp_socket = NULL;
984   GSocket *rtcp_socket;
985   gint tmp_rtp, tmp_rtcp;
986   guint count;
987   gint rtpport, rtcpport;
988   GList *rejected_addresses = NULL;
989   GstRTSPAddress *addr = NULL;
990   GInetAddress *inetaddr = NULL;
991   GSocketAddress *rtp_sockaddr = NULL;
992   GSocketAddress *rtcp_sockaddr = NULL;
993   const gchar *multisink_socket;
994
995   if (family == G_SOCKET_FAMILY_IPV6)
996     multisink_socket = "socket-v6";
997   else
998     multisink_socket = "socket";
999
1000   udpsrc0 = NULL;
1001   udpsrc1 = NULL;
1002   udpsink0 = NULL;
1003   udpsink1 = NULL;
1004   count = 0;
1005
1006   /* Start with random port */
1007   tmp_rtp = 0;
1008
1009   rtcp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1010       G_SOCKET_PROTOCOL_UDP, NULL);
1011   if (!rtcp_socket)
1012     goto no_udp_protocol;
1013
1014   if (*server_addr_out)
1015     gst_rtsp_address_free (*server_addr_out);
1016
1017   /* try to allocate 2 UDP ports, the RTP port should be an even
1018    * number and the RTCP port should be the next (uneven) port */
1019 again:
1020
1021   if (rtp_socket == NULL) {
1022     rtp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1023         G_SOCKET_PROTOCOL_UDP, NULL);
1024     if (!rtp_socket)
1025       goto no_udp_protocol;
1026   }
1027
1028   if (pool && gst_rtsp_address_pool_has_unicast_addresses (pool)) {
1029     GstRTSPAddressFlags flags;
1030
1031     if (addr)
1032       rejected_addresses = g_list_prepend (rejected_addresses, addr);
1033
1034     flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_UNICAST;
1035     if (family == G_SOCKET_FAMILY_IPV6)
1036       flags |= GST_RTSP_ADDRESS_FLAG_IPV6;
1037     else
1038       flags |= GST_RTSP_ADDRESS_FLAG_IPV4;
1039
1040     addr = gst_rtsp_address_pool_acquire_address (pool, flags, 2);
1041
1042     if (addr == NULL)
1043       goto no_ports;
1044
1045     tmp_rtp = addr->port;
1046
1047     g_clear_object (&inetaddr);
1048     inetaddr = g_inet_address_new_from_string (addr->address);
1049   } else {
1050     if (tmp_rtp != 0) {
1051       tmp_rtp += 2;
1052       if (++count > 20)
1053         goto no_ports;
1054     }
1055
1056     if (inetaddr == NULL)
1057       inetaddr = g_inet_address_new_any (family);
1058   }
1059
1060   rtp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtp);
1061   if (!g_socket_bind (rtp_socket, rtp_sockaddr, FALSE, NULL)) {
1062     g_object_unref (rtp_sockaddr);
1063     goto again;
1064   }
1065   g_object_unref (rtp_sockaddr);
1066
1067   rtp_sockaddr = g_socket_get_local_address (rtp_socket, NULL);
1068   if (rtp_sockaddr == NULL || !G_IS_INET_SOCKET_ADDRESS (rtp_sockaddr)) {
1069     g_clear_object (&rtp_sockaddr);
1070     goto socket_error;
1071   }
1072
1073   tmp_rtp =
1074       g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (rtp_sockaddr));
1075   g_object_unref (rtp_sockaddr);
1076
1077   /* check if port is even */
1078   if ((tmp_rtp & 1) != 0) {
1079     /* port not even, close and allocate another */
1080     tmp_rtp++;
1081     g_clear_object (&rtp_socket);
1082     goto again;
1083   }
1084
1085   /* set port */
1086   tmp_rtcp = tmp_rtp + 1;
1087
1088   rtcp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtcp);
1089   if (!g_socket_bind (rtcp_socket, rtcp_sockaddr, FALSE, NULL)) {
1090     g_object_unref (rtcp_sockaddr);
1091     g_clear_object (&rtp_socket);
1092     goto again;
1093   }
1094   g_object_unref (rtcp_sockaddr);
1095
1096   g_clear_object (&inetaddr);
1097
1098   udpsrc0 = gst_element_factory_make ("udpsrc", NULL);
1099   udpsrc1 = gst_element_factory_make ("udpsrc", NULL);
1100
1101   if (udpsrc0 == NULL || udpsrc1 == NULL)
1102     goto no_udp_protocol;
1103
1104   g_object_set (G_OBJECT (udpsrc0), "socket", rtp_socket, NULL);
1105   g_object_set (G_OBJECT (udpsrc1), "socket", rtcp_socket, NULL);
1106
1107   ret = gst_element_set_state (udpsrc0, GST_STATE_READY);
1108   if (ret == GST_STATE_CHANGE_FAILURE)
1109     goto element_error;
1110   ret = gst_element_set_state (udpsrc1, GST_STATE_READY);
1111   if (ret == GST_STATE_CHANGE_FAILURE)
1112     goto element_error;
1113
1114   /* all fine, do port check */
1115   g_object_get (G_OBJECT (udpsrc0), "port", &rtpport, NULL);
1116   g_object_get (G_OBJECT (udpsrc1), "port", &rtcpport, NULL);
1117
1118   /* this should not happen... */
1119   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
1120     goto port_error;
1121
1122   if (udpsink_out[0])
1123     udpsink0 = udpsink_out[0];
1124   else
1125     udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
1126
1127   if (!udpsink0)
1128     goto no_udp_protocol;
1129
1130   g_object_set (G_OBJECT (udpsink0), "close-socket", FALSE, NULL);
1131   g_object_set (G_OBJECT (udpsink0), multisink_socket, rtp_socket, NULL);
1132
1133   if (udpsink_out[1])
1134     udpsink1 = udpsink_out[1];
1135   else
1136     udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
1137
1138   if (!udpsink1)
1139     goto no_udp_protocol;
1140
1141   g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
1142   g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
1143   g_object_set (G_OBJECT (udpsink0), "buffer-size", buffer_size, NULL);
1144
1145   g_object_set (G_OBJECT (udpsink1), "close-socket", FALSE, NULL);
1146   g_object_set (G_OBJECT (udpsink1), multisink_socket, rtcp_socket, NULL);
1147   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
1148   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
1149   g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL);
1150   g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
1151   g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL);
1152   g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
1153
1154   /* we keep these elements, we will further configure them when the
1155    * client told us to really use the UDP ports. */
1156   udpsrc_out[0] = udpsrc0;
1157   udpsrc_out[1] = udpsrc1;
1158   udpsink_out[0] = udpsink0;
1159   udpsink_out[1] = udpsink1;
1160
1161   server_port_out->min = rtpport;
1162   server_port_out->max = rtcpport;
1163
1164   *server_addr_out = addr;
1165   g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free);
1166
1167   g_object_unref (rtp_socket);
1168   g_object_unref (rtcp_socket);
1169
1170   return TRUE;
1171
1172   /* ERRORS */
1173 no_udp_protocol:
1174   {
1175     goto cleanup;
1176   }
1177 no_ports:
1178   {
1179     goto cleanup;
1180   }
1181 port_error:
1182   {
1183     goto cleanup;
1184   }
1185 socket_error:
1186   {
1187     goto cleanup;
1188   }
1189 element_error:
1190   {
1191     goto cleanup;
1192   }
1193 cleanup:
1194   {
1195     if (udpsrc0) {
1196       gst_element_set_state (udpsrc0, GST_STATE_NULL);
1197       gst_object_unref (udpsrc0);
1198     }
1199     if (udpsrc1) {
1200       gst_element_set_state (udpsrc1, GST_STATE_NULL);
1201       gst_object_unref (udpsrc1);
1202     }
1203     if (udpsink0) {
1204       gst_element_set_state (udpsink0, GST_STATE_NULL);
1205       gst_object_unref (udpsink0);
1206     }
1207     if (inetaddr)
1208       g_object_unref (inetaddr);
1209     g_list_free_full (rejected_addresses,
1210         (GDestroyNotify) gst_rtsp_address_free);
1211     if (addr)
1212       gst_rtsp_address_free (addr);
1213     if (rtp_socket)
1214       g_object_unref (rtp_socket);
1215     if (rtcp_socket)
1216       g_object_unref (rtcp_socket);
1217     return FALSE;
1218   }
1219 }
1220
1221 /* must be called with lock */
1222 static gboolean
1223 alloc_ports (GstRTSPStream * stream)
1224 {
1225   GstRTSPStreamPrivate *priv = stream->priv;
1226
1227   priv->have_ipv4 = alloc_ports_one_family (priv->pool, priv->buffer_size,
1228       G_SOCKET_FAMILY_IPV4, priv->udpsrc_v4, priv->udpsink,
1229       &priv->server_port_v4, &priv->server_addr_v4);
1230
1231   priv->have_ipv6 = alloc_ports_one_family (priv->pool, priv->buffer_size,
1232       G_SOCKET_FAMILY_IPV6, priv->udpsrc_v6, priv->udpsink,
1233       &priv->server_port_v6, &priv->server_addr_v6);
1234
1235   return priv->have_ipv4 || priv->have_ipv6;
1236 }
1237
1238 /**
1239  * gst_rtsp_stream_get_server_port:
1240  * @stream: a #GstRTSPStream
1241  * @server_port: (out): result server port
1242  * @family: the port family to get
1243  *
1244  * Fill @server_port with the port pair used by the server. This function can
1245  * only be called when @stream has been joined.
1246  */
1247 void
1248 gst_rtsp_stream_get_server_port (GstRTSPStream * stream,
1249     GstRTSPRange * server_port, GSocketFamily family)
1250 {
1251   GstRTSPStreamPrivate *priv;
1252
1253   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1254   priv = stream->priv;
1255   g_return_if_fail (priv->is_joined);
1256
1257   g_mutex_lock (&priv->lock);
1258   if (family == G_SOCKET_FAMILY_IPV4) {
1259     if (server_port)
1260       *server_port = priv->server_port_v4;
1261   } else {
1262     if (server_port)
1263       *server_port = priv->server_port_v6;
1264   }
1265   g_mutex_unlock (&priv->lock);
1266 }
1267
1268 /**
1269  * gst_rtsp_stream_get_rtpsession:
1270  * @stream: a #GstRTSPStream
1271  *
1272  * Get the RTP session of this stream.
1273  *
1274  * Returns: (transfer full): The RTP session of this stream. Unref after usage.
1275  */
1276 GObject *
1277 gst_rtsp_stream_get_rtpsession (GstRTSPStream * stream)
1278 {
1279   GstRTSPStreamPrivate *priv;
1280   GObject *session;
1281
1282   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1283
1284   priv = stream->priv;
1285
1286   g_mutex_lock (&priv->lock);
1287   if ((session = priv->session))
1288     g_object_ref (session);
1289   g_mutex_unlock (&priv->lock);
1290
1291   return session;
1292 }
1293
1294 /**
1295  * gst_rtsp_stream_get_ssrc:
1296  * @stream: a #GstRTSPStream
1297  * @ssrc: (out): result ssrc
1298  *
1299  * Get the SSRC used by the RTP session of this stream. This function can only
1300  * be called when @stream has been joined.
1301  */
1302 void
1303 gst_rtsp_stream_get_ssrc (GstRTSPStream * stream, guint * ssrc)
1304 {
1305   GstRTSPStreamPrivate *priv;
1306
1307   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1308   priv = stream->priv;
1309   g_return_if_fail (priv->is_joined);
1310
1311   g_mutex_lock (&priv->lock);
1312   if (ssrc && priv->session)
1313     g_object_get (priv->session, "internal-ssrc", ssrc, NULL);
1314   g_mutex_unlock (&priv->lock);
1315 }
1316
1317 /**
1318  * gst_rtsp_stream_set_retransmission_time:
1319  * @stream: a #GstRTSPStream
1320  * @time: a #GstClockTime
1321  *
1322  * Set the amount of time to store retransmission packets.
1323  */
1324 void
1325 gst_rtsp_stream_set_retransmission_time (GstRTSPStream * stream,
1326     GstClockTime time)
1327 {
1328   GST_DEBUG_OBJECT (stream, "set retransmission time %" G_GUINT64_FORMAT, time);
1329
1330   g_mutex_lock (&stream->priv->lock);
1331   stream->priv->rtx_time = time;
1332   if (stream->priv->rtxsend)
1333     g_object_set (stream->priv->rtxsend, "max-size-time",
1334         GST_TIME_AS_MSECONDS (time), NULL);
1335   g_mutex_unlock (&stream->priv->lock);
1336 }
1337
1338 /**
1339  * gst_rtsp_media_get_retransmission_time:
1340  * @media: a #GstRTSPMedia
1341  *
1342  * Get the amount of time to store retransmission data.
1343  *
1344  * Returns: the amount of time to store retransmission data.
1345  */
1346 GstClockTime
1347 gst_rtsp_stream_get_retransmission_time (GstRTSPStream * stream)
1348 {
1349   GstClockTime ret;
1350
1351   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
1352
1353   g_mutex_lock (&stream->priv->lock);
1354   ret = stream->priv->rtx_time;
1355   g_mutex_unlock (&stream->priv->lock);
1356
1357   return ret;
1358 }
1359
1360 void
1361 gst_rtsp_stream_set_retransmission_pt (GstRTSPStream * stream, guint rtx_pt)
1362 {
1363   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1364
1365   GST_DEBUG_OBJECT (stream, "set retransmission pt %u", rtx_pt);
1366
1367   g_mutex_lock (&stream->priv->lock);
1368   stream->priv->rtx_pt = rtx_pt;
1369   if (stream->priv->rtxsend) {
1370     guint pt = gst_rtsp_stream_get_pt (stream);
1371     gchar *pt_s = g_strdup_printf ("%d", pt);
1372     GstStructure *rtx_pt_map = gst_structure_new ("application/x-rtp-pt-map",
1373         pt_s, G_TYPE_UINT, rtx_pt, NULL);
1374     g_object_set (stream->priv->rtxsend, "payload-type-map", rtx_pt_map, NULL);
1375     g_free (pt_s);
1376     gst_structure_free (rtx_pt_map);
1377   }
1378   g_mutex_unlock (&stream->priv->lock);
1379 }
1380
1381 guint
1382 gst_rtsp_stream_get_retransmission_pt (GstRTSPStream * stream)
1383 {
1384   guint rtx_pt;
1385
1386   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
1387
1388   g_mutex_lock (&stream->priv->lock);
1389   rtx_pt = stream->priv->rtx_pt;
1390   g_mutex_unlock (&stream->priv->lock);
1391
1392   return rtx_pt;
1393 }
1394
1395 /* executed from streaming thread */
1396 static void
1397 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream)
1398 {
1399   GstRTSPStreamPrivate *priv = stream->priv;
1400   GstCaps *newcaps, *oldcaps;
1401
1402   newcaps = gst_pad_get_current_caps (pad);
1403
1404   GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps,
1405       newcaps);
1406
1407   g_mutex_lock (&priv->lock);
1408   oldcaps = priv->caps;
1409   priv->caps = newcaps;
1410   g_mutex_unlock (&priv->lock);
1411
1412   if (oldcaps)
1413     gst_caps_unref (oldcaps);
1414 }
1415
1416 static void
1417 dump_structure (const GstStructure * s)
1418 {
1419   gchar *sstr;
1420
1421   sstr = gst_structure_to_string (s);
1422   GST_INFO ("structure: %s", sstr);
1423   g_free (sstr);
1424 }
1425
1426 static GstRTSPStreamTransport *
1427 find_transport (GstRTSPStream * stream, const gchar * rtcp_from)
1428 {
1429   GstRTSPStreamPrivate *priv = stream->priv;
1430   GList *walk;
1431   GstRTSPStreamTransport *result = NULL;
1432   const gchar *tmp;
1433   gchar *dest;
1434   guint port;
1435
1436   if (rtcp_from == NULL)
1437     return NULL;
1438
1439   tmp = g_strrstr (rtcp_from, ":");
1440   if (tmp == NULL)
1441     return NULL;
1442
1443   port = atoi (tmp + 1);
1444   dest = g_strndup (rtcp_from, tmp - rtcp_from);
1445
1446   g_mutex_lock (&priv->lock);
1447   GST_INFO ("finding %s:%d in %d transports", dest, port,
1448       g_list_length (priv->transports));
1449
1450   for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1451     GstRTSPStreamTransport *trans = walk->data;
1452     const GstRTSPTransport *tr;
1453     gint min, max;
1454
1455     tr = gst_rtsp_stream_transport_get_transport (trans);
1456
1457     min = tr->client_port.min;
1458     max = tr->client_port.max;
1459
1460     if ((strcmp (tr->destination, dest) == 0) && (min == port || max == port)) {
1461       result = trans;
1462       break;
1463     }
1464   }
1465   if (result)
1466     g_object_ref (result);
1467   g_mutex_unlock (&priv->lock);
1468
1469   g_free (dest);
1470
1471   return result;
1472 }
1473
1474 static GstRTSPStreamTransport *
1475 check_transport (GObject * source, GstRTSPStream * stream)
1476 {
1477   GstStructure *stats;
1478   GstRTSPStreamTransport *trans;
1479
1480   /* see if we have a stream to match with the origin of the RTCP packet */
1481   trans = g_object_get_qdata (source, ssrc_stream_map_key);
1482   if (trans == NULL) {
1483     g_object_get (source, "stats", &stats, NULL);
1484     if (stats) {
1485       const gchar *rtcp_from;
1486
1487       dump_structure (stats);
1488
1489       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
1490       if ((trans = find_transport (stream, rtcp_from))) {
1491         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
1492             source);
1493         g_object_set_qdata_full (source, ssrc_stream_map_key, trans,
1494             g_object_unref);
1495       }
1496       gst_structure_free (stats);
1497     }
1498   }
1499   return trans;
1500 }
1501
1502
1503 static void
1504 on_new_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1505 {
1506   GstRTSPStreamTransport *trans;
1507
1508   GST_INFO ("%p: new source %p", stream, source);
1509
1510   trans = check_transport (source, stream);
1511
1512   if (trans)
1513     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
1514 }
1515
1516 static void
1517 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPStream * stream)
1518 {
1519   GST_INFO ("%p: new SDES %p", stream, source);
1520 }
1521
1522 static void
1523 on_ssrc_active (GObject * session, GObject * source, GstRTSPStream * stream)
1524 {
1525   GstRTSPStreamTransport *trans;
1526
1527   trans = check_transport (source, stream);
1528
1529   if (trans) {
1530     GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
1531     gst_rtsp_stream_transport_keep_alive (trans);
1532   }
1533 #ifdef DUMP_STATS
1534   {
1535     GstStructure *stats;
1536     g_object_get (source, "stats", &stats, NULL);
1537     if (stats) {
1538       dump_structure (stats);
1539       gst_structure_free (stats);
1540     }
1541   }
1542 #endif
1543 }
1544
1545 static void
1546 on_bye_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1547 {
1548   GST_INFO ("%p: source %p bye", stream, source);
1549 }
1550
1551 static void
1552 on_bye_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1553 {
1554   GstRTSPStreamTransport *trans;
1555
1556   GST_INFO ("%p: source %p bye timeout", stream, source);
1557
1558   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1559     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1560     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1561   }
1562 }
1563
1564 static void
1565 on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1566 {
1567   GstRTSPStreamTransport *trans;
1568
1569   GST_INFO ("%p: source %p timeout", stream, source);
1570
1571   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1572     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1573     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1574   }
1575 }
1576
1577 static void
1578 clear_tr_cache (GstRTSPStreamPrivate * priv)
1579 {
1580   g_list_foreach (priv->tr_cache, (GFunc) g_object_unref, NULL);
1581   g_list_free (priv->tr_cache);
1582   priv->tr_cache = NULL;
1583 }
1584
1585 static GstFlowReturn
1586 handle_new_sample (GstAppSink * sink, gpointer user_data)
1587 {
1588   GstRTSPStreamPrivate *priv;
1589   GList *walk;
1590   GstSample *sample;
1591   GstBuffer *buffer;
1592   GstRTSPStream *stream;
1593   gboolean is_rtp;
1594
1595   sample = gst_app_sink_pull_sample (sink);
1596   if (!sample)
1597     return GST_FLOW_OK;
1598
1599   stream = (GstRTSPStream *) user_data;
1600   priv = stream->priv;
1601   buffer = gst_sample_get_buffer (sample);
1602
1603   is_rtp = GST_ELEMENT_CAST (sink) == priv->appsink[0];
1604
1605   g_mutex_lock (&priv->lock);
1606   if (priv->tr_cache_cookie != priv->transports_cookie) {
1607     clear_tr_cache (priv);
1608     for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1609       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1610       priv->tr_cache = g_list_prepend (priv->tr_cache, g_object_ref (tr));
1611     }
1612     priv->tr_cache_cookie = priv->transports_cookie;
1613   }
1614   g_mutex_unlock (&priv->lock);
1615
1616   for (walk = priv->tr_cache; walk; walk = g_list_next (walk)) {
1617     GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1618
1619     if (is_rtp) {
1620       gst_rtsp_stream_transport_send_rtp (tr, buffer);
1621     } else {
1622       gst_rtsp_stream_transport_send_rtcp (tr, buffer);
1623     }
1624   }
1625   gst_sample_unref (sample);
1626
1627   return GST_FLOW_OK;
1628 }
1629
1630 static GstAppSinkCallbacks sink_cb = {
1631   NULL,                         /* not interested in EOS */
1632   NULL,                         /* not interested in preroll samples */
1633   handle_new_sample,
1634 };
1635
1636 static GstElement *
1637 get_rtp_encoder (GstRTSPStream * stream, guint session)
1638 {
1639   GstRTSPStreamPrivate *priv = stream->priv;
1640
1641   if (priv->srtpenc == NULL) {
1642     gchar *name;
1643
1644     name = g_strdup_printf ("srtpenc_%u", session);
1645     priv->srtpenc = gst_element_factory_make ("srtpenc", name);
1646     g_free (name);
1647
1648     g_object_set (priv->srtpenc, "random-key", TRUE, NULL);
1649   }
1650   return gst_object_ref (priv->srtpenc);
1651 }
1652
1653 static GstElement *
1654 request_rtp_encoder (GstElement * rtpbin, guint session, GstRTSPStream * stream)
1655 {
1656   GstRTSPStreamPrivate *priv = stream->priv;
1657   GstElement *oldenc, *enc;
1658   GstPad *pad;
1659   gchar *name;
1660
1661   if (priv->idx != session)
1662     return NULL;
1663
1664   GST_DEBUG_OBJECT (stream, "make RTP encoder for session %u", session);
1665
1666   oldenc = priv->srtpenc;
1667   enc = get_rtp_encoder (stream, session);
1668   name = g_strdup_printf ("rtp_sink_%d", session);
1669   pad = gst_element_get_request_pad (enc, name);
1670   g_free (name);
1671   gst_object_unref (pad);
1672
1673   if (oldenc == NULL)
1674     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER], 0,
1675         enc);
1676
1677   return enc;
1678 }
1679
1680 static GstElement *
1681 request_rtcp_encoder (GstElement * rtpbin, guint session,
1682     GstRTSPStream * stream)
1683 {
1684   GstRTSPStreamPrivate *priv = stream->priv;
1685   GstElement *oldenc, *enc;
1686   GstPad *pad;
1687   gchar *name;
1688
1689   if (priv->idx != session)
1690     return NULL;
1691
1692   GST_DEBUG_OBJECT (stream, "make RTCP encoder for session %u", session);
1693
1694   oldenc = priv->srtpenc;
1695   enc = get_rtp_encoder (stream, session);
1696   name = g_strdup_printf ("rtcp_sink_%d", session);
1697   pad = gst_element_get_request_pad (enc, name);
1698   g_free (name);
1699   gst_object_unref (pad);
1700
1701   if (oldenc == NULL)
1702     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER], 0,
1703         enc);
1704
1705   return enc;
1706 }
1707
1708 static GstCaps *
1709 request_key (GstElement * srtpdec, guint ssrc, GstRTSPStream * stream)
1710 {
1711   GstRTSPStreamPrivate *priv = stream->priv;
1712   GstCaps *caps;
1713
1714   GST_DEBUG ("request key %08x", ssrc);
1715
1716   g_mutex_lock (&priv->lock);
1717   if ((caps = g_hash_table_lookup (priv->keys, GINT_TO_POINTER (ssrc))))
1718     gst_caps_ref (caps);
1719   g_mutex_unlock (&priv->lock);
1720
1721   return caps;
1722 }
1723
1724 static GstElement *
1725 request_rtcp_decoder (GstElement * rtpbin, guint session,
1726     GstRTSPStream * stream)
1727 {
1728   GstRTSPStreamPrivate *priv = stream->priv;
1729
1730   if (priv->idx != session)
1731     return NULL;
1732
1733   if (priv->srtpdec == NULL) {
1734     gchar *name;
1735
1736     name = g_strdup_printf ("srtpdec_%u", session);
1737     priv->srtpdec = gst_element_factory_make ("srtpdec", name);
1738     g_free (name);
1739
1740     g_signal_connect (priv->srtpdec, "request-key",
1741         (GCallback) request_key, stream);
1742   }
1743   return gst_object_ref (priv->srtpdec);
1744 }
1745
1746 static GstElement *
1747 request_aux_sender (GstElement * rtpbin, guint sessid, GstRTSPStream * stream)
1748 {
1749   GstElement *bin;
1750   GstPad *pad;
1751   GstStructure *pt_map;
1752   gchar *name;
1753   guint pt, rtx_pt;
1754   gchar *pt_s;
1755
1756   pt = gst_rtsp_stream_get_pt (stream);
1757   pt_s = g_strdup_printf ("%u", pt);
1758   rtx_pt = stream->priv->rtx_pt;
1759
1760   GST_INFO ("creating rtxsend with pt %u to %u", pt, rtx_pt);
1761
1762   bin = gst_bin_new (NULL);
1763   stream->priv->rtxsend = gst_element_factory_make ("rtprtxsend", NULL);
1764   pt_map = gst_structure_new ("application/x-rtp-pt-map",
1765       pt_s, G_TYPE_UINT, rtx_pt, NULL);
1766   g_object_set (stream->priv->rtxsend, "payload-type-map", pt_map,
1767       "max-size-time", GST_TIME_AS_MSECONDS (stream->priv->rtx_time), NULL);
1768   g_free (pt_s);
1769   gst_structure_free (pt_map);
1770   gst_bin_add (GST_BIN (bin), gst_object_ref (stream->priv->rtxsend));
1771
1772   pad = gst_element_get_static_pad (stream->priv->rtxsend, "src");
1773   name = g_strdup_printf ("src_%u", sessid);
1774   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
1775   g_free (name);
1776   gst_object_unref (pad);
1777
1778   pad = gst_element_get_static_pad (stream->priv->rtxsend, "sink");
1779   name = g_strdup_printf ("sink_%u", sessid);
1780   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
1781   g_free (name);
1782   gst_object_unref (pad);
1783
1784   return bin;
1785 }
1786
1787 /**
1788  * gst_rtsp_stream_join_bin:
1789  * @stream: a #GstRTSPStream
1790  * @bin: (transfer none): a #GstBin to join
1791  * @rtpbin: (transfer none): a rtpbin element in @bin
1792  * @state: the target state of the new elements
1793  *
1794  * Join the #GstBin @bin that contains the element @rtpbin.
1795  *
1796  * @stream will link to @rtpbin, which must be inside @bin. The elements
1797  * added to @bin will be set to the state given in @state.
1798  *
1799  * Returns: %TRUE on success.
1800  */
1801 gboolean
1802 gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
1803     GstElement * rtpbin, GstState state)
1804 {
1805   GstRTSPStreamPrivate *priv;
1806   gint i;
1807   guint idx;
1808   gchar *name;
1809   GstPad *pad, *sinkpad, *selpad;
1810   GstPadLinkReturn ret;
1811
1812   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1813   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
1814   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
1815
1816   priv = stream->priv;
1817
1818   g_mutex_lock (&priv->lock);
1819   if (priv->is_joined)
1820     goto was_joined;
1821
1822   /* create a session with the same index as the stream */
1823   idx = priv->idx;
1824
1825   GST_INFO ("stream %p joining bin as session %u", stream, idx);
1826
1827   if (!alloc_ports (stream))
1828     goto no_ports;
1829
1830   /* update the dscp qos field in the sinks */
1831   update_dscp_qos (stream);
1832
1833   if (priv->profiles & GST_RTSP_PROFILE_SAVP
1834       || priv->profiles & GST_RTSP_PROFILE_SAVPF) {
1835     /* For SRTP */
1836     g_signal_connect (rtpbin, "request-rtp-encoder",
1837         (GCallback) request_rtp_encoder, stream);
1838     g_signal_connect (rtpbin, "request-rtcp-encoder",
1839         (GCallback) request_rtcp_encoder, stream);
1840     g_signal_connect (rtpbin, "request-rtcp-decoder",
1841         (GCallback) request_rtcp_decoder, stream);
1842   }
1843
1844   if (priv->rtx_time > 0) {
1845     /* enable retransmission by setting rtprtxsend as the "aux" element of rtpbin */
1846     g_signal_connect (rtpbin, "request-aux-sender",
1847         (GCallback) request_aux_sender, stream);
1848   }
1849
1850   /* get a pad for sending RTP */
1851   name = g_strdup_printf ("send_rtp_sink_%u", idx);
1852   priv->send_rtp_sink = gst_element_get_request_pad (rtpbin, name);
1853   g_free (name);
1854   /* link the RTP pad to the session manager, it should not really fail unless
1855    * this is not really an RTP pad */
1856   ret = gst_pad_link (priv->srcpad, priv->send_rtp_sink);
1857   if (ret != GST_PAD_LINK_OK)
1858     goto link_failed;
1859
1860   /* get pads from the RTP session element for sending and receiving
1861    * RTP/RTCP*/
1862   name = g_strdup_printf ("send_rtp_src_%u", idx);
1863   priv->send_src[0] = gst_element_get_static_pad (rtpbin, name);
1864   g_free (name);
1865   name = g_strdup_printf ("send_rtcp_src_%u", idx);
1866   priv->send_src[1] = gst_element_get_request_pad (rtpbin, name);
1867   g_free (name);
1868   name = g_strdup_printf ("recv_rtp_sink_%u", idx);
1869   priv->recv_sink[0] = gst_element_get_request_pad (rtpbin, name);
1870   g_free (name);
1871   name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
1872   priv->recv_sink[1] = gst_element_get_request_pad (rtpbin, name);
1873   g_free (name);
1874
1875   /* get the session */
1876   g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &priv->session);
1877
1878   g_signal_connect (priv->session, "on-new-ssrc", (GCallback) on_new_ssrc,
1879       stream);
1880   g_signal_connect (priv->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
1881       stream);
1882   g_signal_connect (priv->session, "on-ssrc-active",
1883       (GCallback) on_ssrc_active, stream);
1884   g_signal_connect (priv->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
1885       stream);
1886   g_signal_connect (priv->session, "on-bye-timeout",
1887       (GCallback) on_bye_timeout, stream);
1888   g_signal_connect (priv->session, "on-timeout", (GCallback) on_timeout,
1889       stream);
1890
1891   for (i = 0; i < 2; i++) {
1892     GstPad *teepad, *queuepad;
1893     /* For the sender we create this bit of pipeline for both
1894      * RTP and RTCP. Sync and preroll are enabled on udpsink so
1895      * we need to add a queue before appsink to make the pipeline
1896      * not block. For the TCP case, we want to pump data to the
1897      * client as fast as possible anyway.
1898      *
1899      * .--------.      .-----.    .---------.
1900      * | rtpbin |      | tee |    | udpsink |
1901      * |       send->sink   src->sink       |
1902      * '--------'      |     |    '---------'
1903      *                 |     |    .---------.    .---------.
1904      *                 |     |    |  queue  |    | appsink |
1905      *                 |    src->sink      src->sink       |
1906      *                 '-----'    '---------'    '---------'
1907      *
1908      * When only UDP is allowed, we skip the tee, queue and appsink and link the
1909      * udpsink directly to the session.
1910      */
1911     /* add udpsink */
1912     gst_bin_add (bin, priv->udpsink[i]);
1913     sinkpad = gst_element_get_static_pad (priv->udpsink[i], "sink");
1914
1915     if (priv->protocols & GST_RTSP_LOWER_TRANS_TCP) {
1916       /* make tee for RTP/RTCP */
1917       priv->tee[i] = gst_element_factory_make ("tee", NULL);
1918       gst_bin_add (bin, priv->tee[i]);
1919
1920       /* and link to rtpbin send pad */
1921       pad = gst_element_get_static_pad (priv->tee[i], "sink");
1922       gst_pad_link (priv->send_src[i], pad);
1923       gst_object_unref (pad);
1924
1925       /* link tee to udpsink */
1926       teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
1927       gst_pad_link (teepad, sinkpad);
1928       gst_object_unref (teepad);
1929
1930       /* make queue */
1931       priv->appqueue[i] = gst_element_factory_make ("queue", NULL);
1932       gst_bin_add (bin, priv->appqueue[i]);
1933       /* and link to tee */
1934       teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
1935       pad = gst_element_get_static_pad (priv->appqueue[i], "sink");
1936       gst_pad_link (teepad, pad);
1937       gst_object_unref (pad);
1938       gst_object_unref (teepad);
1939
1940       /* make appsink */
1941       priv->appsink[i] = gst_element_factory_make ("appsink", NULL);
1942       g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
1943       g_object_set (priv->appsink[i], "emit-signals", FALSE, NULL);
1944       gst_bin_add (bin, priv->appsink[i]);
1945       gst_app_sink_set_callbacks (GST_APP_SINK_CAST (priv->appsink[i]),
1946           &sink_cb, stream, NULL);
1947       /* and link to queue */
1948       queuepad = gst_element_get_static_pad (priv->appqueue[i], "src");
1949       pad = gst_element_get_static_pad (priv->appsink[i], "sink");
1950       gst_pad_link (queuepad, pad);
1951       gst_object_unref (pad);
1952       gst_object_unref (queuepad);
1953     } else {
1954       /* else only udpsink needed, link it to the session */
1955       gst_pad_link (priv->send_src[i], sinkpad);
1956     }
1957     gst_object_unref (sinkpad);
1958
1959     /* For the receiver we create this bit of pipeline for both
1960      * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc
1961      * and it is all funneled into the rtpbin receive pad.
1962      *
1963      * .--------.     .--------.    .--------.
1964      * | udpsrc |     | funnel |    | rtpbin |
1965      * |       src->sink      src->sink      |
1966      * '--------'     |        |    '--------'
1967      * .--------.     |        |
1968      * | appsrc |     |        |
1969      * |       src->sink       |
1970      * '--------'     '--------'
1971      */
1972     /* make funnel for the RTP/RTCP receivers */
1973     priv->funnel[i] = gst_element_factory_make ("funnel", NULL);
1974     gst_bin_add (bin, priv->funnel[i]);
1975
1976     pad = gst_element_get_static_pad (priv->funnel[i], "src");
1977     gst_pad_link (pad, priv->recv_sink[i]);
1978     gst_object_unref (pad);
1979
1980     if (priv->udpsrc_v4[i]) {
1981       /* we set and keep these to playing so that they don't cause NO_PREROLL return
1982        * values */
1983       gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_PLAYING);
1984       gst_element_set_locked_state (priv->udpsrc_v4[i], TRUE);
1985       /* add udpsrc */
1986       gst_bin_add (bin, priv->udpsrc_v4[i]);
1987
1988       /* and link to the funnel v4 */
1989       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1990       pad = gst_element_get_static_pad (priv->udpsrc_v4[i], "src");
1991       gst_pad_link (pad, selpad);
1992       gst_object_unref (pad);
1993       gst_object_unref (selpad);
1994     }
1995
1996     if (priv->udpsrc_v6[i]) {
1997       gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_PLAYING);
1998       gst_element_set_locked_state (priv->udpsrc_v6[i], TRUE);
1999       gst_bin_add (bin, priv->udpsrc_v6[i]);
2000
2001       /* and link to the funnel v6 */
2002       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2003       pad = gst_element_get_static_pad (priv->udpsrc_v6[i], "src");
2004       gst_pad_link (pad, selpad);
2005       gst_object_unref (pad);
2006       gst_object_unref (selpad);
2007     }
2008
2009     if (priv->protocols & GST_RTSP_LOWER_TRANS_TCP) {
2010       /* make and add appsrc */
2011       priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
2012       gst_bin_add (bin, priv->appsrc[i]);
2013       /* and link to the funnel */
2014       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2015       pad = gst_element_get_static_pad (priv->appsrc[i], "src");
2016       gst_pad_link (pad, selpad);
2017       gst_object_unref (pad);
2018       gst_object_unref (selpad);
2019     }
2020
2021     /* check if we need to set to a special state */
2022     if (state != GST_STATE_NULL) {
2023       if (priv->udpsink[i])
2024         gst_element_set_state (priv->udpsink[i], state);
2025       if (priv->appsink[i])
2026         gst_element_set_state (priv->appsink[i], state);
2027       if (priv->appqueue[i])
2028         gst_element_set_state (priv->appqueue[i], state);
2029       if (priv->tee[i])
2030         gst_element_set_state (priv->tee[i], state);
2031       if (priv->funnel[i])
2032         gst_element_set_state (priv->funnel[i], state);
2033       if (priv->appsrc[i])
2034         gst_element_set_state (priv->appsrc[i], state);
2035     }
2036   }
2037
2038   /* be notified of caps changes */
2039   priv->caps_sig = g_signal_connect (priv->send_src[0], "notify::caps",
2040       (GCallback) caps_notify, stream);
2041
2042   priv->is_joined = TRUE;
2043   g_mutex_unlock (&priv->lock);
2044
2045   return TRUE;
2046
2047   /* ERRORS */
2048 was_joined:
2049   {
2050     g_mutex_unlock (&priv->lock);
2051     return TRUE;
2052   }
2053 no_ports:
2054   {
2055     g_mutex_unlock (&priv->lock);
2056     GST_WARNING ("failed to allocate ports %u", idx);
2057     return FALSE;
2058   }
2059 link_failed:
2060   {
2061     GST_WARNING ("failed to link stream %u", idx);
2062     gst_object_unref (priv->send_rtp_sink);
2063     priv->send_rtp_sink = NULL;
2064     g_mutex_unlock (&priv->lock);
2065     return FALSE;
2066   }
2067 }
2068
2069 /**
2070  * gst_rtsp_stream_leave_bin:
2071  * @stream: a #GstRTSPStream
2072  * @bin: (transfer none): a #GstBin
2073  * @rtpbin: (transfer none): a rtpbin #GstElement
2074  *
2075  * Remove the elements of @stream from @bin.
2076  *
2077  * Return: %TRUE on success.
2078  */
2079 gboolean
2080 gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
2081     GstElement * rtpbin)
2082 {
2083   GstRTSPStreamPrivate *priv;
2084   gint i;
2085   GList *l;
2086
2087   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2088   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
2089   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
2090
2091   priv = stream->priv;
2092
2093   g_mutex_lock (&priv->lock);
2094   if (!priv->is_joined)
2095     goto was_not_joined;
2096
2097   /* all transports must be removed by now */
2098   if (priv->transports != NULL)
2099     goto transports_not_removed;
2100
2101   clear_tr_cache (priv);
2102
2103   GST_INFO ("stream %p leaving bin", stream);
2104
2105   gst_pad_unlink (priv->srcpad, priv->send_rtp_sink);
2106   g_signal_handler_disconnect (priv->send_src[0], priv->caps_sig);
2107   gst_element_release_request_pad (rtpbin, priv->send_rtp_sink);
2108   gst_object_unref (priv->send_rtp_sink);
2109   priv->send_rtp_sink = NULL;
2110
2111   for (i = 0; i < 2; i++) {
2112     if (priv->udpsink[i])
2113       gst_element_set_state (priv->udpsink[i], GST_STATE_NULL);
2114     if (priv->appsink[i])
2115       gst_element_set_state (priv->appsink[i], GST_STATE_NULL);
2116     if (priv->appqueue[i])
2117       gst_element_set_state (priv->appqueue[i], GST_STATE_NULL);
2118     if (priv->tee[i])
2119       gst_element_set_state (priv->tee[i], GST_STATE_NULL);
2120     if (priv->funnel[i])
2121       gst_element_set_state (priv->funnel[i], GST_STATE_NULL);
2122     if (priv->appsrc[i])
2123       gst_element_set_state (priv->appsrc[i], GST_STATE_NULL);
2124     if (priv->udpsrc_v4[i]) {
2125       /* and set udpsrc to NULL now before removing */
2126       gst_element_set_locked_state (priv->udpsrc_v4[i], FALSE);
2127       gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_NULL);
2128       /* removing them should also nicely release the request
2129        * pads when they finalize */
2130       gst_bin_remove (bin, priv->udpsrc_v4[i]);
2131     }
2132     if (priv->udpsrc_v6[i]) {
2133       gst_element_set_locked_state (priv->udpsrc_v6[i], FALSE);
2134       gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_NULL);
2135       gst_bin_remove (bin, priv->udpsrc_v6[i]);
2136     }
2137
2138     for (l = priv->transport_sources; l; l = l->next) {
2139       GstRTSPMulticastTransportSource *s = l->data;
2140
2141       if (!s->udpsrc[i])
2142         continue;
2143
2144       gst_element_set_locked_state (s->udpsrc[i], FALSE);
2145       gst_element_set_state (s->udpsrc[i], GST_STATE_NULL);
2146       gst_bin_remove (bin, s->udpsrc[i]);
2147     }
2148
2149     if (priv->udpsink[i])
2150       gst_bin_remove (bin, priv->udpsink[i]);
2151     if (priv->appsrc[i])
2152       gst_bin_remove (bin, priv->appsrc[i]);
2153     if (priv->appsink[i])
2154       gst_bin_remove (bin, priv->appsink[i]);
2155     if (priv->appqueue[i])
2156       gst_bin_remove (bin, priv->appqueue[i]);
2157     if (priv->tee[i])
2158       gst_bin_remove (bin, priv->tee[i]);
2159     if (priv->funnel[i])
2160       gst_bin_remove (bin, priv->funnel[i]);
2161
2162     gst_element_release_request_pad (rtpbin, priv->recv_sink[i]);
2163     gst_object_unref (priv->recv_sink[i]);
2164     priv->recv_sink[i] = NULL;
2165
2166     priv->udpsrc_v4[i] = NULL;
2167     priv->udpsrc_v6[i] = NULL;
2168     priv->udpsink[i] = NULL;
2169     priv->appsrc[i] = NULL;
2170     priv->appsink[i] = NULL;
2171     priv->appqueue[i] = NULL;
2172     priv->tee[i] = NULL;
2173     priv->funnel[i] = NULL;
2174   }
2175
2176   for (l = priv->transport_sources; l; l = l->next) {
2177     GstRTSPMulticastTransportSource *s = l->data;
2178     g_slice_free (GstRTSPMulticastTransportSource, s);
2179   }
2180   g_list_free (priv->transport_sources);
2181   priv->transport_sources = NULL;
2182
2183   gst_object_unref (priv->send_src[0]);
2184   priv->send_src[0] = NULL;
2185
2186   gst_element_release_request_pad (rtpbin, priv->send_src[1]);
2187   gst_object_unref (priv->send_src[1]);
2188   priv->send_src[1] = NULL;
2189
2190   g_object_unref (priv->session);
2191   priv->session = NULL;
2192   if (priv->caps)
2193     gst_caps_unref (priv->caps);
2194   priv->caps = NULL;
2195
2196   if (priv->srtpenc)
2197     gst_object_unref (priv->srtpenc);
2198   if (priv->srtpdec)
2199     gst_object_unref (priv->srtpdec);
2200
2201   priv->is_joined = FALSE;
2202   g_mutex_unlock (&priv->lock);
2203
2204   return TRUE;
2205
2206 was_not_joined:
2207   {
2208     g_mutex_unlock (&priv->lock);
2209     return TRUE;
2210   }
2211 transports_not_removed:
2212   {
2213     GST_ERROR_OBJECT (stream, "can't leave bin (transports not removed)");
2214     g_mutex_unlock (&priv->lock);
2215     return FALSE;
2216   }
2217 }
2218
2219 /**
2220  * gst_rtsp_stream_get_rtpinfo:
2221  * @stream: a #GstRTSPStream
2222  * @rtptime: (allow-none): result RTP timestamp
2223  * @seq: (allow-none): result RTP seqnum
2224  * @clock_rate: (allow-none): the clock rate
2225  * @running_time: (allow-none): result running-time
2226  *
2227  * Retrieve the current rtptime, seq and running-time. This is used to
2228  * construct a RTPInfo reply header.
2229  *
2230  * Returns: %TRUE when rtptime, seq and running-time could be determined.
2231  */
2232 gboolean
2233 gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream,
2234     guint * rtptime, guint * seq, guint * clock_rate,
2235     GstClockTime * running_time)
2236 {
2237   GstRTSPStreamPrivate *priv;
2238   GstStructure *stats;
2239   GObjectClass *payobjclass;
2240
2241   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2242
2243   priv = stream->priv;
2244
2245   payobjclass = G_OBJECT_GET_CLASS (priv->payloader);
2246
2247   g_mutex_lock (&priv->lock);
2248
2249   if (g_object_class_find_property (payobjclass, "stats")) {
2250     g_object_get (priv->payloader, "stats", &stats, NULL);
2251     if (stats == NULL)
2252       goto no_stats;
2253
2254     if (seq)
2255       gst_structure_get_uint (stats, "seqnum", seq);
2256
2257     if (rtptime)
2258       gst_structure_get_uint (stats, "timestamp", rtptime);
2259
2260     if (running_time)
2261       gst_structure_get_clock_time (stats, "running-time", running_time);
2262
2263     if (clock_rate) {
2264       gst_structure_get_uint (stats, "clock-rate", clock_rate);
2265       if (*clock_rate == 0 && running_time)
2266         *running_time = GST_CLOCK_TIME_NONE;
2267     }
2268     gst_structure_free (stats);
2269   } else {
2270     if (!g_object_class_find_property (payobjclass, "seqnum") ||
2271         !g_object_class_find_property (payobjclass, "timestamp"))
2272       goto no_stats;
2273
2274     if (seq)
2275       g_object_get (priv->payloader, "seqnum", seq, NULL);
2276
2277     if (rtptime)
2278       g_object_get (priv->payloader, "timestamp", rtptime, NULL);
2279
2280     if (running_time)
2281       *running_time = GST_CLOCK_TIME_NONE;
2282   }
2283   g_mutex_unlock (&priv->lock);
2284
2285   return TRUE;
2286
2287   /* ERRORS */
2288 no_stats:
2289   {
2290     GST_WARNING ("Could not get payloader stats");
2291     g_mutex_unlock (&priv->lock);
2292     return FALSE;
2293   }
2294 }
2295
2296 /**
2297  * gst_rtsp_stream_get_caps:
2298  * @stream: a #GstRTSPStream
2299  *
2300  * Retrieve the current caps of @stream.
2301  *
2302  * Returns: (transfer full): the #GstCaps of @stream. use gst_caps_unref()
2303  * after usage.
2304  */
2305 GstCaps *
2306 gst_rtsp_stream_get_caps (GstRTSPStream * stream)
2307 {
2308   GstRTSPStreamPrivate *priv;
2309   GstCaps *result;
2310
2311   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2312
2313   priv = stream->priv;
2314
2315   g_mutex_lock (&priv->lock);
2316   if ((result = priv->caps))
2317     gst_caps_ref (result);
2318   g_mutex_unlock (&priv->lock);
2319
2320   return result;
2321 }
2322
2323 /**
2324  * gst_rtsp_stream_recv_rtp:
2325  * @stream: a #GstRTSPStream
2326  * @buffer: (transfer full): a #GstBuffer
2327  *
2328  * Handle an RTP buffer for the stream. This method is usually called when a
2329  * message has been received from a client using the TCP transport.
2330  *
2331  * This function takes ownership of @buffer.
2332  *
2333  * Returns: a GstFlowReturn.
2334  */
2335 GstFlowReturn
2336 gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer)
2337 {
2338   GstRTSPStreamPrivate *priv;
2339   GstFlowReturn ret;
2340   GstElement *element;
2341
2342   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
2343   priv = stream->priv;
2344   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
2345   g_return_val_if_fail (priv->is_joined, FALSE);
2346
2347   g_mutex_lock (&priv->lock);
2348   if (priv->appsrc[0])
2349     element = gst_object_ref (priv->appsrc[0]);
2350   else
2351     element = NULL;
2352   g_mutex_unlock (&priv->lock);
2353
2354   if (element) {
2355     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
2356     gst_object_unref (element);
2357   } else {
2358     ret = GST_FLOW_OK;
2359   }
2360   return ret;
2361 }
2362
2363 /**
2364  * gst_rtsp_stream_recv_rtcp:
2365  * @stream: a #GstRTSPStream
2366  * @buffer: (transfer full): a #GstBuffer
2367  *
2368  * Handle an RTCP buffer for the stream. This method is usually called when a
2369  * message has been received from a client using the TCP transport.
2370  *
2371  * This function takes ownership of @buffer.
2372  *
2373  * Returns: a GstFlowReturn.
2374  */
2375 GstFlowReturn
2376 gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
2377 {
2378   GstRTSPStreamPrivate *priv;
2379   GstFlowReturn ret;
2380   GstElement *element;
2381
2382   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
2383   priv = stream->priv;
2384   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
2385
2386   if (!priv->is_joined) {
2387     gst_buffer_unref (buffer);
2388     return GST_FLOW_NOT_LINKED;
2389   }
2390   g_mutex_lock (&priv->lock);
2391   if (priv->appsrc[1])
2392     element = gst_object_ref (priv->appsrc[1]);
2393   else
2394     element = NULL;
2395   g_mutex_unlock (&priv->lock);
2396
2397   if (element) {
2398     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
2399     gst_object_unref (element);
2400   } else {
2401     ret = GST_FLOW_OK;
2402     gst_buffer_unref (buffer);
2403   }
2404   return ret;
2405 }
2406
2407 /* must be called with lock */
2408 static gboolean
2409 update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
2410     gboolean add)
2411 {
2412   GstRTSPStreamPrivate *priv = stream->priv;
2413   const GstRTSPTransport *tr;
2414
2415   tr = gst_rtsp_stream_transport_get_transport (trans);
2416
2417   switch (tr->lower_transport) {
2418     case GST_RTSP_LOWER_TRANS_UDP_MCAST:
2419     {
2420       GstRTSPMulticastTransportSource *source;
2421       GstBin *bin;
2422
2423       bin = GST_BIN (gst_object_get_parent (GST_OBJECT (priv->funnel[0])));
2424
2425       if (add) {
2426         gchar *host;
2427         gint i;
2428         GstPad *selpad, *pad;
2429
2430         source = g_slice_new0 (GstRTSPMulticastTransportSource);
2431         source->transport = trans;
2432
2433         for (i = 0; i < 2; i++) {
2434           host =
2435               g_strdup_printf ("udp://%s:%d", tr->destination,
2436               (i == 0) ? tr->port.min : tr->port.max);
2437           source->udpsrc[i] =
2438               gst_element_make_from_uri (GST_URI_SRC, host, NULL, NULL);
2439           g_free (host);
2440
2441           /* we set and keep these to playing so that they don't cause NO_PREROLL return
2442            * values */
2443           gst_element_set_state (source->udpsrc[i], GST_STATE_PLAYING);
2444           gst_element_set_locked_state (source->udpsrc[i], TRUE);
2445           /* add udpsrc */
2446           gst_bin_add (bin, source->udpsrc[i]);
2447
2448           /* and link to the funnel v4 */
2449           source->selpad[i] = selpad =
2450               gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2451           pad = gst_element_get_static_pad (source->udpsrc[i], "src");
2452           gst_pad_link (pad, selpad);
2453           gst_object_unref (pad);
2454           gst_object_unref (selpad);
2455         }
2456         gst_object_unref (bin);
2457
2458         priv->transport_sources =
2459             g_list_prepend (priv->transport_sources, source);
2460       } else {
2461         GList *l;
2462
2463         for (l = priv->transport_sources; l; l = l->next) {
2464           source = l->data;
2465
2466           if (source->transport == trans) {
2467             priv->transport_sources =
2468                 g_list_delete_link (priv->transport_sources, l);
2469             break;
2470           }
2471         }
2472
2473         if (l != NULL) {
2474           gint i;
2475
2476           for (i = 0; i < 2; i++) {
2477             /* Will automatically unlink everything */
2478             gst_bin_remove (bin,
2479                 GST_ELEMENT (gst_object_ref (source->udpsrc[i])));
2480
2481             gst_element_set_state (source->udpsrc[i], GST_STATE_NULL);
2482             gst_object_unref (source->udpsrc[i]);
2483
2484             gst_element_release_request_pad (priv->funnel[i],
2485                 source->selpad[i]);
2486           }
2487
2488           g_slice_free (GstRTSPMulticastTransportSource, source);
2489         }
2490       }
2491
2492       /* fall through for the generic case */
2493     }
2494     case GST_RTSP_LOWER_TRANS_UDP:
2495     {
2496       gchar *dest;
2497       gint min, max;
2498       guint ttl = 0;
2499
2500       dest = tr->destination;
2501       if (tr->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
2502         min = tr->port.min;
2503         max = tr->port.max;
2504         ttl = tr->ttl;
2505       } else {
2506         min = tr->client_port.min;
2507         max = tr->client_port.max;
2508       }
2509
2510       if (add) {
2511         if (ttl > 0) {
2512           GST_INFO ("setting ttl-mc %d", ttl);
2513           g_object_set (G_OBJECT (priv->udpsink[0]), "ttl-mc", ttl, NULL);
2514           g_object_set (G_OBJECT (priv->udpsink[1]), "ttl-mc", ttl, NULL);
2515         }
2516         GST_INFO ("adding %s:%d-%d", dest, min, max);
2517         g_signal_emit_by_name (priv->udpsink[0], "add", dest, min, NULL);
2518         g_signal_emit_by_name (priv->udpsink[1], "add", dest, max, NULL);
2519         priv->transports = g_list_prepend (priv->transports, trans);
2520       } else {
2521         GST_INFO ("removing %s:%d-%d", dest, min, max);
2522         g_signal_emit_by_name (priv->udpsink[0], "remove", dest, min, NULL);
2523         g_signal_emit_by_name (priv->udpsink[1], "remove", dest, max, NULL);
2524         priv->transports = g_list_remove (priv->transports, trans);
2525       }
2526       priv->transports_cookie++;
2527       break;
2528     }
2529     case GST_RTSP_LOWER_TRANS_TCP:
2530       if (add) {
2531         GST_INFO ("adding TCP %s", tr->destination);
2532         priv->transports = g_list_prepend (priv->transports, trans);
2533       } else {
2534         GST_INFO ("removing TCP %s", tr->destination);
2535         priv->transports = g_list_remove (priv->transports, trans);
2536       }
2537       priv->transports_cookie++;
2538       break;
2539     default:
2540       goto unknown_transport;
2541   }
2542   return TRUE;
2543
2544   /* ERRORS */
2545 unknown_transport:
2546   {
2547     GST_INFO ("Unknown transport %d", tr->lower_transport);
2548     return FALSE;
2549   }
2550 }
2551
2552
2553 /**
2554  * gst_rtsp_stream_add_transport:
2555  * @stream: a #GstRTSPStream
2556  * @trans: (transfer none): a #GstRTSPStreamTransport
2557  *
2558  * Add the transport in @trans to @stream. The media of @stream will
2559  * then also be send to the values configured in @trans.
2560  *
2561  * @stream must be joined to a bin.
2562  *
2563  * @trans must contain a valid #GstRTSPTransport.
2564  *
2565  * Returns: %TRUE if @trans was added
2566  */
2567 gboolean
2568 gst_rtsp_stream_add_transport (GstRTSPStream * stream,
2569     GstRTSPStreamTransport * trans)
2570 {
2571   GstRTSPStreamPrivate *priv;
2572   gboolean res;
2573
2574   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2575   priv = stream->priv;
2576   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
2577   g_return_val_if_fail (priv->is_joined, FALSE);
2578
2579   g_mutex_lock (&priv->lock);
2580   res = update_transport (stream, trans, TRUE);
2581   g_mutex_unlock (&priv->lock);
2582
2583   return res;
2584 }
2585
2586 /**
2587  * gst_rtsp_stream_remove_transport:
2588  * @stream: a #GstRTSPStream
2589  * @trans: (transfer none): a #GstRTSPStreamTransport
2590  *
2591  * Remove the transport in @trans from @stream. The media of @stream will
2592  * not be sent to the values configured in @trans.
2593  *
2594  * @stream must be joined to a bin.
2595  *
2596  * @trans must contain a valid #GstRTSPTransport.
2597  *
2598  * Returns: %TRUE if @trans was removed
2599  */
2600 gboolean
2601 gst_rtsp_stream_remove_transport (GstRTSPStream * stream,
2602     GstRTSPStreamTransport * trans)
2603 {
2604   GstRTSPStreamPrivate *priv;
2605   gboolean res;
2606
2607   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2608   priv = stream->priv;
2609   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
2610   g_return_val_if_fail (priv->is_joined, FALSE);
2611
2612   g_mutex_lock (&priv->lock);
2613   res = update_transport (stream, trans, FALSE);
2614   g_mutex_unlock (&priv->lock);
2615
2616   return res;
2617 }
2618
2619 /**
2620  * gst_rtsp_stream_update_crypto:
2621  * @stream: a #GstRTSPStream
2622  * @ssrc: the SSRC
2623  * @crypto: (transfer none) (allow-none): a #GstCaps with crypto info
2624  *
2625  * Update the new crypto information for @ssrc in @stream. If information
2626  * for @ssrc did not exist, it will be added. If information
2627  * for @ssrc existed, it will be replaced. If @crypto is %NULL, it will
2628  * be removed from @stream.
2629  *
2630  * Returns: %TRUE if @crypto could be updated
2631  */
2632 gboolean
2633 gst_rtsp_stream_update_crypto (GstRTSPStream * stream,
2634     guint ssrc, GstCaps * crypto)
2635 {
2636   GstRTSPStreamPrivate *priv;
2637
2638   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2639   g_return_val_if_fail (crypto == NULL || GST_IS_CAPS (crypto), FALSE);
2640
2641   priv = stream->priv;
2642
2643   GST_DEBUG_OBJECT (stream, "update key for %08x", ssrc);
2644
2645   g_mutex_lock (&priv->lock);
2646   if (crypto)
2647     g_hash_table_insert (priv->keys, GINT_TO_POINTER (ssrc),
2648         gst_caps_ref (crypto));
2649   else
2650     g_hash_table_remove (priv->keys, GINT_TO_POINTER (ssrc));
2651   g_mutex_unlock (&priv->lock);
2652
2653   return TRUE;
2654 }
2655
2656 /**
2657  * gst_rtsp_stream_get_rtp_socket:
2658  * @stream: a #GstRTSPStream
2659  * @family: the socket family
2660  *
2661  * Get the RTP socket from @stream for a @family.
2662  *
2663  * @stream must be joined to a bin.
2664  *
2665  * Returns: (transfer full) (nullable): the RTP socket or %NULL if no
2666  * socket could be allocated for @family. Unref after usage
2667  */
2668 GSocket *
2669 gst_rtsp_stream_get_rtp_socket (GstRTSPStream * stream, GSocketFamily family)
2670 {
2671   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
2672   GSocket *socket;
2673   const gchar *name;
2674
2675   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2676   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
2677       family == G_SOCKET_FAMILY_IPV6, NULL);
2678   g_return_val_if_fail (priv->udpsink[0], NULL);
2679
2680   if (family == G_SOCKET_FAMILY_IPV6)
2681     name = "socket-v6";
2682   else
2683     name = "socket";
2684
2685   g_object_get (priv->udpsink[0], name, &socket, NULL);
2686
2687   return socket;
2688 }
2689
2690 /**
2691  * gst_rtsp_stream_get_rtcp_socket:
2692  * @stream: a #GstRTSPStream
2693  * @family: the socket family
2694  *
2695  * Get the RTCP socket from @stream for a @family.
2696  *
2697  * @stream must be joined to a bin.
2698  *
2699  * Returns: (transfer full) (nullable): the RTCP socket or %NULL if no
2700  * socket could be allocated for @family. Unref after usage
2701  */
2702 GSocket *
2703 gst_rtsp_stream_get_rtcp_socket (GstRTSPStream * stream, GSocketFamily family)
2704 {
2705   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
2706   GSocket *socket;
2707   const gchar *name;
2708
2709   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2710   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
2711       family == G_SOCKET_FAMILY_IPV6, NULL);
2712   g_return_val_if_fail (priv->udpsink[1], NULL);
2713
2714   if (family == G_SOCKET_FAMILY_IPV6)
2715     name = "socket-v6";
2716   else
2717     name = "socket";
2718
2719   g_object_get (priv->udpsink[1], name, &socket, NULL);
2720
2721   return socket;
2722 }
2723
2724 /**
2725  * gst_rtsp_stream_set_seqnum:
2726  * @stream: a #GstRTSPStream
2727  * @seqnum: a new sequence number
2728  *
2729  * Configure the sequence number in the payloader of @stream to @seqnum.
2730  */
2731 void
2732 gst_rtsp_stream_set_seqnum_offset (GstRTSPStream * stream, guint16 seqnum)
2733 {
2734   GstRTSPStreamPrivate *priv;
2735
2736   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
2737
2738   priv = stream->priv;
2739
2740   g_object_set (G_OBJECT (priv->payloader), "seqnum-offset", seqnum, NULL);
2741 }
2742
2743 /**
2744  * gst_rtsp_stream_get_seqnum:
2745  * @stream: a #GstRTSPStream
2746  *
2747  * Get the configured sequence number in the payloader of @stream.
2748  *
2749  * Returns: the sequence number of the payloader.
2750  */
2751 guint16
2752 gst_rtsp_stream_get_current_seqnum (GstRTSPStream * stream)
2753 {
2754   GstRTSPStreamPrivate *priv;
2755   guint seqnum;
2756
2757   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
2758
2759   priv = stream->priv;
2760
2761   g_object_get (G_OBJECT (priv->payloader), "seqnum", &seqnum, NULL);
2762
2763   return seqnum;
2764 }
2765
2766 /**
2767  * gst_rtsp_stream_transport_filter:
2768  * @stream: a #GstRTSPStream
2769  * @func: (scope call) (allow-none): a callback
2770  * @user_data: (closure): user data passed to @func
2771  *
2772  * Call @func for each transport managed by @stream. The result value of @func
2773  * determines what happens to the transport. @func will be called with @stream
2774  * locked so no further actions on @stream can be performed from @func.
2775  *
2776  * If @func returns #GST_RTSP_FILTER_REMOVE, the transport will be removed from
2777  * @stream.
2778  *
2779  * If @func returns #GST_RTSP_FILTER_KEEP, the transport will remain in @stream.
2780  *
2781  * If @func returns #GST_RTSP_FILTER_REF, the transport will remain in @stream but
2782  * will also be added with an additional ref to the result #GList of this
2783  * function..
2784  *
2785  * When @func is %NULL, #GST_RTSP_FILTER_REF will be assumed for each transport.
2786  *
2787  * Returns: (element-type GstRTSPStreamTransport) (transfer full): a #GList with all
2788  * transports for which @func returned #GST_RTSP_FILTER_REF. After usage, each
2789  * element in the #GList should be unreffed before the list is freed.
2790  */
2791 GList *
2792 gst_rtsp_stream_transport_filter (GstRTSPStream * stream,
2793     GstRTSPStreamTransportFilterFunc func, gpointer user_data)
2794 {
2795   GstRTSPStreamPrivate *priv;
2796   GList *result, *walk, *next;
2797   GHashTable *visited;
2798   guint cookie;
2799
2800   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2801
2802   priv = stream->priv;
2803
2804   result = NULL;
2805   if (func)
2806     visited = g_hash_table_new_full (NULL, NULL, g_object_unref, NULL);
2807
2808   g_mutex_lock (&priv->lock);
2809 restart:
2810   cookie = priv->transports_cookie;
2811   for (walk = priv->transports; walk; walk = next) {
2812     GstRTSPStreamTransport *trans = walk->data;
2813     GstRTSPFilterResult res;
2814     gboolean changed;
2815
2816     next = g_list_next (walk);
2817
2818     if (func) {
2819       /* only visit each transport once */
2820       if (g_hash_table_contains (visited, trans))
2821         continue;
2822
2823       g_hash_table_add (visited, g_object_ref (trans));
2824       g_mutex_unlock (&priv->lock);
2825
2826       res = func (stream, trans, user_data);
2827
2828       g_mutex_lock (&priv->lock);
2829     } else
2830       res = GST_RTSP_FILTER_REF;
2831
2832     changed = (cookie != priv->transports_cookie);
2833
2834     switch (res) {
2835       case GST_RTSP_FILTER_REMOVE:
2836         update_transport (stream, trans, FALSE);
2837         break;
2838       case GST_RTSP_FILTER_REF:
2839         result = g_list_prepend (result, g_object_ref (trans));
2840         break;
2841       case GST_RTSP_FILTER_KEEP:
2842       default:
2843         break;
2844     }
2845     if (changed)
2846       goto restart;
2847   }
2848   g_mutex_unlock (&priv->lock);
2849
2850   if (func)
2851     g_hash_table_unref (visited);
2852
2853   return result;
2854 }
2855
2856 static GstPadProbeReturn
2857 pad_blocking (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
2858 {
2859   GstRTSPStreamPrivate *priv;
2860   GstRTSPStream *stream;
2861
2862   stream = user_data;
2863   priv = stream->priv;
2864
2865   GST_DEBUG_OBJECT (pad, "now blocking");
2866
2867   g_mutex_lock (&priv->lock);
2868   priv->blocking = TRUE;
2869   g_mutex_unlock (&priv->lock);
2870
2871   gst_element_post_message (priv->payloader,
2872       gst_message_new_element (GST_OBJECT_CAST (priv->payloader),
2873           gst_structure_new_empty ("GstRTSPStreamBlocking")));
2874
2875   return GST_PAD_PROBE_OK;
2876 }
2877
2878 /**
2879  * gst_rtsp_stream_set_blocked:
2880  * @stream: a #GstRTSPStream
2881  * @blocked: boolean indicating we should block or unblock
2882  *
2883  * Blocks or unblocks the dataflow on @stream.
2884  *
2885  * Returns: %TRUE on success
2886  */
2887 gboolean
2888 gst_rtsp_stream_set_blocked (GstRTSPStream * stream, gboolean blocked)
2889 {
2890   GstRTSPStreamPrivate *priv;
2891
2892   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2893
2894   priv = stream->priv;
2895
2896   g_mutex_lock (&priv->lock);
2897   if (blocked) {
2898     priv->blocking = FALSE;
2899     if (priv->blocked_id == 0) {
2900       priv->blocked_id = gst_pad_add_probe (priv->srcpad,
2901           GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER |
2902           GST_PAD_PROBE_TYPE_BUFFER_LIST, pad_blocking,
2903           g_object_ref (stream), g_object_unref);
2904     }
2905   } else {
2906     if (priv->blocked_id != 0) {
2907       gst_pad_remove_probe (priv->srcpad, priv->blocked_id);
2908       priv->blocked_id = 0;
2909       priv->blocking = FALSE;
2910     }
2911   }
2912   g_mutex_unlock (&priv->lock);
2913
2914   return TRUE;
2915 }
2916
2917 /**
2918  * gst_rtsp_stream_is_blocking:
2919  * @stream: a #GstRTSPStream
2920  *
2921  * Check if @stream is blocking on a #GstBuffer.
2922  *
2923  * Returns: %TRUE if @stream is blocking
2924  */
2925 gboolean
2926 gst_rtsp_stream_is_blocking (GstRTSPStream * stream)
2927 {
2928   GstRTSPStreamPrivate *priv;
2929   gboolean result;
2930
2931   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2932
2933   priv = stream->priv;
2934
2935   g_mutex_lock (&priv->lock);
2936   result = priv->blocking;
2937   g_mutex_unlock (&priv->lock);
2938
2939   return result;
2940 }
2941
2942 /**
2943  * gst_rtsp_stream_query_position:
2944  * @stream: a #GstRTSPStream
2945  *
2946  * Query the position of the stream in %GST_FORMAT_TIME. This only considers
2947  * the RTP parts of the pipeline and not the RTCP parts.
2948  *
2949  * Returns: %TRUE if the position could be queried
2950  */
2951 gboolean
2952 gst_rtsp_stream_query_position (GstRTSPStream * stream, gint64 * position)
2953 {
2954   GstRTSPStreamPrivate *priv;
2955   GstElement *sink;
2956   gboolean ret;
2957
2958   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2959
2960   priv = stream->priv;
2961
2962   g_mutex_lock (&priv->lock);
2963   if ((sink = priv->udpsink[0]))
2964     gst_object_ref (sink);
2965   g_mutex_unlock (&priv->lock);
2966
2967   if (!sink)
2968     return FALSE;
2969
2970   ret = gst_element_query_position (sink, GST_FORMAT_TIME, position);
2971   gst_object_unref (sink);
2972
2973   return ret;
2974 }
2975
2976 /**
2977  * gst_rtsp_stream_query_stop:
2978  * @stream: a #GstRTSPStream
2979  *
2980  * Query the stop of the stream in %GST_FORMAT_TIME. This only considers
2981  * the RTP parts of the pipeline and not the RTCP parts.
2982  *
2983  * Returns: %TRUE if the stop could be queried
2984  */
2985 gboolean
2986 gst_rtsp_stream_query_stop (GstRTSPStream * stream, gint64 * stop)
2987 {
2988   GstRTSPStreamPrivate *priv;
2989   GstElement *sink;
2990   GstQuery *query;
2991   gboolean ret;
2992
2993   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2994
2995   priv = stream->priv;
2996
2997   g_mutex_lock (&priv->lock);
2998   if ((sink = priv->udpsink[0]))
2999     gst_object_ref (sink);
3000   g_mutex_unlock (&priv->lock);
3001
3002   if (!sink)
3003     return FALSE;
3004
3005   query = gst_query_new_segment (GST_FORMAT_TIME);
3006   if ((ret = gst_element_query (sink, query))) {
3007     GstFormat format;
3008
3009     gst_query_parse_segment (query, NULL, &format, NULL, stop);
3010     if (format != GST_FORMAT_TIME)
3011       *stop = -1;
3012   }
3013   gst_query_unref (query);
3014   gst_object_unref (sink);
3015
3016   return ret;
3017
3018 }