rtsp-media: Make sure that sequence numbers are monotonic after pause
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-stream.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19 /**
20  * SECTION:rtsp-stream
21  * @short_description: A media stream
22  * @see_also: #GstRTSPMedia
23  *
24  * The #GstRTSPStream object manages the data transport for one stream. It
25  * is created from a payloader element and a source pad that produce the RTP
26  * packets for the stream.
27  *
28  * With gst_rtsp_stream_join_bin() the streaming elements are added to the bin
29  * and rtpbin. gst_rtsp_stream_leave_bin() removes the elements again.
30  *
31  * The #GstRTSPStream will use the configured addresspool, as set with
32  * gst_rtsp_stream_set_address_pool(), to allocate multicast addresses for the
33  * stream. With gst_rtsp_stream_get_multicast_address() you can get the
34  * configured address.
35  *
36  * With gst_rtsp_stream_get_server_port () you can get the port that the server
37  * will use to receive RTCP. This is the part that the clients will use to send
38  * RTCP to.
39  *
40  * With gst_rtsp_stream_add_transport() destinations can be added where the
41  * stream should be sent to. Use gst_rtsp_stream_remove_transport() to remove
42  * the destination again.
43  *
44  * Last reviewed on 2013-07-16 (1.0.0)
45  */
46
47 #include <stdlib.h>
48 #include <stdio.h>
49 #include <string.h>
50
51 #include <gio/gio.h>
52
53 #include <gst/app/gstappsrc.h>
54 #include <gst/app/gstappsink.h>
55
56 #include "rtsp-stream.h"
57
58 #define GST_RTSP_STREAM_GET_PRIVATE(obj)  \
59      (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTSP_STREAM, GstRTSPStreamPrivate))
60
61 typedef struct
62 {
63   GstRTSPStreamTransport *transport;
64
65   /* RTP and RTCP source */
66   GstElement *udpsrc[2];
67   GstPad *selpad[2];
68 } GstRTSPMulticastTransportSource;
69
70 struct _GstRTSPStreamPrivate
71 {
72   GMutex lock;
73   guint idx;
74   GstPad *srcpad;
75   GstElement *payloader;
76   guint buffer_size;
77   gboolean is_joined;
78   gchar *control;
79
80   GstRTSPProfile profiles;
81   GstRTSPLowerTrans protocols;
82
83   /* pads on the rtpbin */
84   GstPad *send_rtp_sink;
85   GstPad *recv_sink[2];
86   GstPad *send_src[2];
87
88   /* the RTPSession object */
89   GObject *session;
90
91   /* SRTP encoder/decoder */
92   GstElement *srtpenc;
93   GstElement *srtpdec;
94   GHashTable *keys;
95
96   /* sinks used for sending and receiving RTP and RTCP over ipv4, they share
97    * sockets */
98   GstElement *udpsrc_v4[2];
99
100   /* sinks used for sending and receiving RTP and RTCP over ipv6, they share
101    * sockets */
102   GstElement *udpsrc_v6[2];
103
104   GstElement *udpsink[2];
105
106   /* for TCP transport */
107   GstElement *appsrc[2];
108   GstElement *appqueue[2];
109   GstElement *appsink[2];
110
111   GstElement *tee[2];
112   GstElement *funnel[2];
113
114   /* server ports for sending/receiving over ipv4 */
115   GstRTSPRange server_port_v4;
116   GstRTSPAddress *server_addr_v4;
117   gboolean have_ipv4;
118
119   /* server ports for sending/receiving over ipv6 */
120   GstRTSPRange server_port_v6;
121   GstRTSPAddress *server_addr_v6;
122   gboolean have_ipv6;
123
124   /* multicast addresses */
125   GstRTSPAddressPool *pool;
126   GstRTSPAddress *addr_v4;
127   GstRTSPAddress *addr_v6;
128
129   /* the caps of the stream */
130   gulong caps_sig;
131   GstCaps *caps;
132
133   /* transports we stream to */
134   guint n_active;
135   GList *transports;
136   guint transports_cookie;
137   GList *tr_cache;
138   guint tr_cache_cookie;
139
140   /* UDP sources for UDP multicast transports */
141   GList *transport_sources;
142
143   gint dscp_qos;
144
145   /* stream blocking */
146   gulong blocked_id;
147   gboolean blocking;
148 };
149
150 #define DEFAULT_CONTROL         NULL
151 #define DEFAULT_PROFILES        GST_RTSP_PROFILE_AVP
152 #define DEFAULT_PROTOCOLS       GST_RTSP_LOWER_TRANS_UDP | GST_RTSP_LOWER_TRANS_UDP_MCAST | \
153                                         GST_RTSP_LOWER_TRANS_TCP
154
155 enum
156 {
157   PROP_0,
158   PROP_CONTROL,
159   PROP_PROFILES,
160   PROP_PROTOCOLS,
161   PROP_LAST
162 };
163
164 enum
165 {
166   SIGNAL_NEW_RTP_ENCODER,
167   SIGNAL_NEW_RTCP_ENCODER,
168   SIGNAL_LAST
169 };
170
171 GST_DEBUG_CATEGORY_STATIC (rtsp_stream_debug);
172 #define GST_CAT_DEFAULT rtsp_stream_debug
173
174 static GQuark ssrc_stream_map_key;
175
176 static void gst_rtsp_stream_get_property (GObject * object, guint propid,
177     GValue * value, GParamSpec * pspec);
178 static void gst_rtsp_stream_set_property (GObject * object, guint propid,
179     const GValue * value, GParamSpec * pspec);
180
181 static void gst_rtsp_stream_finalize (GObject * obj);
182
183 static guint gst_rtsp_stream_signals[SIGNAL_LAST] = { 0 };
184
185 G_DEFINE_TYPE (GstRTSPStream, gst_rtsp_stream, G_TYPE_OBJECT);
186
187 static void
188 gst_rtsp_stream_class_init (GstRTSPStreamClass * klass)
189 {
190   GObjectClass *gobject_class;
191
192   g_type_class_add_private (klass, sizeof (GstRTSPStreamPrivate));
193
194   gobject_class = G_OBJECT_CLASS (klass);
195
196   gobject_class->get_property = gst_rtsp_stream_get_property;
197   gobject_class->set_property = gst_rtsp_stream_set_property;
198   gobject_class->finalize = gst_rtsp_stream_finalize;
199
200   g_object_class_install_property (gobject_class, PROP_CONTROL,
201       g_param_spec_string ("control", "Control",
202           "The control string for this stream", DEFAULT_CONTROL,
203           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
204
205   g_object_class_install_property (gobject_class, PROP_PROFILES,
206       g_param_spec_flags ("profiles", "Profiles",
207           "Allowed transfer profiles", GST_TYPE_RTSP_PROFILE,
208           DEFAULT_PROFILES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
209
210   g_object_class_install_property (gobject_class, PROP_PROTOCOLS,
211       g_param_spec_flags ("protocols", "Protocols",
212           "Allowed lower transport protocols", GST_TYPE_RTSP_LOWER_TRANS,
213           DEFAULT_PROTOCOLS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
214
215   gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER] =
216       g_signal_new ("new-rtp-encoder", G_TYPE_FROM_CLASS (klass),
217       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
218       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
219
220   gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER] =
221       g_signal_new ("new-rtcp-encoder", G_TYPE_FROM_CLASS (klass),
222       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
223       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
224
225   GST_DEBUG_CATEGORY_INIT (rtsp_stream_debug, "rtspstream", 0, "GstRTSPStream");
226
227   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
228 }
229
230 static void
231 gst_rtsp_stream_init (GstRTSPStream * stream)
232 {
233   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
234
235   GST_DEBUG ("new stream %p", stream);
236
237   stream->priv = priv;
238
239   priv->dscp_qos = -1;
240   priv->control = g_strdup (DEFAULT_CONTROL);
241   priv->profiles = DEFAULT_PROFILES;
242   priv->protocols = DEFAULT_PROTOCOLS;
243
244   g_mutex_init (&priv->lock);
245
246   priv->keys = g_hash_table_new_full (g_direct_hash, g_direct_equal,
247       NULL, (GDestroyNotify) gst_caps_unref);
248 }
249
250 static void
251 gst_rtsp_stream_finalize (GObject * obj)
252 {
253   GstRTSPStream *stream;
254   GstRTSPStreamPrivate *priv;
255
256   stream = GST_RTSP_STREAM (obj);
257   priv = stream->priv;
258
259   GST_DEBUG ("finalize stream %p", stream);
260
261   /* we really need to be unjoined now */
262   g_return_if_fail (!priv->is_joined);
263
264   if (priv->addr_v4)
265     gst_rtsp_address_free (priv->addr_v4);
266   if (priv->addr_v6)
267     gst_rtsp_address_free (priv->addr_v6);
268   if (priv->server_addr_v4)
269     gst_rtsp_address_free (priv->server_addr_v4);
270   if (priv->server_addr_v6)
271     gst_rtsp_address_free (priv->server_addr_v6);
272   if (priv->pool)
273     g_object_unref (priv->pool);
274   gst_object_unref (priv->payloader);
275   gst_object_unref (priv->srcpad);
276   g_free (priv->control);
277   g_mutex_clear (&priv->lock);
278
279   g_hash_table_unref (priv->keys);
280
281   G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
282 }
283
284 static void
285 gst_rtsp_stream_get_property (GObject * object, guint propid,
286     GValue * value, GParamSpec * pspec)
287 {
288   GstRTSPStream *stream = GST_RTSP_STREAM (object);
289
290   switch (propid) {
291     case PROP_CONTROL:
292       g_value_take_string (value, gst_rtsp_stream_get_control (stream));
293       break;
294     case PROP_PROFILES:
295       g_value_set_flags (value, gst_rtsp_stream_get_profiles (stream));
296       break;
297     case PROP_PROTOCOLS:
298       g_value_set_flags (value, gst_rtsp_stream_get_protocols (stream));
299       break;
300     default:
301       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
302   }
303 }
304
305 static void
306 gst_rtsp_stream_set_property (GObject * object, guint propid,
307     const GValue * value, GParamSpec * pspec)
308 {
309   GstRTSPStream *stream = GST_RTSP_STREAM (object);
310
311   switch (propid) {
312     case PROP_CONTROL:
313       gst_rtsp_stream_set_control (stream, g_value_get_string (value));
314       break;
315     case PROP_PROFILES:
316       gst_rtsp_stream_set_profiles (stream, g_value_get_flags (value));
317       break;
318     case PROP_PROTOCOLS:
319       gst_rtsp_stream_set_protocols (stream, g_value_get_flags (value));
320       break;
321     default:
322       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
323   }
324 }
325
326 /**
327  * gst_rtsp_stream_new:
328  * @idx: an index
329  * @srcpad: a #GstPad
330  * @payloader: a #GstElement
331  *
332  * Create a new media stream with index @idx that handles RTP data on
333  * @srcpad and has a payloader element @payloader.
334  *
335  * Returns: (transfer full): a new #GstRTSPStream
336  */
337 GstRTSPStream *
338 gst_rtsp_stream_new (guint idx, GstElement * payloader, GstPad * srcpad)
339 {
340   GstRTSPStreamPrivate *priv;
341   GstRTSPStream *stream;
342
343   g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL);
344   g_return_val_if_fail (GST_IS_PAD (srcpad), NULL);
345   g_return_val_if_fail (GST_PAD_IS_SRC (srcpad), NULL);
346
347   stream = g_object_new (GST_TYPE_RTSP_STREAM, NULL);
348   priv = stream->priv;
349   priv->idx = idx;
350   priv->payloader = gst_object_ref (payloader);
351   priv->srcpad = gst_object_ref (srcpad);
352
353   return stream;
354 }
355
356 /**
357  * gst_rtsp_stream_get_index:
358  * @stream: a #GstRTSPStream
359  *
360  * Get the stream index.
361  *
362  * Return: the stream index.
363  */
364 guint
365 gst_rtsp_stream_get_index (GstRTSPStream * stream)
366 {
367   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
368
369   return stream->priv->idx;
370 }
371
372 /**
373  * gst_rtsp_stream_get_pt:
374  * @stream: a #GstRTSPStream
375  *
376  * Get the stream payload type.
377  *
378  * Return: the stream payload type.
379  */
380 guint
381 gst_rtsp_stream_get_pt (GstRTSPStream * stream)
382 {
383   GstRTSPStreamPrivate *priv;
384   guint pt;
385
386   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
387
388   priv = stream->priv;
389
390   g_object_get (G_OBJECT (priv->payloader), "pt", &pt, NULL);
391
392   return pt;
393 }
394
395 /**
396  * gst_rtsp_stream_get_srcpad:
397  * @stream: a #GstRTSPStream
398  *
399  * Get the srcpad associated with @stream.
400  *
401  * Returns: (transfer full): the srcpad. Unref after usage.
402  */
403 GstPad *
404 gst_rtsp_stream_get_srcpad (GstRTSPStream * stream)
405 {
406   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
407
408   return gst_object_ref (stream->priv->srcpad);
409 }
410
411 /**
412  * gst_rtsp_stream_get_control:
413  * @stream: a #GstRTSPStream
414  *
415  * Get the control string to identify this stream.
416  *
417  * Returns: (transfer full): the control string. g_free() after usage.
418  */
419 gchar *
420 gst_rtsp_stream_get_control (GstRTSPStream * stream)
421 {
422   GstRTSPStreamPrivate *priv;
423   gchar *result;
424
425   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
426
427   priv = stream->priv;
428
429   g_mutex_lock (&priv->lock);
430   if ((result = g_strdup (priv->control)) == NULL)
431     result = g_strdup_printf ("stream=%u", priv->idx);
432   g_mutex_unlock (&priv->lock);
433
434   return result;
435 }
436
437 /**
438  * gst_rtsp_stream_set_control:
439  * @stream: a #GstRTSPStream
440  * @control: a control string
441  *
442  * Set the control string in @stream.
443  */
444 void
445 gst_rtsp_stream_set_control (GstRTSPStream * stream, const gchar * control)
446 {
447   GstRTSPStreamPrivate *priv;
448
449   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
450
451   priv = stream->priv;
452
453   g_mutex_lock (&priv->lock);
454   g_free (priv->control);
455   priv->control = g_strdup (control);
456   g_mutex_unlock (&priv->lock);
457 }
458
459 /**
460  * gst_rtsp_stream_has_control:
461  * @stream: a #GstRTSPStream
462  * @control: a control string
463  *
464  * Check if @stream has the control string @control.
465  *
466  * Returns: %TRUE is @stream has @control as the control string
467  */
468 gboolean
469 gst_rtsp_stream_has_control (GstRTSPStream * stream, const gchar * control)
470 {
471   GstRTSPStreamPrivate *priv;
472   gboolean res;
473
474   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
475
476   priv = stream->priv;
477
478   g_mutex_lock (&priv->lock);
479   if (priv->control)
480     res = (g_strcmp0 (priv->control, control) == 0);
481   else {
482     guint streamid;
483
484     if (sscanf (control, "stream=%u", &streamid) > 0)
485       res = (streamid == priv->idx);
486     else
487       res = FALSE;
488   }
489   g_mutex_unlock (&priv->lock);
490
491   return res;
492 }
493
494 /**
495  * gst_rtsp_stream_set_mtu:
496  * @stream: a #GstRTSPStream
497  * @mtu: a new MTU
498  *
499  * Configure the mtu in the payloader of @stream to @mtu.
500  */
501 void
502 gst_rtsp_stream_set_mtu (GstRTSPStream * stream, guint mtu)
503 {
504   GstRTSPStreamPrivate *priv;
505
506   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
507
508   priv = stream->priv;
509
510   GST_LOG_OBJECT (stream, "set MTU %u", mtu);
511
512   g_object_set (G_OBJECT (priv->payloader), "mtu", mtu, NULL);
513 }
514
515 /**
516  * gst_rtsp_stream_get_mtu:
517  * @stream: a #GstRTSPStream
518  *
519  * Get the configured MTU in the payloader of @stream.
520  *
521  * Returns: the MTU of the payloader.
522  */
523 guint
524 gst_rtsp_stream_get_mtu (GstRTSPStream * stream)
525 {
526   GstRTSPStreamPrivate *priv;
527   guint mtu;
528
529   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
530
531   priv = stream->priv;
532
533   g_object_get (G_OBJECT (priv->payloader), "mtu", &mtu, NULL);
534
535   return mtu;
536 }
537
538 /* Update the dscp qos property on the udp sinks */
539 static void
540 update_dscp_qos (GstRTSPStream * stream)
541 {
542   GstRTSPStreamPrivate *priv;
543
544   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
545
546   priv = stream->priv;
547
548   if (priv->udpsink[0]) {
549     g_object_set (G_OBJECT (priv->udpsink[0]), "qos-dscp", priv->dscp_qos,
550         NULL);
551   }
552
553   if (priv->udpsink[1]) {
554     g_object_set (G_OBJECT (priv->udpsink[1]), "qos-dscp", priv->dscp_qos,
555         NULL);
556   }
557 }
558
559 /**
560  * gst_rtsp_stream_set_dscp_qos:
561  * @stream: a #GstRTSPStream
562  * @dscp_qos: a new dscp qos value (0-63, or -1 to disable)
563  *
564  * Configure the dscp qos of the outgoing sockets to @dscp_qos.
565  */
566 void
567 gst_rtsp_stream_set_dscp_qos (GstRTSPStream * stream, gint dscp_qos)
568 {
569   GstRTSPStreamPrivate *priv;
570
571   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
572
573   priv = stream->priv;
574
575   GST_LOG_OBJECT (stream, "set DSCP QoS %d", dscp_qos);
576
577   if (dscp_qos < -1 || dscp_qos > 63) {
578     GST_WARNING_OBJECT (stream, "trying to set illegal dscp qos %d", dscp_qos);
579     return;
580   }
581
582   priv->dscp_qos = dscp_qos;
583
584   update_dscp_qos (stream);
585 }
586
587 /**
588  * gst_rtsp_stream_get_dscp_qos:
589  * @stream: a #GstRTSPStream
590  *
591  * Get the configured DSCP QoS in of the outgoing sockets.
592  *
593  * Returns: the DSCP QoS value of the outgoing sockets, or -1 if disbled.
594  */
595 gint
596 gst_rtsp_stream_get_dscp_qos (GstRTSPStream * stream)
597 {
598   GstRTSPStreamPrivate *priv;
599
600   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
601
602   priv = stream->priv;
603
604   return priv->dscp_qos;
605 }
606
607 /**
608  * gst_rtsp_stream_is_transport_supported:
609  * @stream: a #GstRTSPStream
610  * @transport: (transfer none): a #GstRTSPTransport
611  *
612  * Check if @transport can be handled by stream
613  *
614  * Returns: %TRUE if @transport can be handled by @stream.
615  */
616 gboolean
617 gst_rtsp_stream_is_transport_supported (GstRTSPStream * stream,
618     GstRTSPTransport * transport)
619 {
620   GstRTSPStreamPrivate *priv;
621
622   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
623
624   priv = stream->priv;
625
626   g_mutex_lock (&priv->lock);
627   if (transport->trans != GST_RTSP_TRANS_RTP)
628     goto unsupported_transmode;
629
630   if (!(transport->profile & priv->profiles))
631     goto unsupported_profile;
632
633   if (!(transport->lower_transport & priv->protocols))
634     goto unsupported_ltrans;
635
636   g_mutex_unlock (&priv->lock);
637
638   return TRUE;
639
640   /* ERRORS */
641 unsupported_transmode:
642   {
643     GST_DEBUG ("unsupported transport mode %d", transport->trans);
644     g_mutex_unlock (&priv->lock);
645     return FALSE;
646   }
647 unsupported_profile:
648   {
649     GST_DEBUG ("unsupported profile %d", transport->profile);
650     g_mutex_unlock (&priv->lock);
651     return FALSE;
652   }
653 unsupported_ltrans:
654   {
655     GST_DEBUG ("unsupported lower transport %d", transport->lower_transport);
656     g_mutex_unlock (&priv->lock);
657     return FALSE;
658   }
659 }
660
661 /**
662  * gst_rtsp_stream_set_profiles:
663  * @stream: a #GstRTSPStream
664  * @profiles: the new profiles
665  *
666  * Configure the allowed profiles for @stream.
667  */
668 void
669 gst_rtsp_stream_set_profiles (GstRTSPStream * stream, GstRTSPProfile profiles)
670 {
671   GstRTSPStreamPrivate *priv;
672
673   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
674
675   priv = stream->priv;
676
677   g_mutex_lock (&priv->lock);
678   priv->profiles = profiles;
679   g_mutex_unlock (&priv->lock);
680 }
681
682 /**
683  * gst_rtsp_stream_get_profiles:
684  * @stream: a #GstRTSPStream
685  *
686  * Get the allowed profiles of @stream.
687  *
688  * Returns: a #GstRTSPProfile
689  */
690 GstRTSPProfile
691 gst_rtsp_stream_get_profiles (GstRTSPStream * stream)
692 {
693   GstRTSPStreamPrivate *priv;
694   GstRTSPProfile res;
695
696   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_RTSP_PROFILE_UNKNOWN);
697
698   priv = stream->priv;
699
700   g_mutex_lock (&priv->lock);
701   res = priv->profiles;
702   g_mutex_unlock (&priv->lock);
703
704   return res;
705 }
706
707 /**
708  * gst_rtsp_stream_set_protocols:
709  * @stream: a #GstRTSPStream
710  * @protocols: the new flags
711  *
712  * Configure the allowed lower transport for @stream.
713  */
714 void
715 gst_rtsp_stream_set_protocols (GstRTSPStream * stream,
716     GstRTSPLowerTrans protocols)
717 {
718   GstRTSPStreamPrivate *priv;
719
720   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
721
722   priv = stream->priv;
723
724   g_mutex_lock (&priv->lock);
725   priv->protocols = protocols;
726   g_mutex_unlock (&priv->lock);
727 }
728
729 /**
730  * gst_rtsp_stream_get_protocols:
731  * @stream: a #GstRTSPStream
732  *
733  * Get the allowed protocols of @stream.
734  *
735  * Returns: a #GstRTSPLowerTrans
736  */
737 GstRTSPLowerTrans
738 gst_rtsp_stream_get_protocols (GstRTSPStream * stream)
739 {
740   GstRTSPStreamPrivate *priv;
741   GstRTSPLowerTrans res;
742
743   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream),
744       GST_RTSP_LOWER_TRANS_UNKNOWN);
745
746   priv = stream->priv;
747
748   g_mutex_lock (&priv->lock);
749   res = priv->protocols;
750   g_mutex_unlock (&priv->lock);
751
752   return res;
753 }
754
755 /**
756  * gst_rtsp_stream_set_address_pool:
757  * @stream: a #GstRTSPStream
758  * @pool: (transfer none): a #GstRTSPAddressPool
759  *
760  * configure @pool to be used as the address pool of @stream.
761  */
762 void
763 gst_rtsp_stream_set_address_pool (GstRTSPStream * stream,
764     GstRTSPAddressPool * pool)
765 {
766   GstRTSPStreamPrivate *priv;
767   GstRTSPAddressPool *old;
768
769   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
770
771   priv = stream->priv;
772
773   GST_LOG_OBJECT (stream, "set address pool %p", pool);
774
775   g_mutex_lock (&priv->lock);
776   if ((old = priv->pool) != pool)
777     priv->pool = pool ? g_object_ref (pool) : NULL;
778   else
779     old = NULL;
780   g_mutex_unlock (&priv->lock);
781
782   if (old)
783     g_object_unref (old);
784 }
785
786 /**
787  * gst_rtsp_stream_get_address_pool:
788  * @stream: a #GstRTSPStream
789  *
790  * Get the #GstRTSPAddressPool used as the address pool of @stream.
791  *
792  * Returns: (transfer full): the #GstRTSPAddressPool of @stream. g_object_unref() after
793  * usage.
794  */
795 GstRTSPAddressPool *
796 gst_rtsp_stream_get_address_pool (GstRTSPStream * stream)
797 {
798   GstRTSPStreamPrivate *priv;
799   GstRTSPAddressPool *result;
800
801   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
802
803   priv = stream->priv;
804
805   g_mutex_lock (&priv->lock);
806   if ((result = priv->pool))
807     g_object_ref (result);
808   g_mutex_unlock (&priv->lock);
809
810   return result;
811 }
812
813 /**
814  * gst_rtsp_stream_get_multicast_address:
815  * @stream: a #GstRTSPStream
816  * @family: the #GSocketFamily
817  *
818  * Get the multicast address of @stream for @family.
819  *
820  * Returns: (transfer full) (nullable): the #GstRTSPAddress of @stream
821  * or %NULL when no address could be allocated. gst_rtsp_address_free()
822  * after usage.
823  */
824 GstRTSPAddress *
825 gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream,
826     GSocketFamily family)
827 {
828   GstRTSPStreamPrivate *priv;
829   GstRTSPAddress *result;
830   GstRTSPAddress **addrp;
831   GstRTSPAddressFlags flags;
832
833   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
834
835   priv = stream->priv;
836
837   if (family == G_SOCKET_FAMILY_IPV6) {
838     flags = GST_RTSP_ADDRESS_FLAG_IPV6;
839     addrp = &priv->addr_v6;
840   } else {
841     flags = GST_RTSP_ADDRESS_FLAG_IPV4;
842     addrp = &priv->addr_v4;
843   }
844
845   g_mutex_lock (&priv->lock);
846   if (*addrp == NULL) {
847     if (priv->pool == NULL)
848       goto no_pool;
849
850     flags |= GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_MULTICAST;
851
852     *addrp = gst_rtsp_address_pool_acquire_address (priv->pool, flags, 2);
853     if (*addrp == NULL)
854       goto no_address;
855   }
856   result = gst_rtsp_address_copy (*addrp);
857   g_mutex_unlock (&priv->lock);
858
859   return result;
860
861   /* ERRORS */
862 no_pool:
863   {
864     GST_ERROR_OBJECT (stream, "no address pool specified");
865     g_mutex_unlock (&priv->lock);
866     return NULL;
867   }
868 no_address:
869   {
870     GST_ERROR_OBJECT (stream, "failed to acquire address from pool");
871     g_mutex_unlock (&priv->lock);
872     return NULL;
873   }
874 }
875
876 /**
877  * gst_rtsp_stream_reserve_address:
878  * @stream: a #GstRTSPStream
879  * @address: an address
880  * @port: a port
881  * @n_ports: n_ports
882  * @ttl: a TTL
883  *
884  * Reserve @address and @port as the address and port of @stream.
885  *
886  * Returns: (nullable): the #GstRTSPAddress of @stream or %NULL when
887  * the address could be reserved. gst_rtsp_address_free() after usage.
888  */
889 GstRTSPAddress *
890 gst_rtsp_stream_reserve_address (GstRTSPStream * stream,
891     const gchar * address, guint port, guint n_ports, guint ttl)
892 {
893   GstRTSPStreamPrivate *priv;
894   GstRTSPAddress *result;
895   GInetAddress *addr;
896   GSocketFamily family;
897   GstRTSPAddress **addrp;
898
899   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
900   g_return_val_if_fail (address != NULL, NULL);
901   g_return_val_if_fail (port > 0, NULL);
902   g_return_val_if_fail (n_ports > 0, NULL);
903   g_return_val_if_fail (ttl > 0, NULL);
904
905   priv = stream->priv;
906
907   addr = g_inet_address_new_from_string (address);
908   if (!addr) {
909     GST_ERROR ("failed to get inet addr from %s", address);
910     family = G_SOCKET_FAMILY_IPV4;
911   } else {
912     family = g_inet_address_get_family (addr);
913     g_object_unref (addr);
914   }
915
916   if (family == G_SOCKET_FAMILY_IPV6)
917     addrp = &priv->addr_v6;
918   else
919     addrp = &priv->addr_v4;
920
921   g_mutex_lock (&priv->lock);
922   if (*addrp == NULL) {
923     GstRTSPAddressPoolResult res;
924
925     if (priv->pool == NULL)
926       goto no_pool;
927
928     res = gst_rtsp_address_pool_reserve_address (priv->pool, address,
929         port, n_ports, ttl, addrp);
930     if (res != GST_RTSP_ADDRESS_POOL_OK)
931       goto no_address;
932   } else {
933     if (strcmp ((*addrp)->address, address) ||
934         (*addrp)->port != port || (*addrp)->n_ports != n_ports ||
935         (*addrp)->ttl != ttl)
936       goto different_address;
937   }
938   result = gst_rtsp_address_copy (*addrp);
939   g_mutex_unlock (&priv->lock);
940
941   return result;
942
943   /* ERRORS */
944 no_pool:
945   {
946     GST_ERROR_OBJECT (stream, "no address pool specified");
947     g_mutex_unlock (&priv->lock);
948     return NULL;
949   }
950 no_address:
951   {
952     GST_ERROR_OBJECT (stream, "failed to acquire address %s from pool",
953         address);
954     g_mutex_unlock (&priv->lock);
955     return NULL;
956   }
957 different_address:
958   {
959     GST_ERROR_OBJECT (stream, "address %s is not the same that was already"
960         " reserved", address);
961     g_mutex_unlock (&priv->lock);
962     return NULL;
963   }
964 }
965
966 static gboolean
967 alloc_ports_one_family (GstRTSPAddressPool * pool, gint buffer_size,
968     GSocketFamily family, GstElement * udpsrc_out[2],
969     GstElement * udpsink_out[2], GstRTSPRange * server_port_out,
970     GstRTSPAddress ** server_addr_out)
971 {
972   GstStateChangeReturn ret;
973   GstElement *udpsrc0, *udpsrc1;
974   GstElement *udpsink0, *udpsink1;
975   GSocket *rtp_socket = NULL;
976   GSocket *rtcp_socket;
977   gint tmp_rtp, tmp_rtcp;
978   guint count;
979   gint rtpport, rtcpport;
980   GList *rejected_addresses = NULL;
981   GstRTSPAddress *addr = NULL;
982   GInetAddress *inetaddr = NULL;
983   GSocketAddress *rtp_sockaddr = NULL;
984   GSocketAddress *rtcp_sockaddr = NULL;
985   const gchar *multisink_socket;
986
987   if (family == G_SOCKET_FAMILY_IPV6)
988     multisink_socket = "socket-v6";
989   else
990     multisink_socket = "socket";
991
992   udpsrc0 = NULL;
993   udpsrc1 = NULL;
994   udpsink0 = NULL;
995   udpsink1 = NULL;
996   count = 0;
997
998   /* Start with random port */
999   tmp_rtp = 0;
1000
1001   rtcp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1002       G_SOCKET_PROTOCOL_UDP, NULL);
1003   if (!rtcp_socket)
1004     goto no_udp_protocol;
1005
1006   if (*server_addr_out)
1007     gst_rtsp_address_free (*server_addr_out);
1008
1009   /* try to allocate 2 UDP ports, the RTP port should be an even
1010    * number and the RTCP port should be the next (uneven) port */
1011 again:
1012
1013   if (rtp_socket == NULL) {
1014     rtp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1015         G_SOCKET_PROTOCOL_UDP, NULL);
1016     if (!rtp_socket)
1017       goto no_udp_protocol;
1018   }
1019
1020   if (pool && gst_rtsp_address_pool_has_unicast_addresses (pool)) {
1021     GstRTSPAddressFlags flags;
1022
1023     if (addr)
1024       rejected_addresses = g_list_prepend (rejected_addresses, addr);
1025
1026     flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_UNICAST;
1027     if (family == G_SOCKET_FAMILY_IPV6)
1028       flags |= GST_RTSP_ADDRESS_FLAG_IPV6;
1029     else
1030       flags |= GST_RTSP_ADDRESS_FLAG_IPV4;
1031
1032     addr = gst_rtsp_address_pool_acquire_address (pool, flags, 2);
1033
1034     if (addr == NULL)
1035       goto no_ports;
1036
1037     tmp_rtp = addr->port;
1038
1039     g_clear_object (&inetaddr);
1040     inetaddr = g_inet_address_new_from_string (addr->address);
1041   } else {
1042     if (tmp_rtp != 0) {
1043       tmp_rtp += 2;
1044       if (++count > 20)
1045         goto no_ports;
1046     }
1047
1048     if (inetaddr == NULL)
1049       inetaddr = g_inet_address_new_any (family);
1050   }
1051
1052   rtp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtp);
1053   if (!g_socket_bind (rtp_socket, rtp_sockaddr, FALSE, NULL)) {
1054     g_object_unref (rtp_sockaddr);
1055     goto again;
1056   }
1057   g_object_unref (rtp_sockaddr);
1058
1059   rtp_sockaddr = g_socket_get_local_address (rtp_socket, NULL);
1060   if (rtp_sockaddr == NULL || !G_IS_INET_SOCKET_ADDRESS (rtp_sockaddr)) {
1061     g_clear_object (&rtp_sockaddr);
1062     goto socket_error;
1063   }
1064
1065   tmp_rtp =
1066       g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (rtp_sockaddr));
1067   g_object_unref (rtp_sockaddr);
1068
1069   /* check if port is even */
1070   if ((tmp_rtp & 1) != 0) {
1071     /* port not even, close and allocate another */
1072     tmp_rtp++;
1073     g_clear_object (&rtp_socket);
1074     goto again;
1075   }
1076
1077   /* set port */
1078   tmp_rtcp = tmp_rtp + 1;
1079
1080   rtcp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtcp);
1081   if (!g_socket_bind (rtcp_socket, rtcp_sockaddr, FALSE, NULL)) {
1082     g_object_unref (rtcp_sockaddr);
1083     g_clear_object (&rtp_socket);
1084     goto again;
1085   }
1086   g_object_unref (rtcp_sockaddr);
1087
1088   g_clear_object (&inetaddr);
1089
1090   udpsrc0 = gst_element_factory_make ("udpsrc", NULL);
1091   udpsrc1 = gst_element_factory_make ("udpsrc", NULL);
1092
1093   if (udpsrc0 == NULL || udpsrc1 == NULL)
1094     goto no_udp_protocol;
1095
1096   g_object_set (G_OBJECT (udpsrc0), "socket", rtp_socket, NULL);
1097   g_object_set (G_OBJECT (udpsrc1), "socket", rtcp_socket, NULL);
1098
1099   ret = gst_element_set_state (udpsrc0, GST_STATE_READY);
1100   if (ret == GST_STATE_CHANGE_FAILURE)
1101     goto element_error;
1102   ret = gst_element_set_state (udpsrc1, GST_STATE_READY);
1103   if (ret == GST_STATE_CHANGE_FAILURE)
1104     goto element_error;
1105
1106   /* all fine, do port check */
1107   g_object_get (G_OBJECT (udpsrc0), "port", &rtpport, NULL);
1108   g_object_get (G_OBJECT (udpsrc1), "port", &rtcpport, NULL);
1109
1110   /* this should not happen... */
1111   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
1112     goto port_error;
1113
1114   if (udpsink_out[0])
1115     udpsink0 = udpsink_out[0];
1116   else
1117     udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
1118
1119   if (!udpsink0)
1120     goto no_udp_protocol;
1121
1122   g_object_set (G_OBJECT (udpsink0), "close-socket", FALSE, NULL);
1123   g_object_set (G_OBJECT (udpsink0), multisink_socket, rtp_socket, NULL);
1124
1125   if (udpsink_out[1])
1126     udpsink1 = udpsink_out[1];
1127   else
1128     udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
1129
1130   if (!udpsink1)
1131     goto no_udp_protocol;
1132
1133   g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
1134   g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
1135   g_object_set (G_OBJECT (udpsink0), "buffer-size", buffer_size, NULL);
1136
1137   g_object_set (G_OBJECT (udpsink1), "close-socket", FALSE, NULL);
1138   g_object_set (G_OBJECT (udpsink1), multisink_socket, rtcp_socket, NULL);
1139   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
1140   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
1141   g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL);
1142   g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
1143   g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL);
1144   g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
1145
1146   /* we keep these elements, we will further configure them when the
1147    * client told us to really use the UDP ports. */
1148   udpsrc_out[0] = udpsrc0;
1149   udpsrc_out[1] = udpsrc1;
1150   udpsink_out[0] = udpsink0;
1151   udpsink_out[1] = udpsink1;
1152
1153   server_port_out->min = rtpport;
1154   server_port_out->max = rtcpport;
1155
1156   *server_addr_out = addr;
1157   g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free);
1158
1159   g_object_unref (rtp_socket);
1160   g_object_unref (rtcp_socket);
1161
1162   return TRUE;
1163
1164   /* ERRORS */
1165 no_udp_protocol:
1166   {
1167     goto cleanup;
1168   }
1169 no_ports:
1170   {
1171     goto cleanup;
1172   }
1173 port_error:
1174   {
1175     goto cleanup;
1176   }
1177 socket_error:
1178   {
1179     goto cleanup;
1180   }
1181 element_error:
1182   {
1183     goto cleanup;
1184   }
1185 cleanup:
1186   {
1187     if (udpsrc0) {
1188       gst_element_set_state (udpsrc0, GST_STATE_NULL);
1189       gst_object_unref (udpsrc0);
1190     }
1191     if (udpsrc1) {
1192       gst_element_set_state (udpsrc1, GST_STATE_NULL);
1193       gst_object_unref (udpsrc1);
1194     }
1195     if (udpsink0) {
1196       gst_element_set_state (udpsink0, GST_STATE_NULL);
1197       gst_object_unref (udpsink0);
1198     }
1199     if (inetaddr)
1200       g_object_unref (inetaddr);
1201     g_list_free_full (rejected_addresses,
1202         (GDestroyNotify) gst_rtsp_address_free);
1203     if (addr)
1204       gst_rtsp_address_free (addr);
1205     if (rtp_socket)
1206       g_object_unref (rtp_socket);
1207     if (rtcp_socket)
1208       g_object_unref (rtcp_socket);
1209     return FALSE;
1210   }
1211 }
1212
1213 /* must be called with lock */
1214 static gboolean
1215 alloc_ports (GstRTSPStream * stream)
1216 {
1217   GstRTSPStreamPrivate *priv = stream->priv;
1218
1219   priv->have_ipv4 = alloc_ports_one_family (priv->pool, priv->buffer_size,
1220       G_SOCKET_FAMILY_IPV4, priv->udpsrc_v4, priv->udpsink,
1221       &priv->server_port_v4, &priv->server_addr_v4);
1222
1223   priv->have_ipv6 = alloc_ports_one_family (priv->pool, priv->buffer_size,
1224       G_SOCKET_FAMILY_IPV6, priv->udpsrc_v6, priv->udpsink,
1225       &priv->server_port_v6, &priv->server_addr_v6);
1226
1227   return priv->have_ipv4 || priv->have_ipv6;
1228 }
1229
1230 /**
1231  * gst_rtsp_stream_get_server_port:
1232  * @stream: a #GstRTSPStream
1233  * @server_port: (out): result server port
1234  * @family: the port family to get
1235  *
1236  * Fill @server_port with the port pair used by the server. This function can
1237  * only be called when @stream has been joined.
1238  */
1239 void
1240 gst_rtsp_stream_get_server_port (GstRTSPStream * stream,
1241     GstRTSPRange * server_port, GSocketFamily family)
1242 {
1243   GstRTSPStreamPrivate *priv;
1244
1245   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1246   priv = stream->priv;
1247   g_return_if_fail (priv->is_joined);
1248
1249   g_mutex_lock (&priv->lock);
1250   if (family == G_SOCKET_FAMILY_IPV4) {
1251     if (server_port)
1252       *server_port = priv->server_port_v4;
1253   } else {
1254     if (server_port)
1255       *server_port = priv->server_port_v6;
1256   }
1257   g_mutex_unlock (&priv->lock);
1258 }
1259
1260 /**
1261  * gst_rtsp_stream_get_rtpsession:
1262  * @stream: a #GstRTSPStream
1263  *
1264  * Get the RTP session of this stream.
1265  *
1266  * Returns: (transfer full): The RTP session of this stream. Unref after usage.
1267  */
1268 GObject *
1269 gst_rtsp_stream_get_rtpsession (GstRTSPStream * stream)
1270 {
1271   GstRTSPStreamPrivate *priv;
1272   GObject *session;
1273
1274   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1275
1276   priv = stream->priv;
1277
1278   g_mutex_lock (&priv->lock);
1279   if ((session = priv->session))
1280     g_object_ref (session);
1281   g_mutex_unlock (&priv->lock);
1282
1283   return session;
1284 }
1285
1286 /**
1287  * gst_rtsp_stream_get_ssrc:
1288  * @stream: a #GstRTSPStream
1289  * @ssrc: (out): result ssrc
1290  *
1291  * Get the SSRC used by the RTP session of this stream. This function can only
1292  * be called when @stream has been joined.
1293  */
1294 void
1295 gst_rtsp_stream_get_ssrc (GstRTSPStream * stream, guint * ssrc)
1296 {
1297   GstRTSPStreamPrivate *priv;
1298
1299   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1300   priv = stream->priv;
1301   g_return_if_fail (priv->is_joined);
1302
1303   g_mutex_lock (&priv->lock);
1304   if (ssrc && priv->session)
1305     g_object_get (priv->session, "internal-ssrc", ssrc, NULL);
1306   g_mutex_unlock (&priv->lock);
1307 }
1308
1309 /* executed from streaming thread */
1310 static void
1311 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream)
1312 {
1313   GstRTSPStreamPrivate *priv = stream->priv;
1314   GstCaps *newcaps, *oldcaps;
1315
1316   newcaps = gst_pad_get_current_caps (pad);
1317
1318   GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps,
1319       newcaps);
1320
1321   g_mutex_lock (&priv->lock);
1322   oldcaps = priv->caps;
1323   priv->caps = newcaps;
1324   g_mutex_unlock (&priv->lock);
1325
1326   if (oldcaps)
1327     gst_caps_unref (oldcaps);
1328 }
1329
1330 static void
1331 dump_structure (const GstStructure * s)
1332 {
1333   gchar *sstr;
1334
1335   sstr = gst_structure_to_string (s);
1336   GST_INFO ("structure: %s", sstr);
1337   g_free (sstr);
1338 }
1339
1340 static GstRTSPStreamTransport *
1341 find_transport (GstRTSPStream * stream, const gchar * rtcp_from)
1342 {
1343   GstRTSPStreamPrivate *priv = stream->priv;
1344   GList *walk;
1345   GstRTSPStreamTransport *result = NULL;
1346   const gchar *tmp;
1347   gchar *dest;
1348   guint port;
1349
1350   if (rtcp_from == NULL)
1351     return NULL;
1352
1353   tmp = g_strrstr (rtcp_from, ":");
1354   if (tmp == NULL)
1355     return NULL;
1356
1357   port = atoi (tmp + 1);
1358   dest = g_strndup (rtcp_from, tmp - rtcp_from);
1359
1360   g_mutex_lock (&priv->lock);
1361   GST_INFO ("finding %s:%d in %d transports", dest, port,
1362       g_list_length (priv->transports));
1363
1364   for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1365     GstRTSPStreamTransport *trans = walk->data;
1366     const GstRTSPTransport *tr;
1367     gint min, max;
1368
1369     tr = gst_rtsp_stream_transport_get_transport (trans);
1370
1371     min = tr->client_port.min;
1372     max = tr->client_port.max;
1373
1374     if ((strcmp (tr->destination, dest) == 0) && (min == port || max == port)) {
1375       result = trans;
1376       break;
1377     }
1378   }
1379   if (result)
1380     g_object_ref (result);
1381   g_mutex_unlock (&priv->lock);
1382
1383   g_free (dest);
1384
1385   return result;
1386 }
1387
1388 static GstRTSPStreamTransport *
1389 check_transport (GObject * source, GstRTSPStream * stream)
1390 {
1391   GstStructure *stats;
1392   GstRTSPStreamTransport *trans;
1393
1394   /* see if we have a stream to match with the origin of the RTCP packet */
1395   trans = g_object_get_qdata (source, ssrc_stream_map_key);
1396   if (trans == NULL) {
1397     g_object_get (source, "stats", &stats, NULL);
1398     if (stats) {
1399       const gchar *rtcp_from;
1400
1401       dump_structure (stats);
1402
1403       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
1404       if ((trans = find_transport (stream, rtcp_from))) {
1405         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
1406             source);
1407         g_object_set_qdata_full (source, ssrc_stream_map_key, trans,
1408             g_object_unref);
1409       }
1410       gst_structure_free (stats);
1411     }
1412   }
1413   return trans;
1414 }
1415
1416
1417 static void
1418 on_new_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1419 {
1420   GstRTSPStreamTransport *trans;
1421
1422   GST_INFO ("%p: new source %p", stream, source);
1423
1424   trans = check_transport (source, stream);
1425
1426   if (trans)
1427     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
1428 }
1429
1430 static void
1431 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPStream * stream)
1432 {
1433   GST_INFO ("%p: new SDES %p", stream, source);
1434 }
1435
1436 static void
1437 on_ssrc_active (GObject * session, GObject * source, GstRTSPStream * stream)
1438 {
1439   GstRTSPStreamTransport *trans;
1440
1441   trans = check_transport (source, stream);
1442
1443   if (trans) {
1444     GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
1445     gst_rtsp_stream_transport_keep_alive (trans);
1446   }
1447 #ifdef DUMP_STATS
1448   {
1449     GstStructure *stats;
1450     g_object_get (source, "stats", &stats, NULL);
1451     if (stats) {
1452       dump_structure (stats);
1453       gst_structure_free (stats);
1454     }
1455   }
1456 #endif
1457 }
1458
1459 static void
1460 on_bye_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1461 {
1462   GST_INFO ("%p: source %p bye", stream, source);
1463 }
1464
1465 static void
1466 on_bye_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1467 {
1468   GstRTSPStreamTransport *trans;
1469
1470   GST_INFO ("%p: source %p bye timeout", stream, source);
1471
1472   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1473     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1474     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1475   }
1476 }
1477
1478 static void
1479 on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1480 {
1481   GstRTSPStreamTransport *trans;
1482
1483   GST_INFO ("%p: source %p timeout", stream, source);
1484
1485   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1486     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1487     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1488   }
1489 }
1490
1491 static void
1492 clear_tr_cache (GstRTSPStreamPrivate * priv)
1493 {
1494   g_list_foreach (priv->tr_cache, (GFunc) g_object_unref, NULL);
1495   g_list_free (priv->tr_cache);
1496   priv->tr_cache = NULL;
1497 }
1498
1499 static GstFlowReturn
1500 handle_new_sample (GstAppSink * sink, gpointer user_data)
1501 {
1502   GstRTSPStreamPrivate *priv;
1503   GList *walk;
1504   GstSample *sample;
1505   GstBuffer *buffer;
1506   GstRTSPStream *stream;
1507   gboolean is_rtp;
1508
1509   sample = gst_app_sink_pull_sample (sink);
1510   if (!sample)
1511     return GST_FLOW_OK;
1512
1513   stream = (GstRTSPStream *) user_data;
1514   priv = stream->priv;
1515   buffer = gst_sample_get_buffer (sample);
1516
1517   is_rtp = GST_ELEMENT_CAST (sink) == priv->appsink[0];
1518
1519   g_mutex_lock (&priv->lock);
1520   if (priv->tr_cache_cookie != priv->transports_cookie) {
1521     clear_tr_cache (priv);
1522     for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1523       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1524       priv->tr_cache = g_list_prepend (priv->tr_cache, g_object_ref (tr));
1525     }
1526     priv->tr_cache_cookie = priv->transports_cookie;
1527   }
1528   g_mutex_unlock (&priv->lock);
1529
1530   for (walk = priv->tr_cache; walk; walk = g_list_next (walk)) {
1531     GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1532
1533     if (is_rtp) {
1534       gst_rtsp_stream_transport_send_rtp (tr, buffer);
1535     } else {
1536       gst_rtsp_stream_transport_send_rtcp (tr, buffer);
1537     }
1538   }
1539   gst_sample_unref (sample);
1540
1541   return GST_FLOW_OK;
1542 }
1543
1544 static GstAppSinkCallbacks sink_cb = {
1545   NULL,                         /* not interested in EOS */
1546   NULL,                         /* not interested in preroll samples */
1547   handle_new_sample,
1548 };
1549
1550 static GstElement *
1551 get_rtp_encoder (GstRTSPStream * stream, guint session)
1552 {
1553   GstRTSPStreamPrivate *priv = stream->priv;
1554
1555   if (priv->srtpenc == NULL) {
1556     gchar *name;
1557
1558     name = g_strdup_printf ("srtpenc_%u", session);
1559     priv->srtpenc = gst_element_factory_make ("srtpenc", name);
1560     g_free (name);
1561
1562     g_object_set (priv->srtpenc, "random-key", TRUE, NULL);
1563   }
1564   return gst_object_ref (priv->srtpenc);
1565 }
1566
1567 static GstElement *
1568 request_rtp_encoder (GstElement * rtpbin, guint session, GstRTSPStream * stream)
1569 {
1570   GstRTSPStreamPrivate *priv = stream->priv;
1571   GstElement *oldenc, *enc;
1572   GstPad *pad;
1573   gchar *name;
1574
1575   if (priv->idx != session)
1576     return NULL;
1577
1578   GST_DEBUG_OBJECT (stream, "make RTP encoder for session %u", session);
1579
1580   oldenc = priv->srtpenc;
1581   enc = get_rtp_encoder (stream, session);
1582   name = g_strdup_printf ("rtp_sink_%d", session);
1583   pad = gst_element_get_request_pad (enc, name);
1584   g_free (name);
1585   gst_object_unref (pad);
1586
1587   if (oldenc == NULL)
1588     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER], 0,
1589         enc);
1590
1591   return enc;
1592 }
1593
1594 static GstElement *
1595 request_rtcp_encoder (GstElement * rtpbin, guint session,
1596     GstRTSPStream * stream)
1597 {
1598   GstRTSPStreamPrivate *priv = stream->priv;
1599   GstElement *oldenc, *enc;
1600   GstPad *pad;
1601   gchar *name;
1602
1603   if (priv->idx != session)
1604     return NULL;
1605
1606   GST_DEBUG_OBJECT (stream, "make RTCP encoder for session %u", session);
1607
1608   oldenc = priv->srtpenc;
1609   enc = get_rtp_encoder (stream, session);
1610   name = g_strdup_printf ("rtcp_sink_%d", session);
1611   pad = gst_element_get_request_pad (enc, name);
1612   g_free (name);
1613   gst_object_unref (pad);
1614
1615   if (oldenc == NULL)
1616     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER], 0,
1617         enc);
1618
1619   return enc;
1620 }
1621
1622 static GstCaps *
1623 request_key (GstElement * srtpdec, guint ssrc, GstRTSPStream * stream)
1624 {
1625   GstRTSPStreamPrivate *priv = stream->priv;
1626   GstCaps *caps;
1627
1628   GST_DEBUG ("request key %08x", ssrc);
1629
1630   g_mutex_lock (&priv->lock);
1631   if ((caps = g_hash_table_lookup (priv->keys, GINT_TO_POINTER (ssrc))))
1632     gst_caps_ref (caps);
1633   g_mutex_unlock (&priv->lock);
1634
1635   return caps;
1636 }
1637
1638 static GstElement *
1639 request_rtcp_decoder (GstElement * rtpbin, guint session,
1640     GstRTSPStream * stream)
1641 {
1642   GstRTSPStreamPrivate *priv = stream->priv;
1643
1644   if (priv->idx != session)
1645     return NULL;
1646
1647   if (priv->srtpdec == NULL) {
1648     gchar *name;
1649
1650     name = g_strdup_printf ("srtpdec_%u", session);
1651     priv->srtpdec = gst_element_factory_make ("srtpdec", name);
1652     g_free (name);
1653
1654     g_signal_connect (priv->srtpdec, "request-key",
1655         (GCallback) request_key, stream);
1656   }
1657   return gst_object_ref (priv->srtpdec);
1658 }
1659
1660 /**
1661  * gst_rtsp_stream_join_bin:
1662  * @stream: a #GstRTSPStream
1663  * @bin: (transfer none): a #GstBin to join
1664  * @rtpbin: (transfer none): a rtpbin element in @bin
1665  * @state: the target state of the new elements
1666  *
1667  * Join the #GstBin @bin that contains the element @rtpbin.
1668  *
1669  * @stream will link to @rtpbin, which must be inside @bin. The elements
1670  * added to @bin will be set to the state given in @state.
1671  *
1672  * Returns: %TRUE on success.
1673  */
1674 gboolean
1675 gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
1676     GstElement * rtpbin, GstState state)
1677 {
1678   GstRTSPStreamPrivate *priv;
1679   gint i;
1680   guint idx;
1681   gchar *name;
1682   GstPad *pad, *sinkpad, *selpad;
1683   GstPadLinkReturn ret;
1684
1685   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1686   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
1687   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
1688
1689   priv = stream->priv;
1690
1691   g_mutex_lock (&priv->lock);
1692   if (priv->is_joined)
1693     goto was_joined;
1694
1695   /* create a session with the same index as the stream */
1696   idx = priv->idx;
1697
1698   GST_INFO ("stream %p joining bin as session %u", stream, idx);
1699
1700   if (!alloc_ports (stream))
1701     goto no_ports;
1702
1703   /* update the dscp qos field in the sinks */
1704   update_dscp_qos (stream);
1705
1706   if (priv->profiles & GST_RTSP_PROFILE_SAVP
1707       || priv->profiles & GST_RTSP_PROFILE_SAVPF) {
1708     /* For SRTP */
1709     g_signal_connect (rtpbin, "request-rtp-encoder",
1710         (GCallback) request_rtp_encoder, stream);
1711     g_signal_connect (rtpbin, "request-rtcp-encoder",
1712         (GCallback) request_rtcp_encoder, stream);
1713     g_signal_connect (rtpbin, "request-rtcp-decoder",
1714         (GCallback) request_rtcp_decoder, stream);
1715   }
1716
1717   /* get a pad for sending RTP */
1718   name = g_strdup_printf ("send_rtp_sink_%u", idx);
1719   priv->send_rtp_sink = gst_element_get_request_pad (rtpbin, name);
1720   g_free (name);
1721   /* link the RTP pad to the session manager, it should not really fail unless
1722    * this is not really an RTP pad */
1723   ret = gst_pad_link (priv->srcpad, priv->send_rtp_sink);
1724   if (ret != GST_PAD_LINK_OK)
1725     goto link_failed;
1726
1727   /* get pads from the RTP session element for sending and receiving
1728    * RTP/RTCP*/
1729   name = g_strdup_printf ("send_rtp_src_%u", idx);
1730   priv->send_src[0] = gst_element_get_static_pad (rtpbin, name);
1731   g_free (name);
1732   name = g_strdup_printf ("send_rtcp_src_%u", idx);
1733   priv->send_src[1] = gst_element_get_request_pad (rtpbin, name);
1734   g_free (name);
1735   name = g_strdup_printf ("recv_rtp_sink_%u", idx);
1736   priv->recv_sink[0] = gst_element_get_request_pad (rtpbin, name);
1737   g_free (name);
1738   name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
1739   priv->recv_sink[1] = gst_element_get_request_pad (rtpbin, name);
1740   g_free (name);
1741
1742   /* get the session */
1743   g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &priv->session);
1744
1745   g_signal_connect (priv->session, "on-new-ssrc", (GCallback) on_new_ssrc,
1746       stream);
1747   g_signal_connect (priv->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
1748       stream);
1749   g_signal_connect (priv->session, "on-ssrc-active",
1750       (GCallback) on_ssrc_active, stream);
1751   g_signal_connect (priv->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
1752       stream);
1753   g_signal_connect (priv->session, "on-bye-timeout",
1754       (GCallback) on_bye_timeout, stream);
1755   g_signal_connect (priv->session, "on-timeout", (GCallback) on_timeout,
1756       stream);
1757
1758   for (i = 0; i < 2; i++) {
1759     GstPad *teepad, *queuepad;
1760     /* For the sender we create this bit of pipeline for both
1761      * RTP and RTCP. Sync and preroll are enabled on udpsink so
1762      * we need to add a queue before appsink to make the pipeline
1763      * not block. For the TCP case, we want to pump data to the
1764      * client as fast as possible anyway.
1765      *
1766      * .--------.      .-----.    .---------.
1767      * | rtpbin |      | tee |    | udpsink |
1768      * |       send->sink   src->sink       |
1769      * '--------'      |     |    '---------'
1770      *                 |     |    .---------.    .---------.
1771      *                 |     |    |  queue  |    | appsink |
1772      *                 |    src->sink      src->sink       |
1773      *                 '-----'    '---------'    '---------'
1774      *
1775      * When only UDP is allowed, we skip the tee, queue and appsink and link the
1776      * udpsink directly to the session.
1777      */
1778     /* add udpsink */
1779     gst_bin_add (bin, priv->udpsink[i]);
1780     sinkpad = gst_element_get_static_pad (priv->udpsink[i], "sink");
1781
1782     if (priv->protocols & GST_RTSP_LOWER_TRANS_TCP) {
1783       /* make tee for RTP/RTCP */
1784       priv->tee[i] = gst_element_factory_make ("tee", NULL);
1785       gst_bin_add (bin, priv->tee[i]);
1786
1787       /* and link to rtpbin send pad */
1788       pad = gst_element_get_static_pad (priv->tee[i], "sink");
1789       gst_pad_link (priv->send_src[i], pad);
1790       gst_object_unref (pad);
1791
1792       /* link tee to udpsink */
1793       teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
1794       gst_pad_link (teepad, sinkpad);
1795       gst_object_unref (teepad);
1796
1797       /* make queue */
1798       priv->appqueue[i] = gst_element_factory_make ("queue", NULL);
1799       gst_bin_add (bin, priv->appqueue[i]);
1800       /* and link to tee */
1801       teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
1802       pad = gst_element_get_static_pad (priv->appqueue[i], "sink");
1803       gst_pad_link (teepad, pad);
1804       gst_object_unref (pad);
1805       gst_object_unref (teepad);
1806
1807       /* make appsink */
1808       priv->appsink[i] = gst_element_factory_make ("appsink", NULL);
1809       g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
1810       g_object_set (priv->appsink[i], "emit-signals", FALSE, NULL);
1811       gst_bin_add (bin, priv->appsink[i]);
1812       gst_app_sink_set_callbacks (GST_APP_SINK_CAST (priv->appsink[i]),
1813           &sink_cb, stream, NULL);
1814       /* and link to queue */
1815       queuepad = gst_element_get_static_pad (priv->appqueue[i], "src");
1816       pad = gst_element_get_static_pad (priv->appsink[i], "sink");
1817       gst_pad_link (queuepad, pad);
1818       gst_object_unref (pad);
1819       gst_object_unref (queuepad);
1820     } else {
1821       /* else only udpsink needed, link it to the session */
1822       gst_pad_link (priv->send_src[i], sinkpad);
1823     }
1824     gst_object_unref (sinkpad);
1825
1826     /* For the receiver we create this bit of pipeline for both
1827      * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc
1828      * and it is all funneled into the rtpbin receive pad.
1829      *
1830      * .--------.     .--------.    .--------.
1831      * | udpsrc |     | funnel |    | rtpbin |
1832      * |       src->sink      src->sink      |
1833      * '--------'     |        |    '--------'
1834      * .--------.     |        |
1835      * | appsrc |     |        |
1836      * |       src->sink       |
1837      * '--------'     '--------'
1838      */
1839     /* make funnel for the RTP/RTCP receivers */
1840     priv->funnel[i] = gst_element_factory_make ("funnel", NULL);
1841     gst_bin_add (bin, priv->funnel[i]);
1842
1843     pad = gst_element_get_static_pad (priv->funnel[i], "src");
1844     gst_pad_link (pad, priv->recv_sink[i]);
1845     gst_object_unref (pad);
1846
1847     if (priv->udpsrc_v4[i]) {
1848       /* we set and keep these to playing so that they don't cause NO_PREROLL return
1849        * values */
1850       gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_PLAYING);
1851       gst_element_set_locked_state (priv->udpsrc_v4[i], TRUE);
1852       /* add udpsrc */
1853       gst_bin_add (bin, priv->udpsrc_v4[i]);
1854
1855       /* and link to the funnel v4 */
1856       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1857       pad = gst_element_get_static_pad (priv->udpsrc_v4[i], "src");
1858       gst_pad_link (pad, selpad);
1859       gst_object_unref (pad);
1860       gst_object_unref (selpad);
1861     }
1862
1863     if (priv->udpsrc_v6[i]) {
1864       gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_PLAYING);
1865       gst_element_set_locked_state (priv->udpsrc_v6[i], TRUE);
1866       gst_bin_add (bin, priv->udpsrc_v6[i]);
1867
1868       /* and link to the funnel v6 */
1869       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1870       pad = gst_element_get_static_pad (priv->udpsrc_v6[i], "src");
1871       gst_pad_link (pad, selpad);
1872       gst_object_unref (pad);
1873       gst_object_unref (selpad);
1874     }
1875
1876     if (priv->protocols & GST_RTSP_LOWER_TRANS_TCP) {
1877       /* make and add appsrc */
1878       priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
1879       gst_bin_add (bin, priv->appsrc[i]);
1880       /* and link to the funnel */
1881       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1882       pad = gst_element_get_static_pad (priv->appsrc[i], "src");
1883       gst_pad_link (pad, selpad);
1884       gst_object_unref (pad);
1885       gst_object_unref (selpad);
1886     }
1887
1888     /* check if we need to set to a special state */
1889     if (state != GST_STATE_NULL) {
1890       if (priv->udpsink[i])
1891         gst_element_set_state (priv->udpsink[i], state);
1892       if (priv->appsink[i])
1893         gst_element_set_state (priv->appsink[i], state);
1894       if (priv->appqueue[i])
1895         gst_element_set_state (priv->appqueue[i], state);
1896       if (priv->tee[i])
1897         gst_element_set_state (priv->tee[i], state);
1898       if (priv->funnel[i])
1899         gst_element_set_state (priv->funnel[i], state);
1900       if (priv->appsrc[i])
1901         gst_element_set_state (priv->appsrc[i], state);
1902     }
1903   }
1904
1905   /* be notified of caps changes */
1906   priv->caps_sig = g_signal_connect (priv->send_src[0], "notify::caps",
1907       (GCallback) caps_notify, stream);
1908
1909   priv->is_joined = TRUE;
1910   g_mutex_unlock (&priv->lock);
1911
1912   return TRUE;
1913
1914   /* ERRORS */
1915 was_joined:
1916   {
1917     g_mutex_unlock (&priv->lock);
1918     return TRUE;
1919   }
1920 no_ports:
1921   {
1922     g_mutex_unlock (&priv->lock);
1923     GST_WARNING ("failed to allocate ports %u", idx);
1924     return FALSE;
1925   }
1926 link_failed:
1927   {
1928     GST_WARNING ("failed to link stream %u", idx);
1929     gst_object_unref (priv->send_rtp_sink);
1930     priv->send_rtp_sink = NULL;
1931     g_mutex_unlock (&priv->lock);
1932     return FALSE;
1933   }
1934 }
1935
1936 /**
1937  * gst_rtsp_stream_leave_bin:
1938  * @stream: a #GstRTSPStream
1939  * @bin: (transfer none): a #GstBin
1940  * @rtpbin: (transfer none): a rtpbin #GstElement
1941  *
1942  * Remove the elements of @stream from @bin.
1943  *
1944  * Return: %TRUE on success.
1945  */
1946 gboolean
1947 gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
1948     GstElement * rtpbin)
1949 {
1950   GstRTSPStreamPrivate *priv;
1951   gint i;
1952   GList *l;
1953
1954   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1955   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
1956   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
1957
1958   priv = stream->priv;
1959
1960   g_mutex_lock (&priv->lock);
1961   if (!priv->is_joined)
1962     goto was_not_joined;
1963
1964   /* all transports must be removed by now */
1965   g_return_val_if_fail (priv->transports == NULL, FALSE);
1966
1967   clear_tr_cache (priv);
1968
1969   GST_INFO ("stream %p leaving bin", stream);
1970
1971   gst_pad_unlink (priv->srcpad, priv->send_rtp_sink);
1972   g_signal_handler_disconnect (priv->send_src[0], priv->caps_sig);
1973   gst_element_release_request_pad (rtpbin, priv->send_rtp_sink);
1974   gst_object_unref (priv->send_rtp_sink);
1975   priv->send_rtp_sink = NULL;
1976
1977   for (i = 0; i < 2; i++) {
1978     if (priv->udpsink[i])
1979       gst_element_set_state (priv->udpsink[i], GST_STATE_NULL);
1980     if (priv->appsink[i])
1981       gst_element_set_state (priv->appsink[i], GST_STATE_NULL);
1982     if (priv->appqueue[i])
1983       gst_element_set_state (priv->appqueue[i], GST_STATE_NULL);
1984     if (priv->tee[i])
1985       gst_element_set_state (priv->tee[i], GST_STATE_NULL);
1986     if (priv->funnel[i])
1987       gst_element_set_state (priv->funnel[i], GST_STATE_NULL);
1988     if (priv->appsrc[i])
1989       gst_element_set_state (priv->appsrc[i], GST_STATE_NULL);
1990     if (priv->udpsrc_v4[i]) {
1991       /* and set udpsrc to NULL now before removing */
1992       gst_element_set_locked_state (priv->udpsrc_v4[i], FALSE);
1993       gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_NULL);
1994       /* removing them should also nicely release the request
1995        * pads when they finalize */
1996       gst_bin_remove (bin, priv->udpsrc_v4[i]);
1997     }
1998     if (priv->udpsrc_v6[i]) {
1999       gst_element_set_locked_state (priv->udpsrc_v6[i], FALSE);
2000       gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_NULL);
2001       gst_bin_remove (bin, priv->udpsrc_v6[i]);
2002     }
2003
2004     for (l = priv->transport_sources; l; l = l->next) {
2005       GstRTSPMulticastTransportSource *s = l->data;
2006
2007       if (!s->udpsrc[i])
2008         continue;
2009
2010       gst_element_set_locked_state (s->udpsrc[i], FALSE);
2011       gst_element_set_state (s->udpsrc[i], GST_STATE_NULL);
2012       gst_bin_remove (bin, s->udpsrc[i]);
2013     }
2014
2015     if (priv->udpsink[i])
2016       gst_bin_remove (bin, priv->udpsink[i]);
2017     if (priv->appsrc[i])
2018       gst_bin_remove (bin, priv->appsrc[i]);
2019     if (priv->appsink[i])
2020       gst_bin_remove (bin, priv->appsink[i]);
2021     if (priv->appqueue[i])
2022       gst_bin_remove (bin, priv->appqueue[i]);
2023     if (priv->tee[i])
2024       gst_bin_remove (bin, priv->tee[i]);
2025     if (priv->funnel[i])
2026       gst_bin_remove (bin, priv->funnel[i]);
2027
2028     gst_element_release_request_pad (rtpbin, priv->recv_sink[i]);
2029     gst_object_unref (priv->recv_sink[i]);
2030     priv->recv_sink[i] = NULL;
2031
2032     priv->udpsrc_v4[i] = NULL;
2033     priv->udpsrc_v6[i] = NULL;
2034     priv->udpsink[i] = NULL;
2035     priv->appsrc[i] = NULL;
2036     priv->appsink[i] = NULL;
2037     priv->appqueue[i] = NULL;
2038     priv->tee[i] = NULL;
2039     priv->funnel[i] = NULL;
2040   }
2041
2042   for (l = priv->transport_sources; l; l = l->next) {
2043     GstRTSPMulticastTransportSource *s = l->data;
2044     g_slice_free (GstRTSPMulticastTransportSource, s);
2045   }
2046   g_list_free (priv->transport_sources);
2047   priv->transport_sources = NULL;
2048
2049   gst_object_unref (priv->send_src[0]);
2050   priv->send_src[0] = NULL;
2051
2052   gst_element_release_request_pad (rtpbin, priv->send_src[1]);
2053   gst_object_unref (priv->send_src[1]);
2054   priv->send_src[1] = NULL;
2055
2056   g_object_unref (priv->session);
2057   priv->session = NULL;
2058   if (priv->caps)
2059     gst_caps_unref (priv->caps);
2060   priv->caps = NULL;
2061
2062   if (priv->srtpenc)
2063     gst_object_unref (priv->srtpenc);
2064
2065   priv->is_joined = FALSE;
2066   g_mutex_unlock (&priv->lock);
2067
2068   return TRUE;
2069
2070 was_not_joined:
2071   {
2072     g_mutex_unlock (&priv->lock);
2073     return TRUE;
2074   }
2075 }
2076
2077 /**
2078  * gst_rtsp_stream_get_rtpinfo:
2079  * @stream: a #GstRTSPStream
2080  * @rtptime: (allow-none): result RTP timestamp
2081  * @seq: (allow-none): result RTP seqnum
2082  * @clock_rate: (allow-none): the clock rate
2083  * @running_time: (allow-none): result running-time
2084  *
2085  * Retrieve the current rtptime, seq and running-time. This is used to
2086  * construct a RTPInfo reply header.
2087  *
2088  * Returns: %TRUE when rtptime, seq and running-time could be determined.
2089  */
2090 gboolean
2091 gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream,
2092     guint * rtptime, guint * seq, guint * clock_rate,
2093     GstClockTime * running_time)
2094 {
2095   GstRTSPStreamPrivate *priv;
2096   GstStructure *stats;
2097   GObjectClass *payobjclass;
2098
2099   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2100
2101   priv = stream->priv;
2102
2103   payobjclass = G_OBJECT_GET_CLASS (priv->payloader);
2104
2105   g_mutex_lock (&priv->lock);
2106
2107   if (g_object_class_find_property (payobjclass, "stats")) {
2108     g_object_get (priv->payloader, "stats", &stats, NULL);
2109     if (stats == NULL)
2110       goto no_stats;
2111
2112     if (seq)
2113       gst_structure_get_uint (stats, "seqnum", seq);
2114
2115     if (rtptime)
2116       gst_structure_get_uint (stats, "timestamp", rtptime);
2117
2118     if (running_time)
2119       gst_structure_get_clock_time (stats, "running-time", running_time);
2120
2121     if (clock_rate) {
2122       gst_structure_get_uint (stats, "clock-rate", clock_rate);
2123       if (*clock_rate == 0 && running_time)
2124         *running_time = GST_CLOCK_TIME_NONE;
2125     }
2126     gst_structure_free (stats);
2127   } else {
2128     if (!g_object_class_find_property (payobjclass, "seqnum") ||
2129         !g_object_class_find_property (payobjclass, "timestamp"))
2130       goto no_stats;
2131
2132     if (seq)
2133       g_object_get (priv->payloader, "seqnum", seq, NULL);
2134
2135     if (rtptime)
2136       g_object_get (priv->payloader, "timestamp", rtptime, NULL);
2137
2138     if (running_time)
2139       *running_time = GST_CLOCK_TIME_NONE;
2140   }
2141   g_mutex_unlock (&priv->lock);
2142
2143   return TRUE;
2144
2145   /* ERRORS */
2146 no_stats:
2147   {
2148     GST_WARNING ("Could not get payloader stats");
2149     g_mutex_unlock (&priv->lock);
2150     return FALSE;
2151   }
2152 }
2153
2154 /**
2155  * gst_rtsp_stream_get_caps:
2156  * @stream: a #GstRTSPStream
2157  *
2158  * Retrieve the current caps of @stream.
2159  *
2160  * Returns: (transfer full): the #GstCaps of @stream. use gst_caps_unref()
2161  * after usage.
2162  */
2163 GstCaps *
2164 gst_rtsp_stream_get_caps (GstRTSPStream * stream)
2165 {
2166   GstRTSPStreamPrivate *priv;
2167   GstCaps *result;
2168
2169   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2170
2171   priv = stream->priv;
2172
2173   g_mutex_lock (&priv->lock);
2174   if ((result = priv->caps))
2175     gst_caps_ref (result);
2176   g_mutex_unlock (&priv->lock);
2177
2178   return result;
2179 }
2180
2181 /**
2182  * gst_rtsp_stream_recv_rtp:
2183  * @stream: a #GstRTSPStream
2184  * @buffer: (transfer full): a #GstBuffer
2185  *
2186  * Handle an RTP buffer for the stream. This method is usually called when a
2187  * message has been received from a client using the TCP transport.
2188  *
2189  * This function takes ownership of @buffer.
2190  *
2191  * Returns: a GstFlowReturn.
2192  */
2193 GstFlowReturn
2194 gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer)
2195 {
2196   GstRTSPStreamPrivate *priv;
2197   GstFlowReturn ret;
2198   GstElement *element;
2199
2200   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
2201   priv = stream->priv;
2202   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
2203   g_return_val_if_fail (priv->is_joined, FALSE);
2204
2205   g_mutex_lock (&priv->lock);
2206   if (priv->appsrc[0])
2207     element = gst_object_ref (priv->appsrc[0]);
2208   else
2209     element = NULL;
2210   g_mutex_unlock (&priv->lock);
2211
2212   if (element) {
2213     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
2214     gst_object_unref (element);
2215   } else {
2216     ret = GST_FLOW_OK;
2217   }
2218   return ret;
2219 }
2220
2221 /**
2222  * gst_rtsp_stream_recv_rtcp:
2223  * @stream: a #GstRTSPStream
2224  * @buffer: (transfer full): a #GstBuffer
2225  *
2226  * Handle an RTCP buffer for the stream. This method is usually called when a
2227  * message has been received from a client using the TCP transport.
2228  *
2229  * This function takes ownership of @buffer.
2230  *
2231  * Returns: a GstFlowReturn.
2232  */
2233 GstFlowReturn
2234 gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
2235 {
2236   GstRTSPStreamPrivate *priv;
2237   GstFlowReturn ret;
2238   GstElement *element;
2239
2240   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
2241   priv = stream->priv;
2242   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
2243   g_return_val_if_fail (priv->is_joined, FALSE);
2244
2245   g_mutex_lock (&priv->lock);
2246   if (priv->appsrc[1])
2247     element = gst_object_ref (priv->appsrc[1]);
2248   else
2249     element = NULL;
2250   g_mutex_unlock (&priv->lock);
2251
2252   if (element) {
2253     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
2254     gst_object_unref (element);
2255   } else {
2256     ret = GST_FLOW_OK;
2257     gst_buffer_unref (buffer);
2258   }
2259   return ret;
2260 }
2261
2262 /* must be called with lock */
2263 static gboolean
2264 update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
2265     gboolean add)
2266 {
2267   GstRTSPStreamPrivate *priv = stream->priv;
2268   const GstRTSPTransport *tr;
2269
2270   tr = gst_rtsp_stream_transport_get_transport (trans);
2271
2272   switch (tr->lower_transport) {
2273     case GST_RTSP_LOWER_TRANS_UDP_MCAST:
2274     {
2275       GstRTSPMulticastTransportSource *source;
2276       GstBin *bin;
2277
2278       bin = GST_BIN (gst_object_get_parent (GST_OBJECT (priv->funnel[0])));
2279
2280       if (add) {
2281         gchar *host;
2282         gint i;
2283         GstPad *selpad, *pad;
2284
2285         source = g_slice_new0 (GstRTSPMulticastTransportSource);
2286         source->transport = trans;
2287
2288         for (i = 0; i < 2; i++) {
2289           host =
2290               g_strdup_printf ("udp://%s:%d", tr->destination,
2291               (i == 0) ? tr->port.min : tr->port.max);
2292           source->udpsrc[i] =
2293               gst_element_make_from_uri (GST_URI_SRC, host, NULL, NULL);
2294           g_free (host);
2295
2296           /* we set and keep these to playing so that they don't cause NO_PREROLL return
2297            * values */
2298           gst_element_set_state (source->udpsrc[i], GST_STATE_PLAYING);
2299           gst_element_set_locked_state (source->udpsrc[i], TRUE);
2300           /* add udpsrc */
2301           gst_bin_add (bin, source->udpsrc[i]);
2302
2303           /* and link to the funnel v4 */
2304           source->selpad[i] = selpad =
2305               gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2306           pad = gst_element_get_static_pad (source->udpsrc[i], "src");
2307           gst_pad_link (pad, selpad);
2308           gst_object_unref (pad);
2309           gst_object_unref (selpad);
2310         }
2311         gst_object_unref (bin);
2312
2313         priv->transport_sources =
2314             g_list_prepend (priv->transport_sources, source);
2315       } else {
2316         GList *l;
2317
2318         for (l = priv->transport_sources; l; l = l->next) {
2319           source = l->data;
2320
2321           if (source->transport == trans) {
2322             priv->transport_sources =
2323                 g_list_delete_link (priv->transport_sources, l);
2324             break;
2325           }
2326         }
2327
2328         if (l != NULL) {
2329           gint i;
2330
2331           for (i = 0; i < 2; i++) {
2332             /* Will automatically unlink everything */
2333             gst_bin_remove (bin,
2334                 GST_ELEMENT (gst_object_ref (source->udpsrc[i])));
2335
2336             gst_element_set_state (source->udpsrc[i], GST_STATE_NULL);
2337             gst_object_unref (source->udpsrc[i]);
2338
2339             gst_element_release_request_pad (priv->funnel[i],
2340                 source->selpad[i]);
2341           }
2342
2343           g_slice_free (GstRTSPMulticastTransportSource, source);
2344         }
2345       }
2346
2347       /* fall through for the generic case */
2348     }
2349     case GST_RTSP_LOWER_TRANS_UDP:
2350     {
2351       gchar *dest;
2352       gint min, max;
2353       guint ttl = 0;
2354
2355       dest = tr->destination;
2356       if (tr->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
2357         min = tr->port.min;
2358         max = tr->port.max;
2359         ttl = tr->ttl;
2360       } else {
2361         min = tr->client_port.min;
2362         max = tr->client_port.max;
2363       }
2364
2365       if (add) {
2366         if (ttl > 0) {
2367           GST_INFO ("setting ttl-mc %d", ttl);
2368           g_object_set (G_OBJECT (priv->udpsink[0]), "ttl-mc", ttl, NULL);
2369           g_object_set (G_OBJECT (priv->udpsink[1]), "ttl-mc", ttl, NULL);
2370         }
2371         GST_INFO ("adding %s:%d-%d", dest, min, max);
2372         g_signal_emit_by_name (priv->udpsink[0], "add", dest, min, NULL);
2373         g_signal_emit_by_name (priv->udpsink[1], "add", dest, max, NULL);
2374         priv->transports = g_list_prepend (priv->transports, trans);
2375       } else {
2376         GST_INFO ("removing %s:%d-%d", dest, min, max);
2377         g_signal_emit_by_name (priv->udpsink[0], "remove", dest, min, NULL);
2378         g_signal_emit_by_name (priv->udpsink[1], "remove", dest, max, NULL);
2379         priv->transports = g_list_remove (priv->transports, trans);
2380       }
2381       priv->transports_cookie++;
2382       break;
2383     }
2384     case GST_RTSP_LOWER_TRANS_TCP:
2385       if (add) {
2386         GST_INFO ("adding TCP %s", tr->destination);
2387         priv->transports = g_list_prepend (priv->transports, trans);
2388       } else {
2389         GST_INFO ("removing TCP %s", tr->destination);
2390         priv->transports = g_list_remove (priv->transports, trans);
2391       }
2392       priv->transports_cookie++;
2393       break;
2394     default:
2395       goto unknown_transport;
2396   }
2397   return TRUE;
2398
2399   /* ERRORS */
2400 unknown_transport:
2401   {
2402     GST_INFO ("Unknown transport %d", tr->lower_transport);
2403     return FALSE;
2404   }
2405 }
2406
2407
2408 /**
2409  * gst_rtsp_stream_add_transport:
2410  * @stream: a #GstRTSPStream
2411  * @trans: (transfer none): a #GstRTSPStreamTransport
2412  *
2413  * Add the transport in @trans to @stream. The media of @stream will
2414  * then also be send to the values configured in @trans.
2415  *
2416  * @stream must be joined to a bin.
2417  *
2418  * @trans must contain a valid #GstRTSPTransport.
2419  *
2420  * Returns: %TRUE if @trans was added
2421  */
2422 gboolean
2423 gst_rtsp_stream_add_transport (GstRTSPStream * stream,
2424     GstRTSPStreamTransport * trans)
2425 {
2426   GstRTSPStreamPrivate *priv;
2427   gboolean res;
2428
2429   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2430   priv = stream->priv;
2431   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
2432   g_return_val_if_fail (priv->is_joined, FALSE);
2433
2434   g_mutex_lock (&priv->lock);
2435   res = update_transport (stream, trans, TRUE);
2436   g_mutex_unlock (&priv->lock);
2437
2438   return res;
2439 }
2440
2441 /**
2442  * gst_rtsp_stream_remove_transport:
2443  * @stream: a #GstRTSPStream
2444  * @trans: (transfer none): a #GstRTSPStreamTransport
2445  *
2446  * Remove the transport in @trans from @stream. The media of @stream will
2447  * not be sent to the values configured in @trans.
2448  *
2449  * @stream must be joined to a bin.
2450  *
2451  * @trans must contain a valid #GstRTSPTransport.
2452  *
2453  * Returns: %TRUE if @trans was removed
2454  */
2455 gboolean
2456 gst_rtsp_stream_remove_transport (GstRTSPStream * stream,
2457     GstRTSPStreamTransport * trans)
2458 {
2459   GstRTSPStreamPrivate *priv;
2460   gboolean res;
2461
2462   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2463   priv = stream->priv;
2464   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
2465   g_return_val_if_fail (priv->is_joined, FALSE);
2466
2467   g_mutex_lock (&priv->lock);
2468   res = update_transport (stream, trans, FALSE);
2469   g_mutex_unlock (&priv->lock);
2470
2471   return res;
2472 }
2473
2474 /**
2475  * gst_rtsp_stream_update_crypto:
2476  * @stream: a #GstRTSPStream
2477  * @ssrc: the SSRC
2478  * @crypto: (transfer none) (allow-none): a #GstCaps with crypto info
2479  *
2480  * Update the new crypto information for @ssrc in @stream. If information
2481  * for @ssrc did not exist, it will be added. If information
2482  * for @ssrc existed, it will be replaced. If @crypto is %NULL, it will
2483  * be removed from @stream.
2484  *
2485  * Returns: %TRUE if @crypto could be updated
2486  */
2487 gboolean
2488 gst_rtsp_stream_update_crypto (GstRTSPStream * stream,
2489     guint ssrc, GstCaps * crypto)
2490 {
2491   GstRTSPStreamPrivate *priv;
2492
2493   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2494   g_return_val_if_fail (crypto == NULL || GST_IS_CAPS (crypto), FALSE);
2495
2496   priv = stream->priv;
2497
2498   GST_DEBUG_OBJECT (stream, "update key for %08x", ssrc);
2499
2500   g_mutex_lock (&priv->lock);
2501   if (crypto)
2502     g_hash_table_insert (priv->keys, GINT_TO_POINTER (ssrc),
2503         gst_caps_ref (crypto));
2504   else
2505     g_hash_table_remove (priv->keys, GINT_TO_POINTER (ssrc));
2506   g_mutex_unlock (&priv->lock);
2507
2508   return TRUE;
2509 }
2510
2511 /**
2512  * gst_rtsp_stream_get_rtp_socket:
2513  * @stream: a #GstRTSPStream
2514  * @family: the socket family
2515  *
2516  * Get the RTP socket from @stream for a @family.
2517  *
2518  * @stream must be joined to a bin.
2519  *
2520  * Returns: (transfer full) (nullable): the RTP socket or %NULL if no
2521  * socket could be allocated for @family. Unref after usage
2522  */
2523 GSocket *
2524 gst_rtsp_stream_get_rtp_socket (GstRTSPStream * stream, GSocketFamily family)
2525 {
2526   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
2527   GSocket *socket;
2528   const gchar *name;
2529
2530   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2531   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
2532       family == G_SOCKET_FAMILY_IPV6, NULL);
2533   g_return_val_if_fail (priv->udpsink[0], NULL);
2534
2535   if (family == G_SOCKET_FAMILY_IPV6)
2536     name = "socket-v6";
2537   else
2538     name = "socket";
2539
2540   g_object_get (priv->udpsink[0], name, &socket, NULL);
2541
2542   return socket;
2543 }
2544
2545 /**
2546  * gst_rtsp_stream_get_rtcp_socket:
2547  * @stream: a #GstRTSPStream
2548  * @family: the socket family
2549  *
2550  * Get the RTCP socket from @stream for a @family.
2551  *
2552  * @stream must be joined to a bin.
2553  *
2554  * Returns: (transfer full) (nullable): the RTCP socket or %NULL if no
2555  * socket could be allocated for @family. Unref after usage
2556  */
2557 GSocket *
2558 gst_rtsp_stream_get_rtcp_socket (GstRTSPStream * stream, GSocketFamily family)
2559 {
2560   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
2561   GSocket *socket;
2562   const gchar *name;
2563
2564   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2565   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
2566       family == G_SOCKET_FAMILY_IPV6, NULL);
2567   g_return_val_if_fail (priv->udpsink[1], NULL);
2568
2569   if (family == G_SOCKET_FAMILY_IPV6)
2570     name = "socket-v6";
2571   else
2572     name = "socket";
2573
2574   g_object_get (priv->udpsink[1], name, &socket, NULL);
2575
2576   return socket;
2577 }
2578
2579 /**
2580  * gst_rtsp_stream_set_seqnum:
2581  * @stream: a #GstRTSPStream
2582  * @seqnum: a new sequence number
2583  *
2584  * Configure the sequence number in the payloader of @stream to @seqnum.
2585  */
2586 void
2587 gst_rtsp_stream_set_seqnum_offset (GstRTSPStream * stream, guint16 seqnum)
2588 {
2589   GstRTSPStreamPrivate *priv;
2590
2591   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
2592
2593   priv = stream->priv;
2594
2595   g_object_set (G_OBJECT (priv->payloader), "seqnum-offset", seqnum, NULL);
2596 }
2597
2598 /**
2599  * gst_rtsp_stream_get_seqnum:
2600  * @stream: a #GstRTSPStream
2601  *
2602  * Get the configured sequence number in the payloader of @stream.
2603  *
2604  * Returns: the sequence number of the payloader.
2605  */
2606 guint16
2607 gst_rtsp_stream_get_current_seqnum (GstRTSPStream * stream)
2608 {
2609   GstRTSPStreamPrivate *priv;
2610   guint seqnum;
2611
2612   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
2613
2614   priv = stream->priv;
2615
2616   g_object_get (G_OBJECT (priv->payloader), "seqnum", &seqnum, NULL);
2617
2618   return seqnum;
2619 }
2620
2621 /**
2622  * gst_rtsp_stream_transport_filter:
2623  * @stream: a #GstRTSPStream
2624  * @func: (scope call) (allow-none): a callback
2625  * @user_data: (closure): user data passed to @func
2626  *
2627  * Call @func for each transport managed by @stream. The result value of @func
2628  * determines what happens to the transport. @func will be called with @stream
2629  * locked so no further actions on @stream can be performed from @func.
2630  *
2631  * If @func returns #GST_RTSP_FILTER_REMOVE, the transport will be removed from
2632  * @stream.
2633  *
2634  * If @func returns #GST_RTSP_FILTER_KEEP, the transport will remain in @stream.
2635  *
2636  * If @func returns #GST_RTSP_FILTER_REF, the transport will remain in @stream but
2637  * will also be added with an additional ref to the result #GList of this
2638  * function..
2639  *
2640  * When @func is %NULL, #GST_RTSP_FILTER_REF will be assumed for each transport.
2641  *
2642  * Returns: (element-type GstRTSPStreamTransport) (transfer full): a #GList with all
2643  * transports for which @func returned #GST_RTSP_FILTER_REF. After usage, each
2644  * element in the #GList should be unreffed before the list is freed.
2645  */
2646 GList *
2647 gst_rtsp_stream_transport_filter (GstRTSPStream * stream,
2648     GstRTSPStreamTransportFilterFunc func, gpointer user_data)
2649 {
2650   GstRTSPStreamPrivate *priv;
2651   GList *result, *walk, *next;
2652   GHashTable *visited;
2653   guint cookie;
2654
2655   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2656
2657   priv = stream->priv;
2658
2659   result = NULL;
2660   if (func)
2661     visited = g_hash_table_new_full (NULL, NULL, g_object_unref, NULL);
2662
2663   g_mutex_lock (&priv->lock);
2664 restart:
2665   cookie = priv->transports_cookie;
2666   for (walk = priv->transports; walk; walk = next) {
2667     GstRTSPStreamTransport *trans = walk->data;
2668     GstRTSPFilterResult res;
2669     gboolean changed;
2670
2671     next = g_list_next (walk);
2672
2673     if (func) {
2674       /* only visit each transport once */
2675       if (g_hash_table_contains (visited, trans))
2676         continue;
2677
2678       g_hash_table_add (visited, g_object_ref (trans));
2679       g_mutex_unlock (&priv->lock);
2680
2681       res = func (stream, trans, user_data);
2682
2683       g_mutex_lock (&priv->lock);
2684     } else
2685       res = GST_RTSP_FILTER_REF;
2686
2687     changed = (cookie != priv->transports_cookie);
2688
2689     switch (res) {
2690       case GST_RTSP_FILTER_REMOVE:
2691         update_transport (stream, trans, FALSE);
2692         break;
2693       case GST_RTSP_FILTER_REF:
2694         result = g_list_prepend (result, g_object_ref (trans));
2695         break;
2696       case GST_RTSP_FILTER_KEEP:
2697       default:
2698         break;
2699     }
2700     if (changed)
2701       goto restart;
2702   }
2703   g_mutex_unlock (&priv->lock);
2704
2705   if (func)
2706     g_hash_table_unref (visited);
2707
2708   return result;
2709 }
2710
2711 static GstPadProbeReturn
2712 pad_blocking (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
2713 {
2714   GstRTSPStreamPrivate *priv;
2715   GstRTSPStream *stream;
2716
2717   stream = user_data;
2718   priv = stream->priv;
2719
2720   GST_DEBUG_OBJECT (pad, "now blocking");
2721
2722   g_mutex_lock (&priv->lock);
2723   priv->blocking = TRUE;
2724   g_mutex_unlock (&priv->lock);
2725
2726   gst_element_post_message (priv->payloader,
2727       gst_message_new_element (GST_OBJECT_CAST (priv->payloader),
2728           gst_structure_new_empty ("GstRTSPStreamBlocking")));
2729
2730   return GST_PAD_PROBE_OK;
2731 }
2732
2733 /**
2734  * gst_rtsp_stream_set_blocked:
2735  * @stream: a #GstRTSPStream
2736  * @blocked: boolean indicating we should block or unblock
2737  *
2738  * Blocks or unblocks the dataflow on @stream.
2739  *
2740  * Returns: %TRUE on success
2741  */
2742 gboolean
2743 gst_rtsp_stream_set_blocked (GstRTSPStream * stream, gboolean blocked)
2744 {
2745   GstRTSPStreamPrivate *priv;
2746
2747   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2748
2749   priv = stream->priv;
2750
2751   g_mutex_lock (&priv->lock);
2752   if (blocked) {
2753     priv->blocking = FALSE;
2754     if (priv->blocked_id == 0) {
2755       priv->blocked_id = gst_pad_add_probe (priv->srcpad,
2756           GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER |
2757           GST_PAD_PROBE_TYPE_BUFFER_LIST, pad_blocking,
2758           g_object_ref (stream), g_object_unref);
2759     }
2760   } else {
2761     if (priv->blocked_id != 0) {
2762       gst_pad_remove_probe (priv->srcpad, priv->blocked_id);
2763       priv->blocked_id = 0;
2764       priv->blocking = FALSE;
2765     }
2766   }
2767   g_mutex_unlock (&priv->lock);
2768
2769   return TRUE;
2770 }
2771
2772 /**
2773  * gst_rtsp_stream_is_blocking:
2774  * @stream: a #GstRTSPStream
2775  *
2776  * Check if @stream is blocking on a #GstBuffer.
2777  *
2778  * Returns: %TRUE if @stream is blocking
2779  */
2780 gboolean
2781 gst_rtsp_stream_is_blocking (GstRTSPStream * stream)
2782 {
2783   GstRTSPStreamPrivate *priv;
2784   gboolean result;
2785
2786   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2787
2788   priv = stream->priv;
2789
2790   g_mutex_lock (&priv->lock);
2791   result = priv->blocking;
2792   g_mutex_unlock (&priv->lock);
2793
2794   return result;
2795 }
2796
2797 /**
2798  * gst_rtsp_stream_query_position:
2799  * @stream: a #GstRTSPStream
2800  *
2801  * Query the position of the stream in %GST_FORMAT_TIME. This only considers
2802  * the RTP parts of the pipeline and not the RTCP parts.
2803  *
2804  * Returns: %TRUE if the position could be queried
2805  */
2806 gboolean
2807 gst_rtsp_stream_query_position (GstRTSPStream * stream, gint64 * position)
2808 {
2809   GstRTSPStreamPrivate *priv;
2810   GstElement *sink;
2811   gboolean ret;
2812
2813   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2814
2815   priv = stream->priv;
2816
2817   g_mutex_lock (&priv->lock);
2818   if ((sink = priv->udpsink[0]))
2819     gst_object_ref (sink);
2820   g_mutex_unlock (&priv->lock);
2821
2822   if (!sink)
2823     return FALSE;
2824
2825   ret = gst_element_query_position (sink, GST_FORMAT_TIME, position);
2826   gst_object_unref (sink);
2827
2828   return ret;
2829 }
2830
2831 /**
2832  * gst_rtsp_stream_query_stop:
2833  * @stream: a #GstRTSPStream
2834  *
2835  * Query the stop of the stream in %GST_FORMAT_TIME. This only considers
2836  * the RTP parts of the pipeline and not the RTCP parts.
2837  *
2838  * Returns: %TRUE if the stop could be queried
2839  */
2840 gboolean
2841 gst_rtsp_stream_query_stop (GstRTSPStream * stream, gint64 * stop)
2842 {
2843   GstRTSPStreamPrivate *priv;
2844   GstElement *sink;
2845   GstQuery *query;
2846   gboolean ret;
2847
2848   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2849
2850   priv = stream->priv;
2851
2852   g_mutex_lock (&priv->lock);
2853   if ((sink = priv->udpsink[0]))
2854     gst_object_ref (sink);
2855   g_mutex_unlock (&priv->lock);
2856
2857   if (!sink)
2858     return FALSE;
2859
2860   query = gst_query_new_segment (GST_FORMAT_TIME);
2861   if ((ret = gst_element_query (sink, query))) {
2862     GstFormat format;
2863
2864     gst_query_parse_segment (query, NULL, &format, NULL, stop);
2865     if (format != GST_FORMAT_TIME)
2866       *stop = -1;
2867   }
2868   gst_query_unref (query);
2869   gst_object_unref (sink);
2870
2871   return ret;
2872
2873 }