rtsp-stream: Fix compiler warnings
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-stream.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19 /**
20  * SECTION:rtsp-stream
21  * @short_description: A media stream
22  * @see_also: #GstRTSPMedia
23  *
24  * The #GstRTSPStream object manages the data transport for one stream. It
25  * is created from a payloader element and a source pad that produce the RTP
26  * packets for the stream.
27  *
28  * With gst_rtsp_stream_join_bin() the streaming elements are added to the bin
29  * and rtpbin. gst_rtsp_stream_leave_bin() removes the elements again.
30  *
31  * The #GstRTSPStream will use the configured addresspool, as set with
32  * gst_rtsp_stream_set_address_pool(), to allocate multicast addresses for the
33  * stream. With gst_rtsp_stream_get_multicast_address() you can get the
34  * configured address.
35  *
36  * With gst_rtsp_stream_get_server_port () you can get the port that the server
37  * will use to receive RTCP. This is the part that the clients will use to send
38  * RTCP to.
39  *
40  * With gst_rtsp_stream_add_transport() destinations can be added where the
41  * stream should be sent to. Use gst_rtsp_stream_remove_transport() to remove
42  * the destination again.
43  *
44  * Last reviewed on 2013-07-16 (1.0.0)
45  */
46
47 #include <stdlib.h>
48 #include <stdio.h>
49 #include <string.h>
50
51 #include <gio/gio.h>
52
53 #include <gst/app/gstappsrc.h>
54 #include <gst/app/gstappsink.h>
55
56 #include "rtsp-stream.h"
57
58 #define GST_RTSP_STREAM_GET_PRIVATE(obj)  \
59      (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTSP_STREAM, GstRTSPStreamPrivate))
60
61 typedef struct
62 {
63   GstRTSPStreamTransport *transport;
64
65   /* RTP and RTCP source */
66   GstElement *udpsrc[2];
67   GstPad *selpad[2];
68 } GstRTSPMulticastTransportSource;
69
70 struct _GstRTSPStreamPrivate
71 {
72   GMutex lock;
73   guint idx;
74   GstPad *srcpad;
75   GstElement *payloader;
76   guint buffer_size;
77   gboolean is_joined;
78   gchar *control;
79
80   GstRTSPProfile profiles;
81   GstRTSPLowerTrans protocols;
82
83   /* pads on the rtpbin */
84   GstPad *send_rtp_sink;
85   GstPad *recv_sink[2];
86   GstPad *send_src[2];
87
88   /* the RTPSession object */
89   GObject *session;
90
91   /* SRTP encoder/decoder */
92   GstElement *srtpenc;
93   GstElement *srtpdec;
94   GHashTable *keys;
95
96   /* sinks used for sending and receiving RTP and RTCP over ipv4, they share
97    * sockets */
98   GstElement *udpsrc_v4[2];
99
100   /* sinks used for sending and receiving RTP and RTCP over ipv6, they share
101    * sockets */
102   GstElement *udpsrc_v6[2];
103
104   GstElement *udpsink[2];
105
106   /* for TCP transport */
107   GstElement *appsrc[2];
108   GstElement *appqueue[2];
109   GstElement *appsink[2];
110
111   GstElement *tee[2];
112   GstElement *funnel[2];
113
114   /* retransmission */
115   GstElement *rtxsend;
116   guint rtx_pt;
117   GstClockTime rtx_time;
118
119   /* server ports for sending/receiving over ipv4 */
120   GstRTSPRange server_port_v4;
121   GstRTSPAddress *server_addr_v4;
122   gboolean have_ipv4;
123
124   /* server ports for sending/receiving over ipv6 */
125   GstRTSPRange server_port_v6;
126   GstRTSPAddress *server_addr_v6;
127   gboolean have_ipv6;
128
129   /* multicast addresses */
130   GstRTSPAddressPool *pool;
131   GstRTSPAddress *addr_v4;
132   GstRTSPAddress *addr_v6;
133
134   /* the caps of the stream */
135   gulong caps_sig;
136   GstCaps *caps;
137
138   /* transports we stream to */
139   guint n_active;
140   GList *transports;
141   guint transports_cookie;
142   GList *tr_cache;
143   guint tr_cache_cookie;
144
145   /* UDP sources for UDP multicast transports */
146   GList *transport_sources;
147
148   gint dscp_qos;
149
150   /* stream blocking */
151   gulong blocked_id;
152   gboolean blocking;
153 };
154
155 #define DEFAULT_CONTROL         NULL
156 #define DEFAULT_PROFILES        GST_RTSP_PROFILE_AVP
157 #define DEFAULT_PROTOCOLS       GST_RTSP_LOWER_TRANS_UDP | GST_RTSP_LOWER_TRANS_UDP_MCAST | \
158                                         GST_RTSP_LOWER_TRANS_TCP
159
160 enum
161 {
162   PROP_0,
163   PROP_CONTROL,
164   PROP_PROFILES,
165   PROP_PROTOCOLS,
166   PROP_LAST
167 };
168
169 enum
170 {
171   SIGNAL_NEW_RTP_ENCODER,
172   SIGNAL_NEW_RTCP_ENCODER,
173   SIGNAL_LAST
174 };
175
176 GST_DEBUG_CATEGORY_STATIC (rtsp_stream_debug);
177 #define GST_CAT_DEFAULT rtsp_stream_debug
178
179 static GQuark ssrc_stream_map_key;
180
181 static void gst_rtsp_stream_get_property (GObject * object, guint propid,
182     GValue * value, GParamSpec * pspec);
183 static void gst_rtsp_stream_set_property (GObject * object, guint propid,
184     const GValue * value, GParamSpec * pspec);
185
186 static void gst_rtsp_stream_finalize (GObject * obj);
187
188 static guint gst_rtsp_stream_signals[SIGNAL_LAST] = { 0 };
189
190 G_DEFINE_TYPE (GstRTSPStream, gst_rtsp_stream, G_TYPE_OBJECT);
191
192 static void
193 gst_rtsp_stream_class_init (GstRTSPStreamClass * klass)
194 {
195   GObjectClass *gobject_class;
196
197   g_type_class_add_private (klass, sizeof (GstRTSPStreamPrivate));
198
199   gobject_class = G_OBJECT_CLASS (klass);
200
201   gobject_class->get_property = gst_rtsp_stream_get_property;
202   gobject_class->set_property = gst_rtsp_stream_set_property;
203   gobject_class->finalize = gst_rtsp_stream_finalize;
204
205   g_object_class_install_property (gobject_class, PROP_CONTROL,
206       g_param_spec_string ("control", "Control",
207           "The control string for this stream", DEFAULT_CONTROL,
208           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
209
210   g_object_class_install_property (gobject_class, PROP_PROFILES,
211       g_param_spec_flags ("profiles", "Profiles",
212           "Allowed transfer profiles", GST_TYPE_RTSP_PROFILE,
213           DEFAULT_PROFILES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
214
215   g_object_class_install_property (gobject_class, PROP_PROTOCOLS,
216       g_param_spec_flags ("protocols", "Protocols",
217           "Allowed lower transport protocols", GST_TYPE_RTSP_LOWER_TRANS,
218           DEFAULT_PROTOCOLS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
219
220   gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER] =
221       g_signal_new ("new-rtp-encoder", G_TYPE_FROM_CLASS (klass),
222       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
223       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
224
225   gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER] =
226       g_signal_new ("new-rtcp-encoder", G_TYPE_FROM_CLASS (klass),
227       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
228       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
229
230   GST_DEBUG_CATEGORY_INIT (rtsp_stream_debug, "rtspstream", 0, "GstRTSPStream");
231
232   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
233 }
234
235 static void
236 gst_rtsp_stream_init (GstRTSPStream * stream)
237 {
238   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
239
240   GST_DEBUG ("new stream %p", stream);
241
242   stream->priv = priv;
243
244   priv->dscp_qos = -1;
245   priv->control = g_strdup (DEFAULT_CONTROL);
246   priv->profiles = DEFAULT_PROFILES;
247   priv->protocols = DEFAULT_PROTOCOLS;
248
249   g_mutex_init (&priv->lock);
250
251   priv->keys = g_hash_table_new_full (g_direct_hash, g_direct_equal,
252       NULL, (GDestroyNotify) gst_caps_unref);
253 }
254
255 static void
256 gst_rtsp_stream_finalize (GObject * obj)
257 {
258   GstRTSPStream *stream;
259   GstRTSPStreamPrivate *priv;
260
261   stream = GST_RTSP_STREAM (obj);
262   priv = stream->priv;
263
264   GST_DEBUG ("finalize stream %p", stream);
265
266   /* we really need to be unjoined now */
267   g_return_if_fail (!priv->is_joined);
268
269   if (priv->addr_v4)
270     gst_rtsp_address_free (priv->addr_v4);
271   if (priv->addr_v6)
272     gst_rtsp_address_free (priv->addr_v6);
273   if (priv->server_addr_v4)
274     gst_rtsp_address_free (priv->server_addr_v4);
275   if (priv->server_addr_v6)
276     gst_rtsp_address_free (priv->server_addr_v6);
277   if (priv->pool)
278     g_object_unref (priv->pool);
279   if (priv->rtxsend)
280     g_object_unref (priv->rtxsend);
281
282   gst_object_unref (priv->payloader);
283   gst_object_unref (priv->srcpad);
284   g_free (priv->control);
285   g_mutex_clear (&priv->lock);
286
287   g_hash_table_unref (priv->keys);
288
289   G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
290 }
291
292 static void
293 gst_rtsp_stream_get_property (GObject * object, guint propid,
294     GValue * value, GParamSpec * pspec)
295 {
296   GstRTSPStream *stream = GST_RTSP_STREAM (object);
297
298   switch (propid) {
299     case PROP_CONTROL:
300       g_value_take_string (value, gst_rtsp_stream_get_control (stream));
301       break;
302     case PROP_PROFILES:
303       g_value_set_flags (value, gst_rtsp_stream_get_profiles (stream));
304       break;
305     case PROP_PROTOCOLS:
306       g_value_set_flags (value, gst_rtsp_stream_get_protocols (stream));
307       break;
308     default:
309       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
310   }
311 }
312
313 static void
314 gst_rtsp_stream_set_property (GObject * object, guint propid,
315     const GValue * value, GParamSpec * pspec)
316 {
317   GstRTSPStream *stream = GST_RTSP_STREAM (object);
318
319   switch (propid) {
320     case PROP_CONTROL:
321       gst_rtsp_stream_set_control (stream, g_value_get_string (value));
322       break;
323     case PROP_PROFILES:
324       gst_rtsp_stream_set_profiles (stream, g_value_get_flags (value));
325       break;
326     case PROP_PROTOCOLS:
327       gst_rtsp_stream_set_protocols (stream, g_value_get_flags (value));
328       break;
329     default:
330       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
331   }
332 }
333
334 /**
335  * gst_rtsp_stream_new:
336  * @idx: an index
337  * @srcpad: a #GstPad
338  * @payloader: a #GstElement
339  *
340  * Create a new media stream with index @idx that handles RTP data on
341  * @srcpad and has a payloader element @payloader.
342  *
343  * Returns: (transfer full): a new #GstRTSPStream
344  */
345 GstRTSPStream *
346 gst_rtsp_stream_new (guint idx, GstElement * payloader, GstPad * srcpad)
347 {
348   GstRTSPStreamPrivate *priv;
349   GstRTSPStream *stream;
350
351   g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL);
352   g_return_val_if_fail (GST_IS_PAD (srcpad), NULL);
353   g_return_val_if_fail (GST_PAD_IS_SRC (srcpad), NULL);
354
355   stream = g_object_new (GST_TYPE_RTSP_STREAM, NULL);
356   priv = stream->priv;
357   priv->idx = idx;
358   priv->payloader = gst_object_ref (payloader);
359   priv->srcpad = gst_object_ref (srcpad);
360
361   return stream;
362 }
363
364 /**
365  * gst_rtsp_stream_get_index:
366  * @stream: a #GstRTSPStream
367  *
368  * Get the stream index.
369  *
370  * Return: the stream index.
371  */
372 guint
373 gst_rtsp_stream_get_index (GstRTSPStream * stream)
374 {
375   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
376
377   return stream->priv->idx;
378 }
379
380 /**
381  * gst_rtsp_stream_get_pt:
382  * @stream: a #GstRTSPStream
383  *
384  * Get the stream payload type.
385  *
386  * Return: the stream payload type.
387  */
388 guint
389 gst_rtsp_stream_get_pt (GstRTSPStream * stream)
390 {
391   GstRTSPStreamPrivate *priv;
392   guint pt;
393
394   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
395
396   priv = stream->priv;
397
398   g_object_get (G_OBJECT (priv->payloader), "pt", &pt, NULL);
399
400   return pt;
401 }
402
403 /**
404  * gst_rtsp_stream_get_srcpad:
405  * @stream: a #GstRTSPStream
406  *
407  * Get the srcpad associated with @stream.
408  *
409  * Returns: (transfer full): the srcpad. Unref after usage.
410  */
411 GstPad *
412 gst_rtsp_stream_get_srcpad (GstRTSPStream * stream)
413 {
414   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
415
416   return gst_object_ref (stream->priv->srcpad);
417 }
418
419 /**
420  * gst_rtsp_stream_get_control:
421  * @stream: a #GstRTSPStream
422  *
423  * Get the control string to identify this stream.
424  *
425  * Returns: (transfer full): the control string. g_free() after usage.
426  */
427 gchar *
428 gst_rtsp_stream_get_control (GstRTSPStream * stream)
429 {
430   GstRTSPStreamPrivate *priv;
431   gchar *result;
432
433   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
434
435   priv = stream->priv;
436
437   g_mutex_lock (&priv->lock);
438   if ((result = g_strdup (priv->control)) == NULL)
439     result = g_strdup_printf ("stream=%u", priv->idx);
440   g_mutex_unlock (&priv->lock);
441
442   return result;
443 }
444
445 /**
446  * gst_rtsp_stream_set_control:
447  * @stream: a #GstRTSPStream
448  * @control: a control string
449  *
450  * Set the control string in @stream.
451  */
452 void
453 gst_rtsp_stream_set_control (GstRTSPStream * stream, const gchar * control)
454 {
455   GstRTSPStreamPrivate *priv;
456
457   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
458
459   priv = stream->priv;
460
461   g_mutex_lock (&priv->lock);
462   g_free (priv->control);
463   priv->control = g_strdup (control);
464   g_mutex_unlock (&priv->lock);
465 }
466
467 /**
468  * gst_rtsp_stream_has_control:
469  * @stream: a #GstRTSPStream
470  * @control: a control string
471  *
472  * Check if @stream has the control string @control.
473  *
474  * Returns: %TRUE is @stream has @control as the control string
475  */
476 gboolean
477 gst_rtsp_stream_has_control (GstRTSPStream * stream, const gchar * control)
478 {
479   GstRTSPStreamPrivate *priv;
480   gboolean res;
481
482   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
483
484   priv = stream->priv;
485
486   g_mutex_lock (&priv->lock);
487   if (priv->control)
488     res = (g_strcmp0 (priv->control, control) == 0);
489   else {
490     guint streamid;
491
492     if (sscanf (control, "stream=%u", &streamid) > 0)
493       res = (streamid == priv->idx);
494     else
495       res = FALSE;
496   }
497   g_mutex_unlock (&priv->lock);
498
499   return res;
500 }
501
502 /**
503  * gst_rtsp_stream_set_mtu:
504  * @stream: a #GstRTSPStream
505  * @mtu: a new MTU
506  *
507  * Configure the mtu in the payloader of @stream to @mtu.
508  */
509 void
510 gst_rtsp_stream_set_mtu (GstRTSPStream * stream, guint mtu)
511 {
512   GstRTSPStreamPrivate *priv;
513
514   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
515
516   priv = stream->priv;
517
518   GST_LOG_OBJECT (stream, "set MTU %u", mtu);
519
520   g_object_set (G_OBJECT (priv->payloader), "mtu", mtu, NULL);
521 }
522
523 /**
524  * gst_rtsp_stream_get_mtu:
525  * @stream: a #GstRTSPStream
526  *
527  * Get the configured MTU in the payloader of @stream.
528  *
529  * Returns: the MTU of the payloader.
530  */
531 guint
532 gst_rtsp_stream_get_mtu (GstRTSPStream * stream)
533 {
534   GstRTSPStreamPrivate *priv;
535   guint mtu;
536
537   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
538
539   priv = stream->priv;
540
541   g_object_get (G_OBJECT (priv->payloader), "mtu", &mtu, NULL);
542
543   return mtu;
544 }
545
546 /* Update the dscp qos property on the udp sinks */
547 static void
548 update_dscp_qos (GstRTSPStream * stream)
549 {
550   GstRTSPStreamPrivate *priv;
551
552   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
553
554   priv = stream->priv;
555
556   if (priv->udpsink[0]) {
557     g_object_set (G_OBJECT (priv->udpsink[0]), "qos-dscp", priv->dscp_qos,
558         NULL);
559   }
560
561   if (priv->udpsink[1]) {
562     g_object_set (G_OBJECT (priv->udpsink[1]), "qos-dscp", priv->dscp_qos,
563         NULL);
564   }
565 }
566
567 /**
568  * gst_rtsp_stream_set_dscp_qos:
569  * @stream: a #GstRTSPStream
570  * @dscp_qos: a new dscp qos value (0-63, or -1 to disable)
571  *
572  * Configure the dscp qos of the outgoing sockets to @dscp_qos.
573  */
574 void
575 gst_rtsp_stream_set_dscp_qos (GstRTSPStream * stream, gint dscp_qos)
576 {
577   GstRTSPStreamPrivate *priv;
578
579   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
580
581   priv = stream->priv;
582
583   GST_LOG_OBJECT (stream, "set DSCP QoS %d", dscp_qos);
584
585   if (dscp_qos < -1 || dscp_qos > 63) {
586     GST_WARNING_OBJECT (stream, "trying to set illegal dscp qos %d", dscp_qos);
587     return;
588   }
589
590   priv->dscp_qos = dscp_qos;
591
592   update_dscp_qos (stream);
593 }
594
595 /**
596  * gst_rtsp_stream_get_dscp_qos:
597  * @stream: a #GstRTSPStream
598  *
599  * Get the configured DSCP QoS in of the outgoing sockets.
600  *
601  * Returns: the DSCP QoS value of the outgoing sockets, or -1 if disbled.
602  */
603 gint
604 gst_rtsp_stream_get_dscp_qos (GstRTSPStream * stream)
605 {
606   GstRTSPStreamPrivate *priv;
607
608   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
609
610   priv = stream->priv;
611
612   return priv->dscp_qos;
613 }
614
615 /**
616  * gst_rtsp_stream_is_transport_supported:
617  * @stream: a #GstRTSPStream
618  * @transport: (transfer none): a #GstRTSPTransport
619  *
620  * Check if @transport can be handled by stream
621  *
622  * Returns: %TRUE if @transport can be handled by @stream.
623  */
624 gboolean
625 gst_rtsp_stream_is_transport_supported (GstRTSPStream * stream,
626     GstRTSPTransport * transport)
627 {
628   GstRTSPStreamPrivate *priv;
629
630   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
631
632   priv = stream->priv;
633
634   g_mutex_lock (&priv->lock);
635   if (transport->trans != GST_RTSP_TRANS_RTP)
636     goto unsupported_transmode;
637
638   if (!(transport->profile & priv->profiles))
639     goto unsupported_profile;
640
641   if (!(transport->lower_transport & priv->protocols))
642     goto unsupported_ltrans;
643
644   g_mutex_unlock (&priv->lock);
645
646   return TRUE;
647
648   /* ERRORS */
649 unsupported_transmode:
650   {
651     GST_DEBUG ("unsupported transport mode %d", transport->trans);
652     g_mutex_unlock (&priv->lock);
653     return FALSE;
654   }
655 unsupported_profile:
656   {
657     GST_DEBUG ("unsupported profile %d", transport->profile);
658     g_mutex_unlock (&priv->lock);
659     return FALSE;
660   }
661 unsupported_ltrans:
662   {
663     GST_DEBUG ("unsupported lower transport %d", transport->lower_transport);
664     g_mutex_unlock (&priv->lock);
665     return FALSE;
666   }
667 }
668
669 /**
670  * gst_rtsp_stream_set_profiles:
671  * @stream: a #GstRTSPStream
672  * @profiles: the new profiles
673  *
674  * Configure the allowed profiles for @stream.
675  */
676 void
677 gst_rtsp_stream_set_profiles (GstRTSPStream * stream, GstRTSPProfile profiles)
678 {
679   GstRTSPStreamPrivate *priv;
680
681   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
682
683   priv = stream->priv;
684
685   g_mutex_lock (&priv->lock);
686   priv->profiles = profiles;
687   g_mutex_unlock (&priv->lock);
688 }
689
690 /**
691  * gst_rtsp_stream_get_profiles:
692  * @stream: a #GstRTSPStream
693  *
694  * Get the allowed profiles of @stream.
695  *
696  * Returns: a #GstRTSPProfile
697  */
698 GstRTSPProfile
699 gst_rtsp_stream_get_profiles (GstRTSPStream * stream)
700 {
701   GstRTSPStreamPrivate *priv;
702   GstRTSPProfile res;
703
704   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_RTSP_PROFILE_UNKNOWN);
705
706   priv = stream->priv;
707
708   g_mutex_lock (&priv->lock);
709   res = priv->profiles;
710   g_mutex_unlock (&priv->lock);
711
712   return res;
713 }
714
715 /**
716  * gst_rtsp_stream_set_protocols:
717  * @stream: a #GstRTSPStream
718  * @protocols: the new flags
719  *
720  * Configure the allowed lower transport for @stream.
721  */
722 void
723 gst_rtsp_stream_set_protocols (GstRTSPStream * stream,
724     GstRTSPLowerTrans protocols)
725 {
726   GstRTSPStreamPrivate *priv;
727
728   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
729
730   priv = stream->priv;
731
732   g_mutex_lock (&priv->lock);
733   priv->protocols = protocols;
734   g_mutex_unlock (&priv->lock);
735 }
736
737 /**
738  * gst_rtsp_stream_get_protocols:
739  * @stream: a #GstRTSPStream
740  *
741  * Get the allowed protocols of @stream.
742  *
743  * Returns: a #GstRTSPLowerTrans
744  */
745 GstRTSPLowerTrans
746 gst_rtsp_stream_get_protocols (GstRTSPStream * stream)
747 {
748   GstRTSPStreamPrivate *priv;
749   GstRTSPLowerTrans res;
750
751   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream),
752       GST_RTSP_LOWER_TRANS_UNKNOWN);
753
754   priv = stream->priv;
755
756   g_mutex_lock (&priv->lock);
757   res = priv->protocols;
758   g_mutex_unlock (&priv->lock);
759
760   return res;
761 }
762
763 /**
764  * gst_rtsp_stream_set_address_pool:
765  * @stream: a #GstRTSPStream
766  * @pool: (transfer none): a #GstRTSPAddressPool
767  *
768  * configure @pool to be used as the address pool of @stream.
769  */
770 void
771 gst_rtsp_stream_set_address_pool (GstRTSPStream * stream,
772     GstRTSPAddressPool * pool)
773 {
774   GstRTSPStreamPrivate *priv;
775   GstRTSPAddressPool *old;
776
777   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
778
779   priv = stream->priv;
780
781   GST_LOG_OBJECT (stream, "set address pool %p", pool);
782
783   g_mutex_lock (&priv->lock);
784   if ((old = priv->pool) != pool)
785     priv->pool = pool ? g_object_ref (pool) : NULL;
786   else
787     old = NULL;
788   g_mutex_unlock (&priv->lock);
789
790   if (old)
791     g_object_unref (old);
792 }
793
794 /**
795  * gst_rtsp_stream_get_address_pool:
796  * @stream: a #GstRTSPStream
797  *
798  * Get the #GstRTSPAddressPool used as the address pool of @stream.
799  *
800  * Returns: (transfer full): the #GstRTSPAddressPool of @stream. g_object_unref() after
801  * usage.
802  */
803 GstRTSPAddressPool *
804 gst_rtsp_stream_get_address_pool (GstRTSPStream * stream)
805 {
806   GstRTSPStreamPrivate *priv;
807   GstRTSPAddressPool *result;
808
809   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
810
811   priv = stream->priv;
812
813   g_mutex_lock (&priv->lock);
814   if ((result = priv->pool))
815     g_object_ref (result);
816   g_mutex_unlock (&priv->lock);
817
818   return result;
819 }
820
821 /**
822  * gst_rtsp_stream_get_multicast_address:
823  * @stream: a #GstRTSPStream
824  * @family: the #GSocketFamily
825  *
826  * Get the multicast address of @stream for @family.
827  *
828  * Returns: (transfer full) (nullable): the #GstRTSPAddress of @stream
829  * or %NULL when no address could be allocated. gst_rtsp_address_free()
830  * after usage.
831  */
832 GstRTSPAddress *
833 gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream,
834     GSocketFamily family)
835 {
836   GstRTSPStreamPrivate *priv;
837   GstRTSPAddress *result;
838   GstRTSPAddress **addrp;
839   GstRTSPAddressFlags flags;
840
841   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
842
843   priv = stream->priv;
844
845   if (family == G_SOCKET_FAMILY_IPV6) {
846     flags = GST_RTSP_ADDRESS_FLAG_IPV6;
847     addrp = &priv->addr_v6;
848   } else {
849     flags = GST_RTSP_ADDRESS_FLAG_IPV4;
850     addrp = &priv->addr_v4;
851   }
852
853   g_mutex_lock (&priv->lock);
854   if (*addrp == NULL) {
855     if (priv->pool == NULL)
856       goto no_pool;
857
858     flags |= GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_MULTICAST;
859
860     *addrp = gst_rtsp_address_pool_acquire_address (priv->pool, flags, 2);
861     if (*addrp == NULL)
862       goto no_address;
863   }
864   result = gst_rtsp_address_copy (*addrp);
865   g_mutex_unlock (&priv->lock);
866
867   return result;
868
869   /* ERRORS */
870 no_pool:
871   {
872     GST_ERROR_OBJECT (stream, "no address pool specified");
873     g_mutex_unlock (&priv->lock);
874     return NULL;
875   }
876 no_address:
877   {
878     GST_ERROR_OBJECT (stream, "failed to acquire address from pool");
879     g_mutex_unlock (&priv->lock);
880     return NULL;
881   }
882 }
883
884 /**
885  * gst_rtsp_stream_reserve_address:
886  * @stream: a #GstRTSPStream
887  * @address: an address
888  * @port: a port
889  * @n_ports: n_ports
890  * @ttl: a TTL
891  *
892  * Reserve @address and @port as the address and port of @stream.
893  *
894  * Returns: (nullable): the #GstRTSPAddress of @stream or %NULL when
895  * the address could be reserved. gst_rtsp_address_free() after usage.
896  */
897 GstRTSPAddress *
898 gst_rtsp_stream_reserve_address (GstRTSPStream * stream,
899     const gchar * address, guint port, guint n_ports, guint ttl)
900 {
901   GstRTSPStreamPrivate *priv;
902   GstRTSPAddress *result;
903   GInetAddress *addr;
904   GSocketFamily family;
905   GstRTSPAddress **addrp;
906
907   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
908   g_return_val_if_fail (address != NULL, NULL);
909   g_return_val_if_fail (port > 0, NULL);
910   g_return_val_if_fail (n_ports > 0, NULL);
911   g_return_val_if_fail (ttl > 0, NULL);
912
913   priv = stream->priv;
914
915   addr = g_inet_address_new_from_string (address);
916   if (!addr) {
917     GST_ERROR ("failed to get inet addr from %s", address);
918     family = G_SOCKET_FAMILY_IPV4;
919   } else {
920     family = g_inet_address_get_family (addr);
921     g_object_unref (addr);
922   }
923
924   if (family == G_SOCKET_FAMILY_IPV6)
925     addrp = &priv->addr_v6;
926   else
927     addrp = &priv->addr_v4;
928
929   g_mutex_lock (&priv->lock);
930   if (*addrp == NULL) {
931     GstRTSPAddressPoolResult res;
932
933     if (priv->pool == NULL)
934       goto no_pool;
935
936     res = gst_rtsp_address_pool_reserve_address (priv->pool, address,
937         port, n_ports, ttl, addrp);
938     if (res != GST_RTSP_ADDRESS_POOL_OK)
939       goto no_address;
940   } else {
941     if (strcmp ((*addrp)->address, address) ||
942         (*addrp)->port != port || (*addrp)->n_ports != n_ports ||
943         (*addrp)->ttl != ttl)
944       goto different_address;
945   }
946   result = gst_rtsp_address_copy (*addrp);
947   g_mutex_unlock (&priv->lock);
948
949   return result;
950
951   /* ERRORS */
952 no_pool:
953   {
954     GST_ERROR_OBJECT (stream, "no address pool specified");
955     g_mutex_unlock (&priv->lock);
956     return NULL;
957   }
958 no_address:
959   {
960     GST_ERROR_OBJECT (stream, "failed to acquire address %s from pool",
961         address);
962     g_mutex_unlock (&priv->lock);
963     return NULL;
964   }
965 different_address:
966   {
967     GST_ERROR_OBJECT (stream, "address %s is not the same that was already"
968         " reserved", address);
969     g_mutex_unlock (&priv->lock);
970     return NULL;
971   }
972 }
973
974 static gboolean
975 alloc_ports_one_family (GstRTSPAddressPool * pool, gint buffer_size,
976     GSocketFamily family, GstElement * udpsrc_out[2],
977     GstElement * udpsink_out[2], GstRTSPRange * server_port_out,
978     GstRTSPAddress ** server_addr_out)
979 {
980   GstStateChangeReturn ret;
981   GstElement *udpsrc0, *udpsrc1;
982   GstElement *udpsink0, *udpsink1;
983   GSocket *rtp_socket = NULL;
984   GSocket *rtcp_socket;
985   gint tmp_rtp, tmp_rtcp;
986   guint count;
987   gint rtpport, rtcpport;
988   GList *rejected_addresses = NULL;
989   GstRTSPAddress *addr = NULL;
990   GInetAddress *inetaddr = NULL;
991   GSocketAddress *rtp_sockaddr = NULL;
992   GSocketAddress *rtcp_sockaddr = NULL;
993   const gchar *multisink_socket;
994
995   if (family == G_SOCKET_FAMILY_IPV6)
996     multisink_socket = "socket-v6";
997   else
998     multisink_socket = "socket";
999
1000   udpsrc0 = NULL;
1001   udpsrc1 = NULL;
1002   udpsink0 = NULL;
1003   udpsink1 = NULL;
1004   count = 0;
1005
1006   /* Start with random port */
1007   tmp_rtp = 0;
1008
1009   rtcp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1010       G_SOCKET_PROTOCOL_UDP, NULL);
1011   if (!rtcp_socket)
1012     goto no_udp_protocol;
1013
1014   if (*server_addr_out)
1015     gst_rtsp_address_free (*server_addr_out);
1016
1017   /* try to allocate 2 UDP ports, the RTP port should be an even
1018    * number and the RTCP port should be the next (uneven) port */
1019 again:
1020
1021   if (rtp_socket == NULL) {
1022     rtp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1023         G_SOCKET_PROTOCOL_UDP, NULL);
1024     if (!rtp_socket)
1025       goto no_udp_protocol;
1026   }
1027
1028   if (pool && gst_rtsp_address_pool_has_unicast_addresses (pool)) {
1029     GstRTSPAddressFlags flags;
1030
1031     if (addr)
1032       rejected_addresses = g_list_prepend (rejected_addresses, addr);
1033
1034     flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_UNICAST;
1035     if (family == G_SOCKET_FAMILY_IPV6)
1036       flags |= GST_RTSP_ADDRESS_FLAG_IPV6;
1037     else
1038       flags |= GST_RTSP_ADDRESS_FLAG_IPV4;
1039
1040     addr = gst_rtsp_address_pool_acquire_address (pool, flags, 2);
1041
1042     if (addr == NULL)
1043       goto no_ports;
1044
1045     tmp_rtp = addr->port;
1046
1047     g_clear_object (&inetaddr);
1048     inetaddr = g_inet_address_new_from_string (addr->address);
1049   } else {
1050     if (tmp_rtp != 0) {
1051       tmp_rtp += 2;
1052       if (++count > 20)
1053         goto no_ports;
1054     }
1055
1056     if (inetaddr == NULL)
1057       inetaddr = g_inet_address_new_any (family);
1058   }
1059
1060   rtp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtp);
1061   if (!g_socket_bind (rtp_socket, rtp_sockaddr, FALSE, NULL)) {
1062     g_object_unref (rtp_sockaddr);
1063     goto again;
1064   }
1065   g_object_unref (rtp_sockaddr);
1066
1067   rtp_sockaddr = g_socket_get_local_address (rtp_socket, NULL);
1068   if (rtp_sockaddr == NULL || !G_IS_INET_SOCKET_ADDRESS (rtp_sockaddr)) {
1069     g_clear_object (&rtp_sockaddr);
1070     goto socket_error;
1071   }
1072
1073   tmp_rtp =
1074       g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (rtp_sockaddr));
1075   g_object_unref (rtp_sockaddr);
1076
1077   /* check if port is even */
1078   if ((tmp_rtp & 1) != 0) {
1079     /* port not even, close and allocate another */
1080     tmp_rtp++;
1081     g_clear_object (&rtp_socket);
1082     goto again;
1083   }
1084
1085   /* set port */
1086   tmp_rtcp = tmp_rtp + 1;
1087
1088   rtcp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtcp);
1089   if (!g_socket_bind (rtcp_socket, rtcp_sockaddr, FALSE, NULL)) {
1090     g_object_unref (rtcp_sockaddr);
1091     g_clear_object (&rtp_socket);
1092     goto again;
1093   }
1094   g_object_unref (rtcp_sockaddr);
1095
1096   g_clear_object (&inetaddr);
1097
1098   udpsrc0 = gst_element_factory_make ("udpsrc", NULL);
1099   udpsrc1 = gst_element_factory_make ("udpsrc", NULL);
1100
1101   if (udpsrc0 == NULL || udpsrc1 == NULL)
1102     goto no_udp_protocol;
1103
1104   g_object_set (G_OBJECT (udpsrc0), "socket", rtp_socket, NULL);
1105   g_object_set (G_OBJECT (udpsrc1), "socket", rtcp_socket, NULL);
1106
1107   ret = gst_element_set_state (udpsrc0, GST_STATE_READY);
1108   if (ret == GST_STATE_CHANGE_FAILURE)
1109     goto element_error;
1110   ret = gst_element_set_state (udpsrc1, GST_STATE_READY);
1111   if (ret == GST_STATE_CHANGE_FAILURE)
1112     goto element_error;
1113
1114   /* all fine, do port check */
1115   g_object_get (G_OBJECT (udpsrc0), "port", &rtpport, NULL);
1116   g_object_get (G_OBJECT (udpsrc1), "port", &rtcpport, NULL);
1117
1118   /* this should not happen... */
1119   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
1120     goto port_error;
1121
1122   if (udpsink_out[0])
1123     udpsink0 = udpsink_out[0];
1124   else
1125     udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
1126
1127   if (!udpsink0)
1128     goto no_udp_protocol;
1129
1130   g_object_set (G_OBJECT (udpsink0), "close-socket", FALSE, NULL);
1131   g_object_set (G_OBJECT (udpsink0), multisink_socket, rtp_socket, NULL);
1132
1133   if (udpsink_out[1])
1134     udpsink1 = udpsink_out[1];
1135   else
1136     udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
1137
1138   if (!udpsink1)
1139     goto no_udp_protocol;
1140
1141   g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
1142   g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
1143   g_object_set (G_OBJECT (udpsink0), "buffer-size", buffer_size, NULL);
1144
1145   g_object_set (G_OBJECT (udpsink1), "close-socket", FALSE, NULL);
1146   g_object_set (G_OBJECT (udpsink1), multisink_socket, rtcp_socket, NULL);
1147   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
1148   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
1149   g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL);
1150   g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
1151   g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL);
1152   g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
1153
1154   /* we keep these elements, we will further configure them when the
1155    * client told us to really use the UDP ports. */
1156   udpsrc_out[0] = udpsrc0;
1157   udpsrc_out[1] = udpsrc1;
1158   udpsink_out[0] = udpsink0;
1159   udpsink_out[1] = udpsink1;
1160
1161   server_port_out->min = rtpport;
1162   server_port_out->max = rtcpport;
1163
1164   *server_addr_out = addr;
1165   g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free);
1166
1167   g_object_unref (rtp_socket);
1168   g_object_unref (rtcp_socket);
1169
1170   return TRUE;
1171
1172   /* ERRORS */
1173 no_udp_protocol:
1174   {
1175     goto cleanup;
1176   }
1177 no_ports:
1178   {
1179     goto cleanup;
1180   }
1181 port_error:
1182   {
1183     goto cleanup;
1184   }
1185 socket_error:
1186   {
1187     goto cleanup;
1188   }
1189 element_error:
1190   {
1191     goto cleanup;
1192   }
1193 cleanup:
1194   {
1195     if (udpsrc0) {
1196       gst_element_set_state (udpsrc0, GST_STATE_NULL);
1197       gst_object_unref (udpsrc0);
1198     }
1199     if (udpsrc1) {
1200       gst_element_set_state (udpsrc1, GST_STATE_NULL);
1201       gst_object_unref (udpsrc1);
1202     }
1203     if (udpsink0) {
1204       gst_element_set_state (udpsink0, GST_STATE_NULL);
1205       gst_object_unref (udpsink0);
1206     }
1207     if (inetaddr)
1208       g_object_unref (inetaddr);
1209     g_list_free_full (rejected_addresses,
1210         (GDestroyNotify) gst_rtsp_address_free);
1211     if (addr)
1212       gst_rtsp_address_free (addr);
1213     if (rtp_socket)
1214       g_object_unref (rtp_socket);
1215     if (rtcp_socket)
1216       g_object_unref (rtcp_socket);
1217     return FALSE;
1218   }
1219 }
1220
1221 /* must be called with lock */
1222 static gboolean
1223 alloc_ports (GstRTSPStream * stream)
1224 {
1225   GstRTSPStreamPrivate *priv = stream->priv;
1226
1227   priv->have_ipv4 = alloc_ports_one_family (priv->pool, priv->buffer_size,
1228       G_SOCKET_FAMILY_IPV4, priv->udpsrc_v4, priv->udpsink,
1229       &priv->server_port_v4, &priv->server_addr_v4);
1230
1231   priv->have_ipv6 = alloc_ports_one_family (priv->pool, priv->buffer_size,
1232       G_SOCKET_FAMILY_IPV6, priv->udpsrc_v6, priv->udpsink,
1233       &priv->server_port_v6, &priv->server_addr_v6);
1234
1235   return priv->have_ipv4 || priv->have_ipv6;
1236 }
1237
1238 /**
1239  * gst_rtsp_stream_get_server_port:
1240  * @stream: a #GstRTSPStream
1241  * @server_port: (out): result server port
1242  * @family: the port family to get
1243  *
1244  * Fill @server_port with the port pair used by the server. This function can
1245  * only be called when @stream has been joined.
1246  */
1247 void
1248 gst_rtsp_stream_get_server_port (GstRTSPStream * stream,
1249     GstRTSPRange * server_port, GSocketFamily family)
1250 {
1251   GstRTSPStreamPrivate *priv;
1252
1253   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1254   priv = stream->priv;
1255   g_return_if_fail (priv->is_joined);
1256
1257   g_mutex_lock (&priv->lock);
1258   if (family == G_SOCKET_FAMILY_IPV4) {
1259     if (server_port)
1260       *server_port = priv->server_port_v4;
1261   } else {
1262     if (server_port)
1263       *server_port = priv->server_port_v6;
1264   }
1265   g_mutex_unlock (&priv->lock);
1266 }
1267
1268 /**
1269  * gst_rtsp_stream_get_rtpsession:
1270  * @stream: a #GstRTSPStream
1271  *
1272  * Get the RTP session of this stream.
1273  *
1274  * Returns: (transfer full): The RTP session of this stream. Unref after usage.
1275  */
1276 GObject *
1277 gst_rtsp_stream_get_rtpsession (GstRTSPStream * stream)
1278 {
1279   GstRTSPStreamPrivate *priv;
1280   GObject *session;
1281
1282   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1283
1284   priv = stream->priv;
1285
1286   g_mutex_lock (&priv->lock);
1287   if ((session = priv->session))
1288     g_object_ref (session);
1289   g_mutex_unlock (&priv->lock);
1290
1291   return session;
1292 }
1293
1294 /**
1295  * gst_rtsp_stream_get_ssrc:
1296  * @stream: a #GstRTSPStream
1297  * @ssrc: (out): result ssrc
1298  *
1299  * Get the SSRC used by the RTP session of this stream. This function can only
1300  * be called when @stream has been joined.
1301  */
1302 void
1303 gst_rtsp_stream_get_ssrc (GstRTSPStream * stream, guint * ssrc)
1304 {
1305   GstRTSPStreamPrivate *priv;
1306
1307   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1308   priv = stream->priv;
1309   g_return_if_fail (priv->is_joined);
1310
1311   g_mutex_lock (&priv->lock);
1312   if (ssrc && priv->session)
1313     g_object_get (priv->session, "internal-ssrc", ssrc, NULL);
1314   g_mutex_unlock (&priv->lock);
1315 }
1316
1317 /**
1318  * gst_rtsp_stream_set_retransmission_time:
1319  * @stream: a #GstRTSPStream
1320  * @time: a #GstClockTime
1321  *
1322  * Set the amount of time to store retransmission packets.
1323  */
1324 void
1325 gst_rtsp_stream_set_retransmission_time (GstRTSPStream * stream,
1326     GstClockTime time)
1327 {
1328   GST_DEBUG_OBJECT (stream, "set retransmission time %" G_GUINT64_FORMAT, time);
1329
1330   g_mutex_lock (&stream->priv->lock);
1331   stream->priv->rtx_time = time;
1332   if (stream->priv->rtxsend)
1333     g_object_set (stream->priv->rtxsend, "max-size-time",
1334         GST_TIME_AS_MSECONDS (time), NULL);
1335   g_mutex_unlock (&stream->priv->lock);
1336 }
1337
1338 /**
1339  * gst_rtsp_media_get_retransmission_time:
1340  * @media: a #GstRTSPMedia
1341  *
1342  * Get the amount of time to store retransmission data.
1343  *
1344  * Returns: the amount of time to store retransmission data.
1345  */
1346 GstClockTime
1347 gst_rtsp_stream_get_retransmission_time (GstRTSPStream * stream)
1348 {
1349   GstClockTime ret;
1350
1351   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
1352
1353   g_mutex_lock (&stream->priv->lock);
1354   ret = stream->priv->rtx_time;
1355   g_mutex_unlock (&stream->priv->lock);
1356
1357   return ret;
1358 }
1359
1360 void
1361 gst_rtsp_stream_set_retransmission_pt (GstRTSPStream * stream, guint rtx_pt)
1362 {
1363   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1364
1365   GST_DEBUG_OBJECT (stream, "set retransmission pt %u", rtx_pt);
1366
1367   g_mutex_lock (&stream->priv->lock);
1368   stream->priv->rtx_pt = rtx_pt;
1369   if (stream->priv->rtxsend) {
1370     guint pt = gst_rtsp_stream_get_pt (stream);
1371     gchar *pt_s = g_strdup_printf ("%d", pt);
1372     GstStructure *rtx_pt_map = gst_structure_new ("application/x-rtp-pt-map",
1373         pt_s, G_TYPE_UINT, rtx_pt, NULL);
1374     g_object_set (stream->priv->rtxsend, "payload-type-map", rtx_pt_map, NULL);
1375   }
1376   g_mutex_unlock (&stream->priv->lock);
1377 }
1378
1379 guint
1380 gst_rtsp_stream_get_retransmission_pt (GstRTSPStream * stream)
1381 {
1382   guint rtx_pt;
1383
1384   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
1385
1386   g_mutex_lock (&stream->priv->lock);
1387   rtx_pt = stream->priv->rtx_pt;
1388   g_mutex_unlock (&stream->priv->lock);
1389
1390   return rtx_pt;
1391 }
1392
1393 /* executed from streaming thread */
1394 static void
1395 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream)
1396 {
1397   GstRTSPStreamPrivate *priv = stream->priv;
1398   GstCaps *newcaps, *oldcaps;
1399
1400   newcaps = gst_pad_get_current_caps (pad);
1401
1402   GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps,
1403       newcaps);
1404
1405   g_mutex_lock (&priv->lock);
1406   oldcaps = priv->caps;
1407   priv->caps = newcaps;
1408   g_mutex_unlock (&priv->lock);
1409
1410   if (oldcaps)
1411     gst_caps_unref (oldcaps);
1412 }
1413
1414 static void
1415 dump_structure (const GstStructure * s)
1416 {
1417   gchar *sstr;
1418
1419   sstr = gst_structure_to_string (s);
1420   GST_INFO ("structure: %s", sstr);
1421   g_free (sstr);
1422 }
1423
1424 static GstRTSPStreamTransport *
1425 find_transport (GstRTSPStream * stream, const gchar * rtcp_from)
1426 {
1427   GstRTSPStreamPrivate *priv = stream->priv;
1428   GList *walk;
1429   GstRTSPStreamTransport *result = NULL;
1430   const gchar *tmp;
1431   gchar *dest;
1432   guint port;
1433
1434   if (rtcp_from == NULL)
1435     return NULL;
1436
1437   tmp = g_strrstr (rtcp_from, ":");
1438   if (tmp == NULL)
1439     return NULL;
1440
1441   port = atoi (tmp + 1);
1442   dest = g_strndup (rtcp_from, tmp - rtcp_from);
1443
1444   g_mutex_lock (&priv->lock);
1445   GST_INFO ("finding %s:%d in %d transports", dest, port,
1446       g_list_length (priv->transports));
1447
1448   for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1449     GstRTSPStreamTransport *trans = walk->data;
1450     const GstRTSPTransport *tr;
1451     gint min, max;
1452
1453     tr = gst_rtsp_stream_transport_get_transport (trans);
1454
1455     min = tr->client_port.min;
1456     max = tr->client_port.max;
1457
1458     if ((strcmp (tr->destination, dest) == 0) && (min == port || max == port)) {
1459       result = trans;
1460       break;
1461     }
1462   }
1463   if (result)
1464     g_object_ref (result);
1465   g_mutex_unlock (&priv->lock);
1466
1467   g_free (dest);
1468
1469   return result;
1470 }
1471
1472 static GstRTSPStreamTransport *
1473 check_transport (GObject * source, GstRTSPStream * stream)
1474 {
1475   GstStructure *stats;
1476   GstRTSPStreamTransport *trans;
1477
1478   /* see if we have a stream to match with the origin of the RTCP packet */
1479   trans = g_object_get_qdata (source, ssrc_stream_map_key);
1480   if (trans == NULL) {
1481     g_object_get (source, "stats", &stats, NULL);
1482     if (stats) {
1483       const gchar *rtcp_from;
1484
1485       dump_structure (stats);
1486
1487       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
1488       if ((trans = find_transport (stream, rtcp_from))) {
1489         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
1490             source);
1491         g_object_set_qdata_full (source, ssrc_stream_map_key, trans,
1492             g_object_unref);
1493       }
1494       gst_structure_free (stats);
1495     }
1496   }
1497   return trans;
1498 }
1499
1500
1501 static void
1502 on_new_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1503 {
1504   GstRTSPStreamTransport *trans;
1505
1506   GST_INFO ("%p: new source %p", stream, source);
1507
1508   trans = check_transport (source, stream);
1509
1510   if (trans)
1511     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
1512 }
1513
1514 static void
1515 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPStream * stream)
1516 {
1517   GST_INFO ("%p: new SDES %p", stream, source);
1518 }
1519
1520 static void
1521 on_ssrc_active (GObject * session, GObject * source, GstRTSPStream * stream)
1522 {
1523   GstRTSPStreamTransport *trans;
1524
1525   trans = check_transport (source, stream);
1526
1527   if (trans) {
1528     GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
1529     gst_rtsp_stream_transport_keep_alive (trans);
1530   }
1531 #ifdef DUMP_STATS
1532   {
1533     GstStructure *stats;
1534     g_object_get (source, "stats", &stats, NULL);
1535     if (stats) {
1536       dump_structure (stats);
1537       gst_structure_free (stats);
1538     }
1539   }
1540 #endif
1541 }
1542
1543 static void
1544 on_bye_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1545 {
1546   GST_INFO ("%p: source %p bye", stream, source);
1547 }
1548
1549 static void
1550 on_bye_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1551 {
1552   GstRTSPStreamTransport *trans;
1553
1554   GST_INFO ("%p: source %p bye timeout", stream, source);
1555
1556   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1557     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1558     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1559   }
1560 }
1561
1562 static void
1563 on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1564 {
1565   GstRTSPStreamTransport *trans;
1566
1567   GST_INFO ("%p: source %p timeout", stream, source);
1568
1569   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1570     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1571     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1572   }
1573 }
1574
1575 static void
1576 clear_tr_cache (GstRTSPStreamPrivate * priv)
1577 {
1578   g_list_foreach (priv->tr_cache, (GFunc) g_object_unref, NULL);
1579   g_list_free (priv->tr_cache);
1580   priv->tr_cache = NULL;
1581 }
1582
1583 static GstFlowReturn
1584 handle_new_sample (GstAppSink * sink, gpointer user_data)
1585 {
1586   GstRTSPStreamPrivate *priv;
1587   GList *walk;
1588   GstSample *sample;
1589   GstBuffer *buffer;
1590   GstRTSPStream *stream;
1591   gboolean is_rtp;
1592
1593   sample = gst_app_sink_pull_sample (sink);
1594   if (!sample)
1595     return GST_FLOW_OK;
1596
1597   stream = (GstRTSPStream *) user_data;
1598   priv = stream->priv;
1599   buffer = gst_sample_get_buffer (sample);
1600
1601   is_rtp = GST_ELEMENT_CAST (sink) == priv->appsink[0];
1602
1603   g_mutex_lock (&priv->lock);
1604   if (priv->tr_cache_cookie != priv->transports_cookie) {
1605     clear_tr_cache (priv);
1606     for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1607       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1608       priv->tr_cache = g_list_prepend (priv->tr_cache, g_object_ref (tr));
1609     }
1610     priv->tr_cache_cookie = priv->transports_cookie;
1611   }
1612   g_mutex_unlock (&priv->lock);
1613
1614   for (walk = priv->tr_cache; walk; walk = g_list_next (walk)) {
1615     GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1616
1617     if (is_rtp) {
1618       gst_rtsp_stream_transport_send_rtp (tr, buffer);
1619     } else {
1620       gst_rtsp_stream_transport_send_rtcp (tr, buffer);
1621     }
1622   }
1623   gst_sample_unref (sample);
1624
1625   return GST_FLOW_OK;
1626 }
1627
1628 static GstAppSinkCallbacks sink_cb = {
1629   NULL,                         /* not interested in EOS */
1630   NULL,                         /* not interested in preroll samples */
1631   handle_new_sample,
1632 };
1633
1634 static GstElement *
1635 get_rtp_encoder (GstRTSPStream * stream, guint session)
1636 {
1637   GstRTSPStreamPrivate *priv = stream->priv;
1638
1639   if (priv->srtpenc == NULL) {
1640     gchar *name;
1641
1642     name = g_strdup_printf ("srtpenc_%u", session);
1643     priv->srtpenc = gst_element_factory_make ("srtpenc", name);
1644     g_free (name);
1645
1646     g_object_set (priv->srtpenc, "random-key", TRUE, NULL);
1647   }
1648   return gst_object_ref (priv->srtpenc);
1649 }
1650
1651 static GstElement *
1652 request_rtp_encoder (GstElement * rtpbin, guint session, GstRTSPStream * stream)
1653 {
1654   GstRTSPStreamPrivate *priv = stream->priv;
1655   GstElement *oldenc, *enc;
1656   GstPad *pad;
1657   gchar *name;
1658
1659   if (priv->idx != session)
1660     return NULL;
1661
1662   GST_DEBUG_OBJECT (stream, "make RTP encoder for session %u", session);
1663
1664   oldenc = priv->srtpenc;
1665   enc = get_rtp_encoder (stream, session);
1666   name = g_strdup_printf ("rtp_sink_%d", session);
1667   pad = gst_element_get_request_pad (enc, name);
1668   g_free (name);
1669   gst_object_unref (pad);
1670
1671   if (oldenc == NULL)
1672     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER], 0,
1673         enc);
1674
1675   return enc;
1676 }
1677
1678 static GstElement *
1679 request_rtcp_encoder (GstElement * rtpbin, guint session,
1680     GstRTSPStream * stream)
1681 {
1682   GstRTSPStreamPrivate *priv = stream->priv;
1683   GstElement *oldenc, *enc;
1684   GstPad *pad;
1685   gchar *name;
1686
1687   if (priv->idx != session)
1688     return NULL;
1689
1690   GST_DEBUG_OBJECT (stream, "make RTCP encoder for session %u", session);
1691
1692   oldenc = priv->srtpenc;
1693   enc = get_rtp_encoder (stream, session);
1694   name = g_strdup_printf ("rtcp_sink_%d", session);
1695   pad = gst_element_get_request_pad (enc, name);
1696   g_free (name);
1697   gst_object_unref (pad);
1698
1699   if (oldenc == NULL)
1700     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER], 0,
1701         enc);
1702
1703   return enc;
1704 }
1705
1706 static GstCaps *
1707 request_key (GstElement * srtpdec, guint ssrc, GstRTSPStream * stream)
1708 {
1709   GstRTSPStreamPrivate *priv = stream->priv;
1710   GstCaps *caps;
1711
1712   GST_DEBUG ("request key %08x", ssrc);
1713
1714   g_mutex_lock (&priv->lock);
1715   if ((caps = g_hash_table_lookup (priv->keys, GINT_TO_POINTER (ssrc))))
1716     gst_caps_ref (caps);
1717   g_mutex_unlock (&priv->lock);
1718
1719   return caps;
1720 }
1721
1722 static GstElement *
1723 request_rtcp_decoder (GstElement * rtpbin, guint session,
1724     GstRTSPStream * stream)
1725 {
1726   GstRTSPStreamPrivate *priv = stream->priv;
1727
1728   if (priv->idx != session)
1729     return NULL;
1730
1731   if (priv->srtpdec == NULL) {
1732     gchar *name;
1733
1734     name = g_strdup_printf ("srtpdec_%u", session);
1735     priv->srtpdec = gst_element_factory_make ("srtpdec", name);
1736     g_free (name);
1737
1738     g_signal_connect (priv->srtpdec, "request-key",
1739         (GCallback) request_key, stream);
1740   }
1741   return gst_object_ref (priv->srtpdec);
1742 }
1743
1744 static GstElement *
1745 request_aux_sender (GstElement * rtpbin, guint sessid, GstRTSPStream * stream)
1746 {
1747   GstElement *bin;
1748   GstPad *pad;
1749   GstStructure *pt_map;
1750   gchar *name;
1751   guint pt, rtx_pt;
1752   gchar *pt_s;
1753
1754   pt = gst_rtsp_stream_get_pt (stream);
1755   pt_s = g_strdup_printf ("%u", pt);
1756   rtx_pt = stream->priv->rtx_pt;
1757
1758   GST_INFO ("creating rtxsend with pt %u to %u", pt, rtx_pt);
1759
1760   bin = gst_bin_new (NULL);
1761   stream->priv->rtxsend = gst_element_factory_make ("rtprtxsend", NULL);
1762   pt_map = gst_structure_new ("application/x-rtp-pt-map",
1763       pt_s, G_TYPE_UINT, rtx_pt, NULL);
1764   g_object_set (stream->priv->rtxsend, "payload-type-map", pt_map,
1765       "max-size-time", GST_TIME_AS_MSECONDS (stream->priv->rtx_time), NULL);
1766   gst_structure_free (pt_map);
1767   gst_bin_add (GST_BIN (bin), gst_object_ref (stream->priv->rtxsend));
1768
1769   pad = gst_element_get_static_pad (stream->priv->rtxsend, "src");
1770   name = g_strdup_printf ("src_%u", sessid);
1771   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
1772   g_free (name);
1773   gst_object_unref (pad);
1774
1775   pad = gst_element_get_static_pad (stream->priv->rtxsend, "sink");
1776   name = g_strdup_printf ("sink_%u", sessid);
1777   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
1778   g_free (name);
1779   gst_object_unref (pad);
1780
1781   return bin;
1782 }
1783
1784 /**
1785  * gst_rtsp_stream_join_bin:
1786  * @stream: a #GstRTSPStream
1787  * @bin: (transfer none): a #GstBin to join
1788  * @rtpbin: (transfer none): a rtpbin element in @bin
1789  * @state: the target state of the new elements
1790  *
1791  * Join the #GstBin @bin that contains the element @rtpbin.
1792  *
1793  * @stream will link to @rtpbin, which must be inside @bin. The elements
1794  * added to @bin will be set to the state given in @state.
1795  *
1796  * Returns: %TRUE on success.
1797  */
1798 gboolean
1799 gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
1800     GstElement * rtpbin, GstState state)
1801 {
1802   GstRTSPStreamPrivate *priv;
1803   gint i;
1804   guint idx;
1805   gchar *name;
1806   GstPad *pad, *sinkpad, *selpad;
1807   GstPadLinkReturn ret;
1808
1809   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1810   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
1811   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
1812
1813   priv = stream->priv;
1814
1815   g_mutex_lock (&priv->lock);
1816   if (priv->is_joined)
1817     goto was_joined;
1818
1819   /* create a session with the same index as the stream */
1820   idx = priv->idx;
1821
1822   GST_INFO ("stream %p joining bin as session %u", stream, idx);
1823
1824   if (!alloc_ports (stream))
1825     goto no_ports;
1826
1827   /* update the dscp qos field in the sinks */
1828   update_dscp_qos (stream);
1829
1830   if (priv->profiles & GST_RTSP_PROFILE_SAVP
1831       || priv->profiles & GST_RTSP_PROFILE_SAVPF) {
1832     /* For SRTP */
1833     g_signal_connect (rtpbin, "request-rtp-encoder",
1834         (GCallback) request_rtp_encoder, stream);
1835     g_signal_connect (rtpbin, "request-rtcp-encoder",
1836         (GCallback) request_rtcp_encoder, stream);
1837     g_signal_connect (rtpbin, "request-rtcp-decoder",
1838         (GCallback) request_rtcp_decoder, stream);
1839   }
1840
1841   if (priv->rtx_time > 0) {
1842     /* enable retransmission by setting rtprtxsend as the "aux" element of rtpbin */
1843     g_signal_connect (rtpbin, "request-aux-sender",
1844         (GCallback) request_aux_sender, stream);
1845   }
1846
1847   /* get a pad for sending RTP */
1848   name = g_strdup_printf ("send_rtp_sink_%u", idx);
1849   priv->send_rtp_sink = gst_element_get_request_pad (rtpbin, name);
1850   g_free (name);
1851   /* link the RTP pad to the session manager, it should not really fail unless
1852    * this is not really an RTP pad */
1853   ret = gst_pad_link (priv->srcpad, priv->send_rtp_sink);
1854   if (ret != GST_PAD_LINK_OK)
1855     goto link_failed;
1856
1857   /* get pads from the RTP session element for sending and receiving
1858    * RTP/RTCP*/
1859   name = g_strdup_printf ("send_rtp_src_%u", idx);
1860   priv->send_src[0] = gst_element_get_static_pad (rtpbin, name);
1861   g_free (name);
1862   name = g_strdup_printf ("send_rtcp_src_%u", idx);
1863   priv->send_src[1] = gst_element_get_request_pad (rtpbin, name);
1864   g_free (name);
1865   name = g_strdup_printf ("recv_rtp_sink_%u", idx);
1866   priv->recv_sink[0] = gst_element_get_request_pad (rtpbin, name);
1867   g_free (name);
1868   name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
1869   priv->recv_sink[1] = gst_element_get_request_pad (rtpbin, name);
1870   g_free (name);
1871
1872   /* get the session */
1873   g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &priv->session);
1874
1875   g_signal_connect (priv->session, "on-new-ssrc", (GCallback) on_new_ssrc,
1876       stream);
1877   g_signal_connect (priv->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
1878       stream);
1879   g_signal_connect (priv->session, "on-ssrc-active",
1880       (GCallback) on_ssrc_active, stream);
1881   g_signal_connect (priv->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
1882       stream);
1883   g_signal_connect (priv->session, "on-bye-timeout",
1884       (GCallback) on_bye_timeout, stream);
1885   g_signal_connect (priv->session, "on-timeout", (GCallback) on_timeout,
1886       stream);
1887
1888   for (i = 0; i < 2; i++) {
1889     GstPad *teepad, *queuepad;
1890     /* For the sender we create this bit of pipeline for both
1891      * RTP and RTCP. Sync and preroll are enabled on udpsink so
1892      * we need to add a queue before appsink to make the pipeline
1893      * not block. For the TCP case, we want to pump data to the
1894      * client as fast as possible anyway.
1895      *
1896      * .--------.      .-----.    .---------.
1897      * | rtpbin |      | tee |    | udpsink |
1898      * |       send->sink   src->sink       |
1899      * '--------'      |     |    '---------'
1900      *                 |     |    .---------.    .---------.
1901      *                 |     |    |  queue  |    | appsink |
1902      *                 |    src->sink      src->sink       |
1903      *                 '-----'    '---------'    '---------'
1904      *
1905      * When only UDP is allowed, we skip the tee, queue and appsink and link the
1906      * udpsink directly to the session.
1907      */
1908     /* add udpsink */
1909     gst_bin_add (bin, priv->udpsink[i]);
1910     sinkpad = gst_element_get_static_pad (priv->udpsink[i], "sink");
1911
1912     if (priv->protocols & GST_RTSP_LOWER_TRANS_TCP) {
1913       /* make tee for RTP/RTCP */
1914       priv->tee[i] = gst_element_factory_make ("tee", NULL);
1915       gst_bin_add (bin, priv->tee[i]);
1916
1917       /* and link to rtpbin send pad */
1918       pad = gst_element_get_static_pad (priv->tee[i], "sink");
1919       gst_pad_link (priv->send_src[i], pad);
1920       gst_object_unref (pad);
1921
1922       /* link tee to udpsink */
1923       teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
1924       gst_pad_link (teepad, sinkpad);
1925       gst_object_unref (teepad);
1926
1927       /* make queue */
1928       priv->appqueue[i] = gst_element_factory_make ("queue", NULL);
1929       gst_bin_add (bin, priv->appqueue[i]);
1930       /* and link to tee */
1931       teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
1932       pad = gst_element_get_static_pad (priv->appqueue[i], "sink");
1933       gst_pad_link (teepad, pad);
1934       gst_object_unref (pad);
1935       gst_object_unref (teepad);
1936
1937       /* make appsink */
1938       priv->appsink[i] = gst_element_factory_make ("appsink", NULL);
1939       g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
1940       g_object_set (priv->appsink[i], "emit-signals", FALSE, NULL);
1941       gst_bin_add (bin, priv->appsink[i]);
1942       gst_app_sink_set_callbacks (GST_APP_SINK_CAST (priv->appsink[i]),
1943           &sink_cb, stream, NULL);
1944       /* and link to queue */
1945       queuepad = gst_element_get_static_pad (priv->appqueue[i], "src");
1946       pad = gst_element_get_static_pad (priv->appsink[i], "sink");
1947       gst_pad_link (queuepad, pad);
1948       gst_object_unref (pad);
1949       gst_object_unref (queuepad);
1950     } else {
1951       /* else only udpsink needed, link it to the session */
1952       gst_pad_link (priv->send_src[i], sinkpad);
1953     }
1954     gst_object_unref (sinkpad);
1955
1956     /* For the receiver we create this bit of pipeline for both
1957      * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc
1958      * and it is all funneled into the rtpbin receive pad.
1959      *
1960      * .--------.     .--------.    .--------.
1961      * | udpsrc |     | funnel |    | rtpbin |
1962      * |       src->sink      src->sink      |
1963      * '--------'     |        |    '--------'
1964      * .--------.     |        |
1965      * | appsrc |     |        |
1966      * |       src->sink       |
1967      * '--------'     '--------'
1968      */
1969     /* make funnel for the RTP/RTCP receivers */
1970     priv->funnel[i] = gst_element_factory_make ("funnel", NULL);
1971     gst_bin_add (bin, priv->funnel[i]);
1972
1973     pad = gst_element_get_static_pad (priv->funnel[i], "src");
1974     gst_pad_link (pad, priv->recv_sink[i]);
1975     gst_object_unref (pad);
1976
1977     if (priv->udpsrc_v4[i]) {
1978       /* we set and keep these to playing so that they don't cause NO_PREROLL return
1979        * values */
1980       gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_PLAYING);
1981       gst_element_set_locked_state (priv->udpsrc_v4[i], TRUE);
1982       /* add udpsrc */
1983       gst_bin_add (bin, priv->udpsrc_v4[i]);
1984
1985       /* and link to the funnel v4 */
1986       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1987       pad = gst_element_get_static_pad (priv->udpsrc_v4[i], "src");
1988       gst_pad_link (pad, selpad);
1989       gst_object_unref (pad);
1990       gst_object_unref (selpad);
1991     }
1992
1993     if (priv->udpsrc_v6[i]) {
1994       gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_PLAYING);
1995       gst_element_set_locked_state (priv->udpsrc_v6[i], TRUE);
1996       gst_bin_add (bin, priv->udpsrc_v6[i]);
1997
1998       /* and link to the funnel v6 */
1999       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2000       pad = gst_element_get_static_pad (priv->udpsrc_v6[i], "src");
2001       gst_pad_link (pad, selpad);
2002       gst_object_unref (pad);
2003       gst_object_unref (selpad);
2004     }
2005
2006     if (priv->protocols & GST_RTSP_LOWER_TRANS_TCP) {
2007       /* make and add appsrc */
2008       priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
2009       gst_bin_add (bin, priv->appsrc[i]);
2010       /* and link to the funnel */
2011       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2012       pad = gst_element_get_static_pad (priv->appsrc[i], "src");
2013       gst_pad_link (pad, selpad);
2014       gst_object_unref (pad);
2015       gst_object_unref (selpad);
2016     }
2017
2018     /* check if we need to set to a special state */
2019     if (state != GST_STATE_NULL) {
2020       if (priv->udpsink[i])
2021         gst_element_set_state (priv->udpsink[i], state);
2022       if (priv->appsink[i])
2023         gst_element_set_state (priv->appsink[i], state);
2024       if (priv->appqueue[i])
2025         gst_element_set_state (priv->appqueue[i], state);
2026       if (priv->tee[i])
2027         gst_element_set_state (priv->tee[i], state);
2028       if (priv->funnel[i])
2029         gst_element_set_state (priv->funnel[i], state);
2030       if (priv->appsrc[i])
2031         gst_element_set_state (priv->appsrc[i], state);
2032     }
2033   }
2034
2035   /* be notified of caps changes */
2036   priv->caps_sig = g_signal_connect (priv->send_src[0], "notify::caps",
2037       (GCallback) caps_notify, stream);
2038
2039   priv->is_joined = TRUE;
2040   g_mutex_unlock (&priv->lock);
2041
2042   return TRUE;
2043
2044   /* ERRORS */
2045 was_joined:
2046   {
2047     g_mutex_unlock (&priv->lock);
2048     return TRUE;
2049   }
2050 no_ports:
2051   {
2052     g_mutex_unlock (&priv->lock);
2053     GST_WARNING ("failed to allocate ports %u", idx);
2054     return FALSE;
2055   }
2056 link_failed:
2057   {
2058     GST_WARNING ("failed to link stream %u", idx);
2059     gst_object_unref (priv->send_rtp_sink);
2060     priv->send_rtp_sink = NULL;
2061     g_mutex_unlock (&priv->lock);
2062     return FALSE;
2063   }
2064 }
2065
2066 /**
2067  * gst_rtsp_stream_leave_bin:
2068  * @stream: a #GstRTSPStream
2069  * @bin: (transfer none): a #GstBin
2070  * @rtpbin: (transfer none): a rtpbin #GstElement
2071  *
2072  * Remove the elements of @stream from @bin.
2073  *
2074  * Return: %TRUE on success.
2075  */
2076 gboolean
2077 gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
2078     GstElement * rtpbin)
2079 {
2080   GstRTSPStreamPrivate *priv;
2081   gint i;
2082   GList *l;
2083
2084   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2085   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
2086   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
2087
2088   priv = stream->priv;
2089
2090   g_mutex_lock (&priv->lock);
2091   if (!priv->is_joined)
2092     goto was_not_joined;
2093
2094   /* all transports must be removed by now */
2095   if (priv->transports != NULL)
2096     goto transports_not_removed;
2097
2098   clear_tr_cache (priv);
2099
2100   GST_INFO ("stream %p leaving bin", stream);
2101
2102   gst_pad_unlink (priv->srcpad, priv->send_rtp_sink);
2103   g_signal_handler_disconnect (priv->send_src[0], priv->caps_sig);
2104   gst_element_release_request_pad (rtpbin, priv->send_rtp_sink);
2105   gst_object_unref (priv->send_rtp_sink);
2106   priv->send_rtp_sink = NULL;
2107
2108   for (i = 0; i < 2; i++) {
2109     if (priv->udpsink[i])
2110       gst_element_set_state (priv->udpsink[i], GST_STATE_NULL);
2111     if (priv->appsink[i])
2112       gst_element_set_state (priv->appsink[i], GST_STATE_NULL);
2113     if (priv->appqueue[i])
2114       gst_element_set_state (priv->appqueue[i], GST_STATE_NULL);
2115     if (priv->tee[i])
2116       gst_element_set_state (priv->tee[i], GST_STATE_NULL);
2117     if (priv->funnel[i])
2118       gst_element_set_state (priv->funnel[i], GST_STATE_NULL);
2119     if (priv->appsrc[i])
2120       gst_element_set_state (priv->appsrc[i], GST_STATE_NULL);
2121     if (priv->udpsrc_v4[i]) {
2122       /* and set udpsrc to NULL now before removing */
2123       gst_element_set_locked_state (priv->udpsrc_v4[i], FALSE);
2124       gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_NULL);
2125       /* removing them should also nicely release the request
2126        * pads when they finalize */
2127       gst_bin_remove (bin, priv->udpsrc_v4[i]);
2128     }
2129     if (priv->udpsrc_v6[i]) {
2130       gst_element_set_locked_state (priv->udpsrc_v6[i], FALSE);
2131       gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_NULL);
2132       gst_bin_remove (bin, priv->udpsrc_v6[i]);
2133     }
2134
2135     for (l = priv->transport_sources; l; l = l->next) {
2136       GstRTSPMulticastTransportSource *s = l->data;
2137
2138       if (!s->udpsrc[i])
2139         continue;
2140
2141       gst_element_set_locked_state (s->udpsrc[i], FALSE);
2142       gst_element_set_state (s->udpsrc[i], GST_STATE_NULL);
2143       gst_bin_remove (bin, s->udpsrc[i]);
2144     }
2145
2146     if (priv->udpsink[i])
2147       gst_bin_remove (bin, priv->udpsink[i]);
2148     if (priv->appsrc[i])
2149       gst_bin_remove (bin, priv->appsrc[i]);
2150     if (priv->appsink[i])
2151       gst_bin_remove (bin, priv->appsink[i]);
2152     if (priv->appqueue[i])
2153       gst_bin_remove (bin, priv->appqueue[i]);
2154     if (priv->tee[i])
2155       gst_bin_remove (bin, priv->tee[i]);
2156     if (priv->funnel[i])
2157       gst_bin_remove (bin, priv->funnel[i]);
2158
2159     gst_element_release_request_pad (rtpbin, priv->recv_sink[i]);
2160     gst_object_unref (priv->recv_sink[i]);
2161     priv->recv_sink[i] = NULL;
2162
2163     priv->udpsrc_v4[i] = NULL;
2164     priv->udpsrc_v6[i] = NULL;
2165     priv->udpsink[i] = NULL;
2166     priv->appsrc[i] = NULL;
2167     priv->appsink[i] = NULL;
2168     priv->appqueue[i] = NULL;
2169     priv->tee[i] = NULL;
2170     priv->funnel[i] = NULL;
2171   }
2172
2173   for (l = priv->transport_sources; l; l = l->next) {
2174     GstRTSPMulticastTransportSource *s = l->data;
2175     g_slice_free (GstRTSPMulticastTransportSource, s);
2176   }
2177   g_list_free (priv->transport_sources);
2178   priv->transport_sources = NULL;
2179
2180   gst_object_unref (priv->send_src[0]);
2181   priv->send_src[0] = NULL;
2182
2183   gst_element_release_request_pad (rtpbin, priv->send_src[1]);
2184   gst_object_unref (priv->send_src[1]);
2185   priv->send_src[1] = NULL;
2186
2187   g_object_unref (priv->session);
2188   priv->session = NULL;
2189   if (priv->caps)
2190     gst_caps_unref (priv->caps);
2191   priv->caps = NULL;
2192
2193   if (priv->srtpenc)
2194     gst_object_unref (priv->srtpenc);
2195   if (priv->srtpdec)
2196     gst_object_unref (priv->srtpdec);
2197
2198   priv->is_joined = FALSE;
2199   g_mutex_unlock (&priv->lock);
2200
2201   return TRUE;
2202
2203 was_not_joined:
2204   {
2205     g_mutex_unlock (&priv->lock);
2206     return TRUE;
2207   }
2208 transports_not_removed:
2209   {
2210     GST_ERROR_OBJECT (stream, "can't leave bin (transports not removed)");
2211     g_mutex_unlock (&priv->lock);
2212     return FALSE;
2213   }
2214 }
2215
2216 /**
2217  * gst_rtsp_stream_get_rtpinfo:
2218  * @stream: a #GstRTSPStream
2219  * @rtptime: (allow-none): result RTP timestamp
2220  * @seq: (allow-none): result RTP seqnum
2221  * @clock_rate: (allow-none): the clock rate
2222  * @running_time: (allow-none): result running-time
2223  *
2224  * Retrieve the current rtptime, seq and running-time. This is used to
2225  * construct a RTPInfo reply header.
2226  *
2227  * Returns: %TRUE when rtptime, seq and running-time could be determined.
2228  */
2229 gboolean
2230 gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream,
2231     guint * rtptime, guint * seq, guint * clock_rate,
2232     GstClockTime * running_time)
2233 {
2234   GstRTSPStreamPrivate *priv;
2235   GstStructure *stats;
2236   GObjectClass *payobjclass;
2237
2238   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2239
2240   priv = stream->priv;
2241
2242   payobjclass = G_OBJECT_GET_CLASS (priv->payloader);
2243
2244   g_mutex_lock (&priv->lock);
2245
2246   if (g_object_class_find_property (payobjclass, "stats")) {
2247     g_object_get (priv->payloader, "stats", &stats, NULL);
2248     if (stats == NULL)
2249       goto no_stats;
2250
2251     if (seq)
2252       gst_structure_get_uint (stats, "seqnum", seq);
2253
2254     if (rtptime)
2255       gst_structure_get_uint (stats, "timestamp", rtptime);
2256
2257     if (running_time)
2258       gst_structure_get_clock_time (stats, "running-time", running_time);
2259
2260     if (clock_rate) {
2261       gst_structure_get_uint (stats, "clock-rate", clock_rate);
2262       if (*clock_rate == 0 && running_time)
2263         *running_time = GST_CLOCK_TIME_NONE;
2264     }
2265     gst_structure_free (stats);
2266   } else {
2267     if (!g_object_class_find_property (payobjclass, "seqnum") ||
2268         !g_object_class_find_property (payobjclass, "timestamp"))
2269       goto no_stats;
2270
2271     if (seq)
2272       g_object_get (priv->payloader, "seqnum", seq, NULL);
2273
2274     if (rtptime)
2275       g_object_get (priv->payloader, "timestamp", rtptime, NULL);
2276
2277     if (running_time)
2278       *running_time = GST_CLOCK_TIME_NONE;
2279   }
2280   g_mutex_unlock (&priv->lock);
2281
2282   return TRUE;
2283
2284   /* ERRORS */
2285 no_stats:
2286   {
2287     GST_WARNING ("Could not get payloader stats");
2288     g_mutex_unlock (&priv->lock);
2289     return FALSE;
2290   }
2291 }
2292
2293 /**
2294  * gst_rtsp_stream_get_caps:
2295  * @stream: a #GstRTSPStream
2296  *
2297  * Retrieve the current caps of @stream.
2298  *
2299  * Returns: (transfer full): the #GstCaps of @stream. use gst_caps_unref()
2300  * after usage.
2301  */
2302 GstCaps *
2303 gst_rtsp_stream_get_caps (GstRTSPStream * stream)
2304 {
2305   GstRTSPStreamPrivate *priv;
2306   GstCaps *result;
2307
2308   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2309
2310   priv = stream->priv;
2311
2312   g_mutex_lock (&priv->lock);
2313   if ((result = priv->caps))
2314     gst_caps_ref (result);
2315   g_mutex_unlock (&priv->lock);
2316
2317   return result;
2318 }
2319
2320 /**
2321  * gst_rtsp_stream_recv_rtp:
2322  * @stream: a #GstRTSPStream
2323  * @buffer: (transfer full): a #GstBuffer
2324  *
2325  * Handle an RTP buffer for the stream. This method is usually called when a
2326  * message has been received from a client using the TCP transport.
2327  *
2328  * This function takes ownership of @buffer.
2329  *
2330  * Returns: a GstFlowReturn.
2331  */
2332 GstFlowReturn
2333 gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer)
2334 {
2335   GstRTSPStreamPrivate *priv;
2336   GstFlowReturn ret;
2337   GstElement *element;
2338
2339   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
2340   priv = stream->priv;
2341   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
2342   g_return_val_if_fail (priv->is_joined, FALSE);
2343
2344   g_mutex_lock (&priv->lock);
2345   if (priv->appsrc[0])
2346     element = gst_object_ref (priv->appsrc[0]);
2347   else
2348     element = NULL;
2349   g_mutex_unlock (&priv->lock);
2350
2351   if (element) {
2352     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
2353     gst_object_unref (element);
2354   } else {
2355     ret = GST_FLOW_OK;
2356   }
2357   return ret;
2358 }
2359
2360 /**
2361  * gst_rtsp_stream_recv_rtcp:
2362  * @stream: a #GstRTSPStream
2363  * @buffer: (transfer full): a #GstBuffer
2364  *
2365  * Handle an RTCP buffer for the stream. This method is usually called when a
2366  * message has been received from a client using the TCP transport.
2367  *
2368  * This function takes ownership of @buffer.
2369  *
2370  * Returns: a GstFlowReturn.
2371  */
2372 GstFlowReturn
2373 gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
2374 {
2375   GstRTSPStreamPrivate *priv;
2376   GstFlowReturn ret;
2377   GstElement *element;
2378
2379   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
2380   priv = stream->priv;
2381   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
2382
2383   if (!priv->is_joined) {
2384     gst_buffer_unref (buffer);
2385     return GST_FLOW_NOT_LINKED;
2386   }
2387   g_mutex_lock (&priv->lock);
2388   if (priv->appsrc[1])
2389     element = gst_object_ref (priv->appsrc[1]);
2390   else
2391     element = NULL;
2392   g_mutex_unlock (&priv->lock);
2393
2394   if (element) {
2395     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
2396     gst_object_unref (element);
2397   } else {
2398     ret = GST_FLOW_OK;
2399     gst_buffer_unref (buffer);
2400   }
2401   return ret;
2402 }
2403
2404 /* must be called with lock */
2405 static gboolean
2406 update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
2407     gboolean add)
2408 {
2409   GstRTSPStreamPrivate *priv = stream->priv;
2410   const GstRTSPTransport *tr;
2411
2412   tr = gst_rtsp_stream_transport_get_transport (trans);
2413
2414   switch (tr->lower_transport) {
2415     case GST_RTSP_LOWER_TRANS_UDP_MCAST:
2416     {
2417       GstRTSPMulticastTransportSource *source;
2418       GstBin *bin;
2419
2420       bin = GST_BIN (gst_object_get_parent (GST_OBJECT (priv->funnel[0])));
2421
2422       if (add) {
2423         gchar *host;
2424         gint i;
2425         GstPad *selpad, *pad;
2426
2427         source = g_slice_new0 (GstRTSPMulticastTransportSource);
2428         source->transport = trans;
2429
2430         for (i = 0; i < 2; i++) {
2431           host =
2432               g_strdup_printf ("udp://%s:%d", tr->destination,
2433               (i == 0) ? tr->port.min : tr->port.max);
2434           source->udpsrc[i] =
2435               gst_element_make_from_uri (GST_URI_SRC, host, NULL, NULL);
2436           g_free (host);
2437
2438           /* we set and keep these to playing so that they don't cause NO_PREROLL return
2439            * values */
2440           gst_element_set_state (source->udpsrc[i], GST_STATE_PLAYING);
2441           gst_element_set_locked_state (source->udpsrc[i], TRUE);
2442           /* add udpsrc */
2443           gst_bin_add (bin, source->udpsrc[i]);
2444
2445           /* and link to the funnel v4 */
2446           source->selpad[i] = selpad =
2447               gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2448           pad = gst_element_get_static_pad (source->udpsrc[i], "src");
2449           gst_pad_link (pad, selpad);
2450           gst_object_unref (pad);
2451           gst_object_unref (selpad);
2452         }
2453         gst_object_unref (bin);
2454
2455         priv->transport_sources =
2456             g_list_prepend (priv->transport_sources, source);
2457       } else {
2458         GList *l;
2459
2460         for (l = priv->transport_sources; l; l = l->next) {
2461           source = l->data;
2462
2463           if (source->transport == trans) {
2464             priv->transport_sources =
2465                 g_list_delete_link (priv->transport_sources, l);
2466             break;
2467           }
2468         }
2469
2470         if (l != NULL) {
2471           gint i;
2472
2473           for (i = 0; i < 2; i++) {
2474             /* Will automatically unlink everything */
2475             gst_bin_remove (bin,
2476                 GST_ELEMENT (gst_object_ref (source->udpsrc[i])));
2477
2478             gst_element_set_state (source->udpsrc[i], GST_STATE_NULL);
2479             gst_object_unref (source->udpsrc[i]);
2480
2481             gst_element_release_request_pad (priv->funnel[i],
2482                 source->selpad[i]);
2483           }
2484
2485           g_slice_free (GstRTSPMulticastTransportSource, source);
2486         }
2487       }
2488
2489       /* fall through for the generic case */
2490     }
2491     case GST_RTSP_LOWER_TRANS_UDP:
2492     {
2493       gchar *dest;
2494       gint min, max;
2495       guint ttl = 0;
2496
2497       dest = tr->destination;
2498       if (tr->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
2499         min = tr->port.min;
2500         max = tr->port.max;
2501         ttl = tr->ttl;
2502       } else {
2503         min = tr->client_port.min;
2504         max = tr->client_port.max;
2505       }
2506
2507       if (add) {
2508         if (ttl > 0) {
2509           GST_INFO ("setting ttl-mc %d", ttl);
2510           g_object_set (G_OBJECT (priv->udpsink[0]), "ttl-mc", ttl, NULL);
2511           g_object_set (G_OBJECT (priv->udpsink[1]), "ttl-mc", ttl, NULL);
2512         }
2513         GST_INFO ("adding %s:%d-%d", dest, min, max);
2514         g_signal_emit_by_name (priv->udpsink[0], "add", dest, min, NULL);
2515         g_signal_emit_by_name (priv->udpsink[1], "add", dest, max, NULL);
2516         priv->transports = g_list_prepend (priv->transports, trans);
2517       } else {
2518         GST_INFO ("removing %s:%d-%d", dest, min, max);
2519         g_signal_emit_by_name (priv->udpsink[0], "remove", dest, min, NULL);
2520         g_signal_emit_by_name (priv->udpsink[1], "remove", dest, max, NULL);
2521         priv->transports = g_list_remove (priv->transports, trans);
2522       }
2523       priv->transports_cookie++;
2524       break;
2525     }
2526     case GST_RTSP_LOWER_TRANS_TCP:
2527       if (add) {
2528         GST_INFO ("adding TCP %s", tr->destination);
2529         priv->transports = g_list_prepend (priv->transports, trans);
2530       } else {
2531         GST_INFO ("removing TCP %s", tr->destination);
2532         priv->transports = g_list_remove (priv->transports, trans);
2533       }
2534       priv->transports_cookie++;
2535       break;
2536     default:
2537       goto unknown_transport;
2538   }
2539   return TRUE;
2540
2541   /* ERRORS */
2542 unknown_transport:
2543   {
2544     GST_INFO ("Unknown transport %d", tr->lower_transport);
2545     return FALSE;
2546   }
2547 }
2548
2549
2550 /**
2551  * gst_rtsp_stream_add_transport:
2552  * @stream: a #GstRTSPStream
2553  * @trans: (transfer none): a #GstRTSPStreamTransport
2554  *
2555  * Add the transport in @trans to @stream. The media of @stream will
2556  * then also be send to the values configured in @trans.
2557  *
2558  * @stream must be joined to a bin.
2559  *
2560  * @trans must contain a valid #GstRTSPTransport.
2561  *
2562  * Returns: %TRUE if @trans was added
2563  */
2564 gboolean
2565 gst_rtsp_stream_add_transport (GstRTSPStream * stream,
2566     GstRTSPStreamTransport * trans)
2567 {
2568   GstRTSPStreamPrivate *priv;
2569   gboolean res;
2570
2571   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2572   priv = stream->priv;
2573   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
2574   g_return_val_if_fail (priv->is_joined, FALSE);
2575
2576   g_mutex_lock (&priv->lock);
2577   res = update_transport (stream, trans, TRUE);
2578   g_mutex_unlock (&priv->lock);
2579
2580   return res;
2581 }
2582
2583 /**
2584  * gst_rtsp_stream_remove_transport:
2585  * @stream: a #GstRTSPStream
2586  * @trans: (transfer none): a #GstRTSPStreamTransport
2587  *
2588  * Remove the transport in @trans from @stream. The media of @stream will
2589  * not be sent to the values configured in @trans.
2590  *
2591  * @stream must be joined to a bin.
2592  *
2593  * @trans must contain a valid #GstRTSPTransport.
2594  *
2595  * Returns: %TRUE if @trans was removed
2596  */
2597 gboolean
2598 gst_rtsp_stream_remove_transport (GstRTSPStream * stream,
2599     GstRTSPStreamTransport * trans)
2600 {
2601   GstRTSPStreamPrivate *priv;
2602   gboolean res;
2603
2604   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2605   priv = stream->priv;
2606   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
2607   g_return_val_if_fail (priv->is_joined, FALSE);
2608
2609   g_mutex_lock (&priv->lock);
2610   res = update_transport (stream, trans, FALSE);
2611   g_mutex_unlock (&priv->lock);
2612
2613   return res;
2614 }
2615
2616 /**
2617  * gst_rtsp_stream_update_crypto:
2618  * @stream: a #GstRTSPStream
2619  * @ssrc: the SSRC
2620  * @crypto: (transfer none) (allow-none): a #GstCaps with crypto info
2621  *
2622  * Update the new crypto information for @ssrc in @stream. If information
2623  * for @ssrc did not exist, it will be added. If information
2624  * for @ssrc existed, it will be replaced. If @crypto is %NULL, it will
2625  * be removed from @stream.
2626  *
2627  * Returns: %TRUE if @crypto could be updated
2628  */
2629 gboolean
2630 gst_rtsp_stream_update_crypto (GstRTSPStream * stream,
2631     guint ssrc, GstCaps * crypto)
2632 {
2633   GstRTSPStreamPrivate *priv;
2634
2635   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2636   g_return_val_if_fail (crypto == NULL || GST_IS_CAPS (crypto), FALSE);
2637
2638   priv = stream->priv;
2639
2640   GST_DEBUG_OBJECT (stream, "update key for %08x", ssrc);
2641
2642   g_mutex_lock (&priv->lock);
2643   if (crypto)
2644     g_hash_table_insert (priv->keys, GINT_TO_POINTER (ssrc),
2645         gst_caps_ref (crypto));
2646   else
2647     g_hash_table_remove (priv->keys, GINT_TO_POINTER (ssrc));
2648   g_mutex_unlock (&priv->lock);
2649
2650   return TRUE;
2651 }
2652
2653 /**
2654  * gst_rtsp_stream_get_rtp_socket:
2655  * @stream: a #GstRTSPStream
2656  * @family: the socket family
2657  *
2658  * Get the RTP socket from @stream for a @family.
2659  *
2660  * @stream must be joined to a bin.
2661  *
2662  * Returns: (transfer full) (nullable): the RTP socket or %NULL if no
2663  * socket could be allocated for @family. Unref after usage
2664  */
2665 GSocket *
2666 gst_rtsp_stream_get_rtp_socket (GstRTSPStream * stream, GSocketFamily family)
2667 {
2668   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
2669   GSocket *socket;
2670   const gchar *name;
2671
2672   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2673   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
2674       family == G_SOCKET_FAMILY_IPV6, NULL);
2675   g_return_val_if_fail (priv->udpsink[0], NULL);
2676
2677   if (family == G_SOCKET_FAMILY_IPV6)
2678     name = "socket-v6";
2679   else
2680     name = "socket";
2681
2682   g_object_get (priv->udpsink[0], name, &socket, NULL);
2683
2684   return socket;
2685 }
2686
2687 /**
2688  * gst_rtsp_stream_get_rtcp_socket:
2689  * @stream: a #GstRTSPStream
2690  * @family: the socket family
2691  *
2692  * Get the RTCP socket from @stream for a @family.
2693  *
2694  * @stream must be joined to a bin.
2695  *
2696  * Returns: (transfer full) (nullable): the RTCP socket or %NULL if no
2697  * socket could be allocated for @family. Unref after usage
2698  */
2699 GSocket *
2700 gst_rtsp_stream_get_rtcp_socket (GstRTSPStream * stream, GSocketFamily family)
2701 {
2702   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
2703   GSocket *socket;
2704   const gchar *name;
2705
2706   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2707   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
2708       family == G_SOCKET_FAMILY_IPV6, NULL);
2709   g_return_val_if_fail (priv->udpsink[1], NULL);
2710
2711   if (family == G_SOCKET_FAMILY_IPV6)
2712     name = "socket-v6";
2713   else
2714     name = "socket";
2715
2716   g_object_get (priv->udpsink[1], name, &socket, NULL);
2717
2718   return socket;
2719 }
2720
2721 /**
2722  * gst_rtsp_stream_set_seqnum:
2723  * @stream: a #GstRTSPStream
2724  * @seqnum: a new sequence number
2725  *
2726  * Configure the sequence number in the payloader of @stream to @seqnum.
2727  */
2728 void
2729 gst_rtsp_stream_set_seqnum_offset (GstRTSPStream * stream, guint16 seqnum)
2730 {
2731   GstRTSPStreamPrivate *priv;
2732
2733   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
2734
2735   priv = stream->priv;
2736
2737   g_object_set (G_OBJECT (priv->payloader), "seqnum-offset", seqnum, NULL);
2738 }
2739
2740 /**
2741  * gst_rtsp_stream_get_seqnum:
2742  * @stream: a #GstRTSPStream
2743  *
2744  * Get the configured sequence number in the payloader of @stream.
2745  *
2746  * Returns: the sequence number of the payloader.
2747  */
2748 guint16
2749 gst_rtsp_stream_get_current_seqnum (GstRTSPStream * stream)
2750 {
2751   GstRTSPStreamPrivate *priv;
2752   guint seqnum;
2753
2754   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
2755
2756   priv = stream->priv;
2757
2758   g_object_get (G_OBJECT (priv->payloader), "seqnum", &seqnum, NULL);
2759
2760   return seqnum;
2761 }
2762
2763 /**
2764  * gst_rtsp_stream_transport_filter:
2765  * @stream: a #GstRTSPStream
2766  * @func: (scope call) (allow-none): a callback
2767  * @user_data: (closure): user data passed to @func
2768  *
2769  * Call @func for each transport managed by @stream. The result value of @func
2770  * determines what happens to the transport. @func will be called with @stream
2771  * locked so no further actions on @stream can be performed from @func.
2772  *
2773  * If @func returns #GST_RTSP_FILTER_REMOVE, the transport will be removed from
2774  * @stream.
2775  *
2776  * If @func returns #GST_RTSP_FILTER_KEEP, the transport will remain in @stream.
2777  *
2778  * If @func returns #GST_RTSP_FILTER_REF, the transport will remain in @stream but
2779  * will also be added with an additional ref to the result #GList of this
2780  * function..
2781  *
2782  * When @func is %NULL, #GST_RTSP_FILTER_REF will be assumed for each transport.
2783  *
2784  * Returns: (element-type GstRTSPStreamTransport) (transfer full): a #GList with all
2785  * transports for which @func returned #GST_RTSP_FILTER_REF. After usage, each
2786  * element in the #GList should be unreffed before the list is freed.
2787  */
2788 GList *
2789 gst_rtsp_stream_transport_filter (GstRTSPStream * stream,
2790     GstRTSPStreamTransportFilterFunc func, gpointer user_data)
2791 {
2792   GstRTSPStreamPrivate *priv;
2793   GList *result, *walk, *next;
2794   GHashTable *visited;
2795   guint cookie;
2796
2797   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2798
2799   priv = stream->priv;
2800
2801   result = NULL;
2802   if (func)
2803     visited = g_hash_table_new_full (NULL, NULL, g_object_unref, NULL);
2804
2805   g_mutex_lock (&priv->lock);
2806 restart:
2807   cookie = priv->transports_cookie;
2808   for (walk = priv->transports; walk; walk = next) {
2809     GstRTSPStreamTransport *trans = walk->data;
2810     GstRTSPFilterResult res;
2811     gboolean changed;
2812
2813     next = g_list_next (walk);
2814
2815     if (func) {
2816       /* only visit each transport once */
2817       if (g_hash_table_contains (visited, trans))
2818         continue;
2819
2820       g_hash_table_add (visited, g_object_ref (trans));
2821       g_mutex_unlock (&priv->lock);
2822
2823       res = func (stream, trans, user_data);
2824
2825       g_mutex_lock (&priv->lock);
2826     } else
2827       res = GST_RTSP_FILTER_REF;
2828
2829     changed = (cookie != priv->transports_cookie);
2830
2831     switch (res) {
2832       case GST_RTSP_FILTER_REMOVE:
2833         update_transport (stream, trans, FALSE);
2834         break;
2835       case GST_RTSP_FILTER_REF:
2836         result = g_list_prepend (result, g_object_ref (trans));
2837         break;
2838       case GST_RTSP_FILTER_KEEP:
2839       default:
2840         break;
2841     }
2842     if (changed)
2843       goto restart;
2844   }
2845   g_mutex_unlock (&priv->lock);
2846
2847   if (func)
2848     g_hash_table_unref (visited);
2849
2850   return result;
2851 }
2852
2853 static GstPadProbeReturn
2854 pad_blocking (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
2855 {
2856   GstRTSPStreamPrivate *priv;
2857   GstRTSPStream *stream;
2858
2859   stream = user_data;
2860   priv = stream->priv;
2861
2862   GST_DEBUG_OBJECT (pad, "now blocking");
2863
2864   g_mutex_lock (&priv->lock);
2865   priv->blocking = TRUE;
2866   g_mutex_unlock (&priv->lock);
2867
2868   gst_element_post_message (priv->payloader,
2869       gst_message_new_element (GST_OBJECT_CAST (priv->payloader),
2870           gst_structure_new_empty ("GstRTSPStreamBlocking")));
2871
2872   return GST_PAD_PROBE_OK;
2873 }
2874
2875 /**
2876  * gst_rtsp_stream_set_blocked:
2877  * @stream: a #GstRTSPStream
2878  * @blocked: boolean indicating we should block or unblock
2879  *
2880  * Blocks or unblocks the dataflow on @stream.
2881  *
2882  * Returns: %TRUE on success
2883  */
2884 gboolean
2885 gst_rtsp_stream_set_blocked (GstRTSPStream * stream, gboolean blocked)
2886 {
2887   GstRTSPStreamPrivate *priv;
2888
2889   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2890
2891   priv = stream->priv;
2892
2893   g_mutex_lock (&priv->lock);
2894   if (blocked) {
2895     priv->blocking = FALSE;
2896     if (priv->blocked_id == 0) {
2897       priv->blocked_id = gst_pad_add_probe (priv->srcpad,
2898           GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER |
2899           GST_PAD_PROBE_TYPE_BUFFER_LIST, pad_blocking,
2900           g_object_ref (stream), g_object_unref);
2901     }
2902   } else {
2903     if (priv->blocked_id != 0) {
2904       gst_pad_remove_probe (priv->srcpad, priv->blocked_id);
2905       priv->blocked_id = 0;
2906       priv->blocking = FALSE;
2907     }
2908   }
2909   g_mutex_unlock (&priv->lock);
2910
2911   return TRUE;
2912 }
2913
2914 /**
2915  * gst_rtsp_stream_is_blocking:
2916  * @stream: a #GstRTSPStream
2917  *
2918  * Check if @stream is blocking on a #GstBuffer.
2919  *
2920  * Returns: %TRUE if @stream is blocking
2921  */
2922 gboolean
2923 gst_rtsp_stream_is_blocking (GstRTSPStream * stream)
2924 {
2925   GstRTSPStreamPrivate *priv;
2926   gboolean result;
2927
2928   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2929
2930   priv = stream->priv;
2931
2932   g_mutex_lock (&priv->lock);
2933   result = priv->blocking;
2934   g_mutex_unlock (&priv->lock);
2935
2936   return result;
2937 }
2938
2939 /**
2940  * gst_rtsp_stream_query_position:
2941  * @stream: a #GstRTSPStream
2942  *
2943  * Query the position of the stream in %GST_FORMAT_TIME. This only considers
2944  * the RTP parts of the pipeline and not the RTCP parts.
2945  *
2946  * Returns: %TRUE if the position could be queried
2947  */
2948 gboolean
2949 gst_rtsp_stream_query_position (GstRTSPStream * stream, gint64 * position)
2950 {
2951   GstRTSPStreamPrivate *priv;
2952   GstElement *sink;
2953   gboolean ret;
2954
2955   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2956
2957   priv = stream->priv;
2958
2959   g_mutex_lock (&priv->lock);
2960   if ((sink = priv->udpsink[0]))
2961     gst_object_ref (sink);
2962   g_mutex_unlock (&priv->lock);
2963
2964   if (!sink)
2965     return FALSE;
2966
2967   ret = gst_element_query_position (sink, GST_FORMAT_TIME, position);
2968   gst_object_unref (sink);
2969
2970   return ret;
2971 }
2972
2973 /**
2974  * gst_rtsp_stream_query_stop:
2975  * @stream: a #GstRTSPStream
2976  *
2977  * Query the stop of the stream in %GST_FORMAT_TIME. This only considers
2978  * the RTP parts of the pipeline and not the RTCP parts.
2979  *
2980  * Returns: %TRUE if the stop could be queried
2981  */
2982 gboolean
2983 gst_rtsp_stream_query_stop (GstRTSPStream * stream, gint64 * stop)
2984 {
2985   GstRTSPStreamPrivate *priv;
2986   GstElement *sink;
2987   GstQuery *query;
2988   gboolean ret;
2989
2990   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2991
2992   priv = stream->priv;
2993
2994   g_mutex_lock (&priv->lock);
2995   if ((sink = priv->udpsink[0]))
2996     gst_object_ref (sink);
2997   g_mutex_unlock (&priv->lock);
2998
2999   if (!sink)
3000     return FALSE;
3001
3002   query = gst_query_new_segment (GST_FORMAT_TIME);
3003   if ((ret = gst_element_query (sink, query))) {
3004     GstFormat format;
3005
3006     gst_query_parse_segment (query, NULL, &format, NULL, stop);
3007     if (format != GST_FORMAT_TIME)
3008       *stop = -1;
3009   }
3010   gst_query_unref (query);
3011   gst_object_unref (sink);
3012
3013   return ret;
3014
3015 }