rtsp-stream: Listen on the multicast group for RTP/RTCP packets
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-stream.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19 /**
20  * SECTION:rtsp-stream
21  * @short_description: A media stream
22  * @see_also: #GstRTSPMedia
23  *
24  * The #GstRTSPStream object manages the data transport for one stream. It
25  * is created from a payloader element and a source pad that produce the RTP
26  * packets for the stream.
27  *
28  * With gst_rtsp_stream_join_bin() the streaming elements are added to the bin
29  * and rtpbin. gst_rtsp_stream_leave_bin() removes the elements again.
30  *
31  * The #GstRTSPStream will use the configured addresspool, as set with
32  * gst_rtsp_stream_set_address_pool(), to allocate multicast addresses for the
33  * stream. With gst_rtsp_stream_get_multicast_address() you can get the
34  * configured address.
35  *
36  * With gst_rtsp_stream_get_server_port () you can get the port that the server
37  * will use to receive RTCP. This is the part that the clients will use to send
38  * RTCP to.
39  *
40  * With gst_rtsp_stream_add_transport() destinations can be added where the
41  * stream should be sent to. Use gst_rtsp_stream_remove_transport() to remove
42  * the destination again.
43  *
44  * Last reviewed on 2013-07-16 (1.0.0)
45  */
46
47 #include <stdlib.h>
48 #include <stdio.h>
49 #include <string.h>
50
51 #include <gio/gio.h>
52
53 #include <gst/app/gstappsrc.h>
54 #include <gst/app/gstappsink.h>
55
56 #include "rtsp-stream.h"
57
58 #define GST_RTSP_STREAM_GET_PRIVATE(obj)  \
59      (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTSP_STREAM, GstRTSPStreamPrivate))
60
61 typedef struct
62 {
63   GstRTSPStreamTransport *transport;
64
65   /* RTP and RTCP source */
66   GstElement *udpsrc[2];
67   GstPad *selpad[2];
68 } GstRTSPMulticastTransportSource;
69
70 struct _GstRTSPStreamPrivate
71 {
72   GMutex lock;
73   guint idx;
74   GstPad *srcpad;
75   GstElement *payloader;
76   guint buffer_size;
77   gboolean is_joined;
78   gchar *control;
79
80   GstRTSPProfile profiles;
81   GstRTSPLowerTrans protocols;
82
83   /* pads on the rtpbin */
84   GstPad *send_rtp_sink;
85   GstPad *recv_sink[2];
86   GstPad *send_src[2];
87
88   /* the RTPSession object */
89   GObject *session;
90
91   /* SRTP encoder/decoder */
92   GstElement *srtpenc;
93   GstElement *srtpdec;
94   GHashTable *keys;
95
96   /* sinks used for sending and receiving RTP and RTCP over ipv4, they share
97    * sockets */
98   GstElement *udpsrc_v4[2];
99
100   /* sinks used for sending and receiving RTP and RTCP over ipv6, they share
101    * sockets */
102   GstElement *udpsrc_v6[2];
103
104   GstElement *udpsink[2];
105
106   /* for TCP transport */
107   GstElement *appsrc[2];
108   GstElement *appqueue[2];
109   GstElement *appsink[2];
110
111   GstElement *tee[2];
112   GstElement *funnel[2];
113
114   /* server ports for sending/receiving over ipv4 */
115   GstRTSPRange server_port_v4;
116   GstRTSPAddress *server_addr_v4;
117   gboolean have_ipv4;
118
119   /* server ports for sending/receiving over ipv6 */
120   GstRTSPRange server_port_v6;
121   GstRTSPAddress *server_addr_v6;
122   gboolean have_ipv6;
123
124   /* multicast addresses */
125   GstRTSPAddressPool *pool;
126   GstRTSPAddress *addr_v4;
127   GstRTSPAddress *addr_v6;
128
129   /* the caps of the stream */
130   gulong caps_sig;
131   GstCaps *caps;
132
133   /* transports we stream to */
134   guint n_active;
135   GList *transports;
136   guint transports_cookie;
137   GList *tr_cache;
138   guint tr_cache_cookie;
139
140   /* UDP sources for UDP multicast transports */
141   GList *transport_sources;
142
143   gint dscp_qos;
144
145   /* stream blocking */
146   gulong blocked_id;
147   gboolean blocking;
148 };
149
150 #define DEFAULT_CONTROL         NULL
151 #define DEFAULT_PROFILES        GST_RTSP_PROFILE_AVP
152 #define DEFAULT_PROTOCOLS       GST_RTSP_LOWER_TRANS_UDP | GST_RTSP_LOWER_TRANS_UDP_MCAST | \
153                                         GST_RTSP_LOWER_TRANS_TCP
154
155 enum
156 {
157   PROP_0,
158   PROP_CONTROL,
159   PROP_PROFILES,
160   PROP_PROTOCOLS,
161   PROP_LAST
162 };
163
164 enum
165 {
166   SIGNAL_NEW_RTP_ENCODER,
167   SIGNAL_NEW_RTCP_ENCODER,
168   SIGNAL_LAST
169 };
170
171 GST_DEBUG_CATEGORY_STATIC (rtsp_stream_debug);
172 #define GST_CAT_DEFAULT rtsp_stream_debug
173
174 static GQuark ssrc_stream_map_key;
175
176 static void gst_rtsp_stream_get_property (GObject * object, guint propid,
177     GValue * value, GParamSpec * pspec);
178 static void gst_rtsp_stream_set_property (GObject * object, guint propid,
179     const GValue * value, GParamSpec * pspec);
180
181 static void gst_rtsp_stream_finalize (GObject * obj);
182
183 static guint gst_rtsp_stream_signals[SIGNAL_LAST] = { 0 };
184
185 G_DEFINE_TYPE (GstRTSPStream, gst_rtsp_stream, G_TYPE_OBJECT);
186
187 static void
188 gst_rtsp_stream_class_init (GstRTSPStreamClass * klass)
189 {
190   GObjectClass *gobject_class;
191
192   g_type_class_add_private (klass, sizeof (GstRTSPStreamPrivate));
193
194   gobject_class = G_OBJECT_CLASS (klass);
195
196   gobject_class->get_property = gst_rtsp_stream_get_property;
197   gobject_class->set_property = gst_rtsp_stream_set_property;
198   gobject_class->finalize = gst_rtsp_stream_finalize;
199
200   g_object_class_install_property (gobject_class, PROP_CONTROL,
201       g_param_spec_string ("control", "Control",
202           "The control string for this stream", DEFAULT_CONTROL,
203           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
204
205   g_object_class_install_property (gobject_class, PROP_PROFILES,
206       g_param_spec_flags ("profiles", "Profiles",
207           "Allowed transfer profiles", GST_TYPE_RTSP_PROFILE,
208           DEFAULT_PROFILES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
209
210   g_object_class_install_property (gobject_class, PROP_PROTOCOLS,
211       g_param_spec_flags ("protocols", "Protocols",
212           "Allowed lower transport protocols", GST_TYPE_RTSP_LOWER_TRANS,
213           DEFAULT_PROTOCOLS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
214
215   gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER] =
216       g_signal_new ("new-rtp-encoder", G_TYPE_FROM_CLASS (klass),
217       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
218       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
219
220   gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER] =
221       g_signal_new ("new-rtcp-encoder", G_TYPE_FROM_CLASS (klass),
222       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
223       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
224
225   GST_DEBUG_CATEGORY_INIT (rtsp_stream_debug, "rtspstream", 0, "GstRTSPStream");
226
227   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
228 }
229
230 static void
231 gst_rtsp_stream_init (GstRTSPStream * stream)
232 {
233   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
234
235   GST_DEBUG ("new stream %p", stream);
236
237   stream->priv = priv;
238
239   priv->dscp_qos = -1;
240   priv->control = g_strdup (DEFAULT_CONTROL);
241   priv->profiles = DEFAULT_PROFILES;
242   priv->protocols = DEFAULT_PROTOCOLS;
243
244   g_mutex_init (&priv->lock);
245
246   priv->keys = g_hash_table_new_full (g_direct_hash, g_direct_equal,
247       NULL, (GDestroyNotify) gst_caps_unref);
248 }
249
250 static void
251 gst_rtsp_stream_finalize (GObject * obj)
252 {
253   GstRTSPStream *stream;
254   GstRTSPStreamPrivate *priv;
255
256   stream = GST_RTSP_STREAM (obj);
257   priv = stream->priv;
258
259   GST_DEBUG ("finalize stream %p", stream);
260
261   /* we really need to be unjoined now */
262   g_return_if_fail (!priv->is_joined);
263
264   if (priv->addr_v4)
265     gst_rtsp_address_free (priv->addr_v4);
266   if (priv->addr_v6)
267     gst_rtsp_address_free (priv->addr_v6);
268   if (priv->server_addr_v4)
269     gst_rtsp_address_free (priv->server_addr_v4);
270   if (priv->server_addr_v6)
271     gst_rtsp_address_free (priv->server_addr_v6);
272   if (priv->pool)
273     g_object_unref (priv->pool);
274   gst_object_unref (priv->payloader);
275   gst_object_unref (priv->srcpad);
276   g_free (priv->control);
277   g_mutex_clear (&priv->lock);
278
279   g_hash_table_unref (priv->keys);
280
281   G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
282 }
283
284 static void
285 gst_rtsp_stream_get_property (GObject * object, guint propid,
286     GValue * value, GParamSpec * pspec)
287 {
288   GstRTSPStream *stream = GST_RTSP_STREAM (object);
289
290   switch (propid) {
291     case PROP_CONTROL:
292       g_value_take_string (value, gst_rtsp_stream_get_control (stream));
293       break;
294     case PROP_PROFILES:
295       g_value_set_flags (value, gst_rtsp_stream_get_profiles (stream));
296       break;
297     case PROP_PROTOCOLS:
298       g_value_set_flags (value, gst_rtsp_stream_get_protocols (stream));
299       break;
300     default:
301       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
302   }
303 }
304
305 static void
306 gst_rtsp_stream_set_property (GObject * object, guint propid,
307     const GValue * value, GParamSpec * pspec)
308 {
309   GstRTSPStream *stream = GST_RTSP_STREAM (object);
310
311   switch (propid) {
312     case PROP_CONTROL:
313       gst_rtsp_stream_set_control (stream, g_value_get_string (value));
314       break;
315     case PROP_PROFILES:
316       gst_rtsp_stream_set_profiles (stream, g_value_get_flags (value));
317       break;
318     case PROP_PROTOCOLS:
319       gst_rtsp_stream_set_protocols (stream, g_value_get_flags (value));
320       break;
321     default:
322       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
323   }
324 }
325
326 /**
327  * gst_rtsp_stream_new:
328  * @idx: an index
329  * @srcpad: a #GstPad
330  * @payloader: a #GstElement
331  *
332  * Create a new media stream with index @idx that handles RTP data on
333  * @srcpad and has a payloader element @payloader.
334  *
335  * Returns: (transfer full): a new #GstRTSPStream
336  */
337 GstRTSPStream *
338 gst_rtsp_stream_new (guint idx, GstElement * payloader, GstPad * srcpad)
339 {
340   GstRTSPStreamPrivate *priv;
341   GstRTSPStream *stream;
342
343   g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL);
344   g_return_val_if_fail (GST_IS_PAD (srcpad), NULL);
345   g_return_val_if_fail (GST_PAD_IS_SRC (srcpad), NULL);
346
347   stream = g_object_new (GST_TYPE_RTSP_STREAM, NULL);
348   priv = stream->priv;
349   priv->idx = idx;
350   priv->payloader = gst_object_ref (payloader);
351   priv->srcpad = gst_object_ref (srcpad);
352
353   return stream;
354 }
355
356 /**
357  * gst_rtsp_stream_get_index:
358  * @stream: a #GstRTSPStream
359  *
360  * Get the stream index.
361  *
362  * Return: the stream index.
363  */
364 guint
365 gst_rtsp_stream_get_index (GstRTSPStream * stream)
366 {
367   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
368
369   return stream->priv->idx;
370 }
371
372 /**
373  * gst_rtsp_stream_get_pt:
374  * @stream: a #GstRTSPStream
375  *
376  * Get the stream payload type.
377  *
378  * Return: the stream payload type.
379  */
380 guint
381 gst_rtsp_stream_get_pt (GstRTSPStream * stream)
382 {
383   GstRTSPStreamPrivate *priv;
384   guint pt;
385
386   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
387
388   priv = stream->priv;
389
390   g_object_get (G_OBJECT (priv->payloader), "pt", &pt, NULL);
391
392   return pt;
393 }
394
395 /**
396  * gst_rtsp_stream_get_srcpad:
397  * @stream: a #GstRTSPStream
398  *
399  * Get the srcpad associated with @stream.
400  *
401  * Returns: (transfer full): the srcpad. Unref after usage.
402  */
403 GstPad *
404 gst_rtsp_stream_get_srcpad (GstRTSPStream * stream)
405 {
406   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
407
408   return gst_object_ref (stream->priv->srcpad);
409 }
410
411 /**
412  * gst_rtsp_stream_get_control:
413  * @stream: a #GstRTSPStream
414  *
415  * Get the control string to identify this stream.
416  *
417  * Returns: (transfer full): the control string. g_free() after usage.
418  */
419 gchar *
420 gst_rtsp_stream_get_control (GstRTSPStream * stream)
421 {
422   GstRTSPStreamPrivate *priv;
423   gchar *result;
424
425   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
426
427   priv = stream->priv;
428
429   g_mutex_lock (&priv->lock);
430   if ((result = g_strdup (priv->control)) == NULL)
431     result = g_strdup_printf ("stream=%u", priv->idx);
432   g_mutex_unlock (&priv->lock);
433
434   return result;
435 }
436
437 /**
438  * gst_rtsp_stream_set_control:
439  * @stream: a #GstRTSPStream
440  * @control: a control string
441  *
442  * Set the control string in @stream.
443  */
444 void
445 gst_rtsp_stream_set_control (GstRTSPStream * stream, const gchar * control)
446 {
447   GstRTSPStreamPrivate *priv;
448
449   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
450
451   priv = stream->priv;
452
453   g_mutex_lock (&priv->lock);
454   g_free (priv->control);
455   priv->control = g_strdup (control);
456   g_mutex_unlock (&priv->lock);
457 }
458
459 /**
460  * gst_rtsp_stream_has_control:
461  * @stream: a #GstRTSPStream
462  * @control: a control string
463  *
464  * Check if @stream has the control string @control.
465  *
466  * Returns: %TRUE is @stream has @control as the control string
467  */
468 gboolean
469 gst_rtsp_stream_has_control (GstRTSPStream * stream, const gchar * control)
470 {
471   GstRTSPStreamPrivate *priv;
472   gboolean res;
473
474   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
475
476   priv = stream->priv;
477
478   g_mutex_lock (&priv->lock);
479   if (priv->control)
480     res = (g_strcmp0 (priv->control, control) == 0);
481   else {
482     guint streamid;
483
484     if (sscanf (control, "stream=%u", &streamid) > 0)
485       res = (streamid == priv->idx);
486     else
487       res = FALSE;
488   }
489   g_mutex_unlock (&priv->lock);
490
491   return res;
492 }
493
494 /**
495  * gst_rtsp_stream_set_mtu:
496  * @stream: a #GstRTSPStream
497  * @mtu: a new MTU
498  *
499  * Configure the mtu in the payloader of @stream to @mtu.
500  */
501 void
502 gst_rtsp_stream_set_mtu (GstRTSPStream * stream, guint mtu)
503 {
504   GstRTSPStreamPrivate *priv;
505
506   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
507
508   priv = stream->priv;
509
510   GST_LOG_OBJECT (stream, "set MTU %u", mtu);
511
512   g_object_set (G_OBJECT (priv->payloader), "mtu", mtu, NULL);
513 }
514
515 /**
516  * gst_rtsp_stream_get_mtu:
517  * @stream: a #GstRTSPStream
518  *
519  * Get the configured MTU in the payloader of @stream.
520  *
521  * Returns: the MTU of the payloader.
522  */
523 guint
524 gst_rtsp_stream_get_mtu (GstRTSPStream * stream)
525 {
526   GstRTSPStreamPrivate *priv;
527   guint mtu;
528
529   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
530
531   priv = stream->priv;
532
533   g_object_get (G_OBJECT (priv->payloader), "mtu", &mtu, NULL);
534
535   return mtu;
536 }
537
538 /* Update the dscp qos property on the udp sinks */
539 static void
540 update_dscp_qos (GstRTSPStream * stream)
541 {
542   GstRTSPStreamPrivate *priv;
543
544   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
545
546   priv = stream->priv;
547
548   if (priv->udpsink[0]) {
549     g_object_set (G_OBJECT (priv->udpsink[0]), "qos-dscp", priv->dscp_qos,
550         NULL);
551   }
552
553   if (priv->udpsink[1]) {
554     g_object_set (G_OBJECT (priv->udpsink[1]), "qos-dscp", priv->dscp_qos,
555         NULL);
556   }
557 }
558
559 /**
560  * gst_rtsp_stream_set_dscp_qos:
561  * @stream: a #GstRTSPStream
562  * @dscp_qos: a new dscp qos value (0-63, or -1 to disable)
563  *
564  * Configure the dscp qos of the outgoing sockets to @dscp_qos.
565  */
566 void
567 gst_rtsp_stream_set_dscp_qos (GstRTSPStream * stream, gint dscp_qos)
568 {
569   GstRTSPStreamPrivate *priv;
570
571   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
572
573   priv = stream->priv;
574
575   GST_LOG_OBJECT (stream, "set DSCP QoS %d", dscp_qos);
576
577   if (dscp_qos < -1 || dscp_qos > 63) {
578     GST_WARNING_OBJECT (stream, "trying to set illegal dscp qos %d", dscp_qos);
579     return;
580   }
581
582   priv->dscp_qos = dscp_qos;
583
584   update_dscp_qos (stream);
585 }
586
587 /**
588  * gst_rtsp_stream_get_dscp_qos:
589  * @stream: a #GstRTSPStream
590  *
591  * Get the configured DSCP QoS in of the outgoing sockets.
592  *
593  * Returns: the DSCP QoS value of the outgoing sockets, or -1 if disbled.
594  */
595 gint
596 gst_rtsp_stream_get_dscp_qos (GstRTSPStream * stream)
597 {
598   GstRTSPStreamPrivate *priv;
599
600   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
601
602   priv = stream->priv;
603
604   return priv->dscp_qos;
605 }
606
607 /**
608  * gst_rtsp_stream_is_transport_supported:
609  * @stream: a #GstRTSPStream
610  * @transport: (transfer none): a #GstRTSPTransport
611  *
612  * Check if @transport can be handled by stream
613  *
614  * Returns: %TRUE if @transport can be handled by @stream.
615  */
616 gboolean
617 gst_rtsp_stream_is_transport_supported (GstRTSPStream * stream,
618     GstRTSPTransport * transport)
619 {
620   GstRTSPStreamPrivate *priv;
621
622   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
623
624   priv = stream->priv;
625
626   g_mutex_lock (&priv->lock);
627   if (transport->trans != GST_RTSP_TRANS_RTP)
628     goto unsupported_transmode;
629
630   if (!(transport->profile & priv->profiles))
631     goto unsupported_profile;
632
633   if (!(transport->lower_transport & priv->protocols))
634     goto unsupported_ltrans;
635
636   g_mutex_unlock (&priv->lock);
637
638   return TRUE;
639
640   /* ERRORS */
641 unsupported_transmode:
642   {
643     GST_DEBUG ("unsupported transport mode %d", transport->trans);
644     g_mutex_unlock (&priv->lock);
645     return FALSE;
646   }
647 unsupported_profile:
648   {
649     GST_DEBUG ("unsupported profile %d", transport->profile);
650     g_mutex_unlock (&priv->lock);
651     return FALSE;
652   }
653 unsupported_ltrans:
654   {
655     GST_DEBUG ("unsupported lower transport %d", transport->lower_transport);
656     g_mutex_unlock (&priv->lock);
657     return FALSE;
658   }
659 }
660
661 /**
662  * gst_rtsp_stream_set_profiles:
663  * @stream: a #GstRTSPStream
664  * @profiles: the new profiles
665  *
666  * Configure the allowed profiles for @stream.
667  */
668 void
669 gst_rtsp_stream_set_profiles (GstRTSPStream * stream, GstRTSPProfile profiles)
670 {
671   GstRTSPStreamPrivate *priv;
672
673   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
674
675   priv = stream->priv;
676
677   g_mutex_lock (&priv->lock);
678   priv->profiles = profiles;
679   g_mutex_unlock (&priv->lock);
680 }
681
682 /**
683  * gst_rtsp_stream_get_profiles:
684  * @stream: a #GstRTSPStream
685  *
686  * Get the allowed profiles of @stream.
687  *
688  * Returns: a #GstRTSPProfile
689  */
690 GstRTSPProfile
691 gst_rtsp_stream_get_profiles (GstRTSPStream * stream)
692 {
693   GstRTSPStreamPrivate *priv;
694   GstRTSPProfile res;
695
696   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_RTSP_PROFILE_UNKNOWN);
697
698   priv = stream->priv;
699
700   g_mutex_lock (&priv->lock);
701   res = priv->profiles;
702   g_mutex_unlock (&priv->lock);
703
704   return res;
705 }
706
707 /**
708  * gst_rtsp_stream_set_protocols:
709  * @stream: a #GstRTSPStream
710  * @protocols: the new flags
711  *
712  * Configure the allowed lower transport for @stream.
713  */
714 void
715 gst_rtsp_stream_set_protocols (GstRTSPStream * stream,
716     GstRTSPLowerTrans protocols)
717 {
718   GstRTSPStreamPrivate *priv;
719
720   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
721
722   priv = stream->priv;
723
724   g_mutex_lock (&priv->lock);
725   priv->protocols = protocols;
726   g_mutex_unlock (&priv->lock);
727 }
728
729 /**
730  * gst_rtsp_stream_get_protocols:
731  * @stream: a #GstRTSPStream
732  *
733  * Get the allowed protocols of @stream.
734  *
735  * Returns: a #GstRTSPLowerTrans
736  */
737 GstRTSPLowerTrans
738 gst_rtsp_stream_get_protocols (GstRTSPStream * stream)
739 {
740   GstRTSPStreamPrivate *priv;
741   GstRTSPLowerTrans res;
742
743   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream),
744       GST_RTSP_LOWER_TRANS_UNKNOWN);
745
746   priv = stream->priv;
747
748   g_mutex_lock (&priv->lock);
749   res = priv->protocols;
750   g_mutex_unlock (&priv->lock);
751
752   return res;
753 }
754
755 /**
756  * gst_rtsp_stream_set_address_pool:
757  * @stream: a #GstRTSPStream
758  * @pool: (transfer none): a #GstRTSPAddressPool
759  *
760  * configure @pool to be used as the address pool of @stream.
761  */
762 void
763 gst_rtsp_stream_set_address_pool (GstRTSPStream * stream,
764     GstRTSPAddressPool * pool)
765 {
766   GstRTSPStreamPrivate *priv;
767   GstRTSPAddressPool *old;
768
769   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
770
771   priv = stream->priv;
772
773   GST_LOG_OBJECT (stream, "set address pool %p", pool);
774
775   g_mutex_lock (&priv->lock);
776   if ((old = priv->pool) != pool)
777     priv->pool = pool ? g_object_ref (pool) : NULL;
778   else
779     old = NULL;
780   g_mutex_unlock (&priv->lock);
781
782   if (old)
783     g_object_unref (old);
784 }
785
786 /**
787  * gst_rtsp_stream_get_address_pool:
788  * @stream: a #GstRTSPStream
789  *
790  * Get the #GstRTSPAddressPool used as the address pool of @stream.
791  *
792  * Returns: (transfer full): the #GstRTSPAddressPool of @stream. g_object_unref() after
793  * usage.
794  */
795 GstRTSPAddressPool *
796 gst_rtsp_stream_get_address_pool (GstRTSPStream * stream)
797 {
798   GstRTSPStreamPrivate *priv;
799   GstRTSPAddressPool *result;
800
801   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
802
803   priv = stream->priv;
804
805   g_mutex_lock (&priv->lock);
806   if ((result = priv->pool))
807     g_object_ref (result);
808   g_mutex_unlock (&priv->lock);
809
810   return result;
811 }
812
813 /**
814  * gst_rtsp_stream_get_multicast_address:
815  * @stream: a #GstRTSPStream
816  * @family: the #GSocketFamily
817  *
818  * Get the multicast address of @stream for @family.
819  *
820  * Returns: (transfer full) (nullable): the #GstRTSPAddress of @stream
821  * or %NULL when no address could be allocated. gst_rtsp_address_free()
822  * after usage.
823  */
824 GstRTSPAddress *
825 gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream,
826     GSocketFamily family)
827 {
828   GstRTSPStreamPrivate *priv;
829   GstRTSPAddress *result;
830   GstRTSPAddress **addrp;
831   GstRTSPAddressFlags flags;
832
833   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
834
835   priv = stream->priv;
836
837   if (family == G_SOCKET_FAMILY_IPV6) {
838     flags = GST_RTSP_ADDRESS_FLAG_IPV6;
839     addrp = &priv->addr_v6;
840   } else {
841     flags = GST_RTSP_ADDRESS_FLAG_IPV4;
842     addrp = &priv->addr_v4;
843   }
844
845   g_mutex_lock (&priv->lock);
846   if (*addrp == NULL) {
847     if (priv->pool == NULL)
848       goto no_pool;
849
850     flags |= GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_MULTICAST;
851
852     *addrp = gst_rtsp_address_pool_acquire_address (priv->pool, flags, 2);
853     if (*addrp == NULL)
854       goto no_address;
855   }
856   result = gst_rtsp_address_copy (*addrp);
857   g_mutex_unlock (&priv->lock);
858
859   return result;
860
861   /* ERRORS */
862 no_pool:
863   {
864     GST_ERROR_OBJECT (stream, "no address pool specified");
865     g_mutex_unlock (&priv->lock);
866     return NULL;
867   }
868 no_address:
869   {
870     GST_ERROR_OBJECT (stream, "failed to acquire address from pool");
871     g_mutex_unlock (&priv->lock);
872     return NULL;
873   }
874 }
875
876 /**
877  * gst_rtsp_stream_reserve_address:
878  * @stream: a #GstRTSPStream
879  * @address: an address
880  * @port: a port
881  * @n_ports: n_ports
882  * @ttl: a TTL
883  *
884  * Reserve @address and @port as the address and port of @stream.
885  *
886  * Returns: (nullable): the #GstRTSPAddress of @stream or %NULL when
887  * the address could be reserved. gst_rtsp_address_free() after usage.
888  */
889 GstRTSPAddress *
890 gst_rtsp_stream_reserve_address (GstRTSPStream * stream,
891     const gchar * address, guint port, guint n_ports, guint ttl)
892 {
893   GstRTSPStreamPrivate *priv;
894   GstRTSPAddress *result;
895   GInetAddress *addr;
896   GSocketFamily family;
897   GstRTSPAddress **addrp;
898
899   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
900   g_return_val_if_fail (address != NULL, NULL);
901   g_return_val_if_fail (port > 0, NULL);
902   g_return_val_if_fail (n_ports > 0, NULL);
903   g_return_val_if_fail (ttl > 0, NULL);
904
905   priv = stream->priv;
906
907   addr = g_inet_address_new_from_string (address);
908   if (!addr) {
909     GST_ERROR ("failed to get inet addr from %s", address);
910     family = G_SOCKET_FAMILY_IPV4;
911   } else {
912     family = g_inet_address_get_family (addr);
913     g_object_unref (addr);
914   }
915
916   if (family == G_SOCKET_FAMILY_IPV6)
917     addrp = &priv->addr_v6;
918   else
919     addrp = &priv->addr_v4;
920
921   g_mutex_lock (&priv->lock);
922   if (*addrp == NULL) {
923     GstRTSPAddressPoolResult res;
924
925     if (priv->pool == NULL)
926       goto no_pool;
927
928     res = gst_rtsp_address_pool_reserve_address (priv->pool, address,
929         port, n_ports, ttl, addrp);
930     if (res != GST_RTSP_ADDRESS_POOL_OK)
931       goto no_address;
932   } else {
933     if (strcmp ((*addrp)->address, address) ||
934         (*addrp)->port != port || (*addrp)->n_ports != n_ports ||
935         (*addrp)->ttl != ttl)
936       goto different_address;
937   }
938   result = gst_rtsp_address_copy (*addrp);
939   g_mutex_unlock (&priv->lock);
940
941   return result;
942
943   /* ERRORS */
944 no_pool:
945   {
946     GST_ERROR_OBJECT (stream, "no address pool specified");
947     g_mutex_unlock (&priv->lock);
948     return NULL;
949   }
950 no_address:
951   {
952     GST_ERROR_OBJECT (stream, "failed to acquire address %s from pool",
953         address);
954     g_mutex_unlock (&priv->lock);
955     return NULL;
956   }
957 different_address:
958   {
959     GST_ERROR_OBJECT (stream, "address %s is not the same that was already"
960         " reserved", address);
961     g_mutex_unlock (&priv->lock);
962     return NULL;
963   }
964 }
965
966 static gboolean
967 alloc_ports_one_family (GstRTSPAddressPool * pool, gint buffer_size,
968     GSocketFamily family, GstElement * udpsrc_out[2],
969     GstElement * udpsink_out[2], GstRTSPRange * server_port_out,
970     GstRTSPAddress ** server_addr_out)
971 {
972   GstStateChangeReturn ret;
973   GstElement *udpsrc0, *udpsrc1;
974   GstElement *udpsink0, *udpsink1;
975   GSocket *rtp_socket = NULL;
976   GSocket *rtcp_socket;
977   gint tmp_rtp, tmp_rtcp;
978   guint count;
979   gint rtpport, rtcpport;
980   GList *rejected_addresses = NULL;
981   GstRTSPAddress *addr = NULL;
982   GInetAddress *inetaddr = NULL;
983   GSocketAddress *rtp_sockaddr = NULL;
984   GSocketAddress *rtcp_sockaddr = NULL;
985   const gchar *multisink_socket;
986
987   if (family == G_SOCKET_FAMILY_IPV6)
988     multisink_socket = "socket-v6";
989   else
990     multisink_socket = "socket";
991
992   udpsrc0 = NULL;
993   udpsrc1 = NULL;
994   udpsink0 = NULL;
995   udpsink1 = NULL;
996   count = 0;
997
998   /* Start with random port */
999   tmp_rtp = 0;
1000
1001   rtcp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1002       G_SOCKET_PROTOCOL_UDP, NULL);
1003   if (!rtcp_socket)
1004     goto no_udp_protocol;
1005
1006   if (*server_addr_out)
1007     gst_rtsp_address_free (*server_addr_out);
1008
1009   /* try to allocate 2 UDP ports, the RTP port should be an even
1010    * number and the RTCP port should be the next (uneven) port */
1011 again:
1012
1013   if (rtp_socket == NULL) {
1014     rtp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1015         G_SOCKET_PROTOCOL_UDP, NULL);
1016     if (!rtp_socket)
1017       goto no_udp_protocol;
1018   }
1019
1020   if (pool && gst_rtsp_address_pool_has_unicast_addresses (pool)) {
1021     GstRTSPAddressFlags flags;
1022
1023     if (addr)
1024       rejected_addresses = g_list_prepend (rejected_addresses, addr);
1025
1026     flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_UNICAST;
1027     if (family == G_SOCKET_FAMILY_IPV6)
1028       flags |= GST_RTSP_ADDRESS_FLAG_IPV6;
1029     else
1030       flags |= GST_RTSP_ADDRESS_FLAG_IPV4;
1031
1032     addr = gst_rtsp_address_pool_acquire_address (pool, flags, 2);
1033
1034     if (addr == NULL)
1035       goto no_ports;
1036
1037     tmp_rtp = addr->port;
1038
1039     g_clear_object (&inetaddr);
1040     inetaddr = g_inet_address_new_from_string (addr->address);
1041   } else {
1042     if (tmp_rtp != 0) {
1043       tmp_rtp += 2;
1044       if (++count > 20)
1045         goto no_ports;
1046     }
1047
1048     if (inetaddr == NULL)
1049       inetaddr = g_inet_address_new_any (family);
1050   }
1051
1052   rtp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtp);
1053   if (!g_socket_bind (rtp_socket, rtp_sockaddr, FALSE, NULL)) {
1054     g_object_unref (rtp_sockaddr);
1055     goto again;
1056   }
1057   g_object_unref (rtp_sockaddr);
1058
1059   rtp_sockaddr = g_socket_get_local_address (rtp_socket, NULL);
1060   if (rtp_sockaddr == NULL || !G_IS_INET_SOCKET_ADDRESS (rtp_sockaddr)) {
1061     g_clear_object (&rtp_sockaddr);
1062     goto socket_error;
1063   }
1064
1065   tmp_rtp =
1066       g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (rtp_sockaddr));
1067   g_object_unref (rtp_sockaddr);
1068
1069   /* check if port is even */
1070   if ((tmp_rtp & 1) != 0) {
1071     /* port not even, close and allocate another */
1072     tmp_rtp++;
1073     g_clear_object (&rtp_socket);
1074     goto again;
1075   }
1076
1077   /* set port */
1078   tmp_rtcp = tmp_rtp + 1;
1079
1080   rtcp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtcp);
1081   if (!g_socket_bind (rtcp_socket, rtcp_sockaddr, FALSE, NULL)) {
1082     g_object_unref (rtcp_sockaddr);
1083     g_clear_object (&rtp_socket);
1084     goto again;
1085   }
1086   g_object_unref (rtcp_sockaddr);
1087
1088   g_clear_object (&inetaddr);
1089
1090   udpsrc0 = gst_element_factory_make ("udpsrc", NULL);
1091   udpsrc1 = gst_element_factory_make ("udpsrc", NULL);
1092
1093   if (udpsrc0 == NULL || udpsrc1 == NULL)
1094     goto no_udp_protocol;
1095
1096   g_object_set (G_OBJECT (udpsrc0), "socket", rtp_socket, NULL);
1097   g_object_set (G_OBJECT (udpsrc1), "socket", rtcp_socket, NULL);
1098
1099   ret = gst_element_set_state (udpsrc0, GST_STATE_READY);
1100   if (ret == GST_STATE_CHANGE_FAILURE)
1101     goto element_error;
1102   ret = gst_element_set_state (udpsrc1, GST_STATE_READY);
1103   if (ret == GST_STATE_CHANGE_FAILURE)
1104     goto element_error;
1105
1106   /* all fine, do port check */
1107   g_object_get (G_OBJECT (udpsrc0), "port", &rtpport, NULL);
1108   g_object_get (G_OBJECT (udpsrc1), "port", &rtcpport, NULL);
1109
1110   /* this should not happen... */
1111   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
1112     goto port_error;
1113
1114   if (udpsink_out[0])
1115     udpsink0 = udpsink_out[0];
1116   else
1117     udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
1118
1119   if (!udpsink0)
1120     goto no_udp_protocol;
1121
1122   g_object_set (G_OBJECT (udpsink0), "close-socket", FALSE, NULL);
1123   g_object_set (G_OBJECT (udpsink0), multisink_socket, rtp_socket, NULL);
1124
1125   if (udpsink_out[1])
1126     udpsink1 = udpsink_out[1];
1127   else
1128     udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
1129
1130   if (!udpsink1)
1131     goto no_udp_protocol;
1132
1133   g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
1134   g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
1135   g_object_set (G_OBJECT (udpsink0), "buffer-size", buffer_size, NULL);
1136
1137   g_object_set (G_OBJECT (udpsink1), "close-socket", FALSE, NULL);
1138   g_object_set (G_OBJECT (udpsink1), multisink_socket, rtcp_socket, NULL);
1139   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
1140   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
1141   g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL);
1142   g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
1143   g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL);
1144   g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
1145
1146   /* we keep these elements, we will further configure them when the
1147    * client told us to really use the UDP ports. */
1148   udpsrc_out[0] = udpsrc0;
1149   udpsrc_out[1] = udpsrc1;
1150   udpsink_out[0] = udpsink0;
1151   udpsink_out[1] = udpsink1;
1152
1153   server_port_out->min = rtpport;
1154   server_port_out->max = rtcpport;
1155
1156   *server_addr_out = addr;
1157   g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free);
1158
1159   g_object_unref (rtp_socket);
1160   g_object_unref (rtcp_socket);
1161
1162   return TRUE;
1163
1164   /* ERRORS */
1165 no_udp_protocol:
1166   {
1167     goto cleanup;
1168   }
1169 no_ports:
1170   {
1171     goto cleanup;
1172   }
1173 port_error:
1174   {
1175     goto cleanup;
1176   }
1177 socket_error:
1178   {
1179     goto cleanup;
1180   }
1181 element_error:
1182   {
1183     goto cleanup;
1184   }
1185 cleanup:
1186   {
1187     if (udpsrc0) {
1188       gst_element_set_state (udpsrc0, GST_STATE_NULL);
1189       gst_object_unref (udpsrc0);
1190     }
1191     if (udpsrc1) {
1192       gst_element_set_state (udpsrc1, GST_STATE_NULL);
1193       gst_object_unref (udpsrc1);
1194     }
1195     if (udpsink0) {
1196       gst_element_set_state (udpsink0, GST_STATE_NULL);
1197       gst_object_unref (udpsink0);
1198     }
1199     if (inetaddr)
1200       g_object_unref (inetaddr);
1201     g_list_free_full (rejected_addresses,
1202         (GDestroyNotify) gst_rtsp_address_free);
1203     if (addr)
1204       gst_rtsp_address_free (addr);
1205     if (rtp_socket)
1206       g_object_unref (rtp_socket);
1207     if (rtcp_socket)
1208       g_object_unref (rtcp_socket);
1209     return FALSE;
1210   }
1211 }
1212
1213 /* must be called with lock */
1214 static gboolean
1215 alloc_ports (GstRTSPStream * stream)
1216 {
1217   GstRTSPStreamPrivate *priv = stream->priv;
1218
1219   priv->have_ipv4 = alloc_ports_one_family (priv->pool, priv->buffer_size,
1220       G_SOCKET_FAMILY_IPV4, priv->udpsrc_v4, priv->udpsink,
1221       &priv->server_port_v4, &priv->server_addr_v4);
1222
1223   priv->have_ipv6 = alloc_ports_one_family (priv->pool, priv->buffer_size,
1224       G_SOCKET_FAMILY_IPV6, priv->udpsrc_v6, priv->udpsink,
1225       &priv->server_port_v6, &priv->server_addr_v6);
1226
1227   return priv->have_ipv4 || priv->have_ipv6;
1228 }
1229
1230 /**
1231  * gst_rtsp_stream_get_server_port:
1232  * @stream: a #GstRTSPStream
1233  * @server_port: (out): result server port
1234  * @family: the port family to get
1235  *
1236  * Fill @server_port with the port pair used by the server. This function can
1237  * only be called when @stream has been joined.
1238  */
1239 void
1240 gst_rtsp_stream_get_server_port (GstRTSPStream * stream,
1241     GstRTSPRange * server_port, GSocketFamily family)
1242 {
1243   GstRTSPStreamPrivate *priv;
1244
1245   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1246   priv = stream->priv;
1247   g_return_if_fail (priv->is_joined);
1248
1249   g_mutex_lock (&priv->lock);
1250   if (family == G_SOCKET_FAMILY_IPV4) {
1251     if (server_port)
1252       *server_port = priv->server_port_v4;
1253   } else {
1254     if (server_port)
1255       *server_port = priv->server_port_v6;
1256   }
1257   g_mutex_unlock (&priv->lock);
1258 }
1259
1260 /**
1261  * gst_rtsp_stream_get_rtpsession:
1262  * @stream: a #GstRTSPStream
1263  *
1264  * Get the RTP session of this stream.
1265  *
1266  * Returns: (transfer full): The RTP session of this stream. Unref after usage.
1267  */
1268 GObject *
1269 gst_rtsp_stream_get_rtpsession (GstRTSPStream * stream)
1270 {
1271   GstRTSPStreamPrivate *priv;
1272   GObject *session;
1273
1274   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1275
1276   priv = stream->priv;
1277
1278   g_mutex_lock (&priv->lock);
1279   if ((session = priv->session))
1280     g_object_ref (session);
1281   g_mutex_unlock (&priv->lock);
1282
1283   return session;
1284 }
1285
1286 /**
1287  * gst_rtsp_stream_get_ssrc:
1288  * @stream: a #GstRTSPStream
1289  * @ssrc: (out): result ssrc
1290  *
1291  * Get the SSRC used by the RTP session of this stream. This function can only
1292  * be called when @stream has been joined.
1293  */
1294 void
1295 gst_rtsp_stream_get_ssrc (GstRTSPStream * stream, guint * ssrc)
1296 {
1297   GstRTSPStreamPrivate *priv;
1298
1299   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1300   priv = stream->priv;
1301   g_return_if_fail (priv->is_joined);
1302
1303   g_mutex_lock (&priv->lock);
1304   if (ssrc && priv->session)
1305     g_object_get (priv->session, "internal-ssrc", ssrc, NULL);
1306   g_mutex_unlock (&priv->lock);
1307 }
1308
1309 /* executed from streaming thread */
1310 static void
1311 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream)
1312 {
1313   GstRTSPStreamPrivate *priv = stream->priv;
1314   GstCaps *newcaps, *oldcaps;
1315
1316   newcaps = gst_pad_get_current_caps (pad);
1317
1318   GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps,
1319       newcaps);
1320
1321   g_mutex_lock (&priv->lock);
1322   oldcaps = priv->caps;
1323   priv->caps = newcaps;
1324   g_mutex_unlock (&priv->lock);
1325
1326   if (oldcaps)
1327     gst_caps_unref (oldcaps);
1328 }
1329
1330 static void
1331 dump_structure (const GstStructure * s)
1332 {
1333   gchar *sstr;
1334
1335   sstr = gst_structure_to_string (s);
1336   GST_INFO ("structure: %s", sstr);
1337   g_free (sstr);
1338 }
1339
1340 static GstRTSPStreamTransport *
1341 find_transport (GstRTSPStream * stream, const gchar * rtcp_from)
1342 {
1343   GstRTSPStreamPrivate *priv = stream->priv;
1344   GList *walk;
1345   GstRTSPStreamTransport *result = NULL;
1346   const gchar *tmp;
1347   gchar *dest;
1348   guint port;
1349
1350   if (rtcp_from == NULL)
1351     return NULL;
1352
1353   tmp = g_strrstr (rtcp_from, ":");
1354   if (tmp == NULL)
1355     return NULL;
1356
1357   port = atoi (tmp + 1);
1358   dest = g_strndup (rtcp_from, tmp - rtcp_from);
1359
1360   g_mutex_lock (&priv->lock);
1361   GST_INFO ("finding %s:%d in %d transports", dest, port,
1362       g_list_length (priv->transports));
1363
1364   for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1365     GstRTSPStreamTransport *trans = walk->data;
1366     const GstRTSPTransport *tr;
1367     gint min, max;
1368
1369     tr = gst_rtsp_stream_transport_get_transport (trans);
1370
1371     min = tr->client_port.min;
1372     max = tr->client_port.max;
1373
1374     if ((strcmp (tr->destination, dest) == 0) && (min == port || max == port)) {
1375       result = trans;
1376       break;
1377     }
1378   }
1379   if (result)
1380     g_object_ref (result);
1381   g_mutex_unlock (&priv->lock);
1382
1383   g_free (dest);
1384
1385   return result;
1386 }
1387
1388 static GstRTSPStreamTransport *
1389 check_transport (GObject * source, GstRTSPStream * stream)
1390 {
1391   GstStructure *stats;
1392   GstRTSPStreamTransport *trans;
1393
1394   /* see if we have a stream to match with the origin of the RTCP packet */
1395   trans = g_object_get_qdata (source, ssrc_stream_map_key);
1396   if (trans == NULL) {
1397     g_object_get (source, "stats", &stats, NULL);
1398     if (stats) {
1399       const gchar *rtcp_from;
1400
1401       dump_structure (stats);
1402
1403       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
1404       if ((trans = find_transport (stream, rtcp_from))) {
1405         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
1406             source);
1407         g_object_set_qdata_full (source, ssrc_stream_map_key, trans,
1408             g_object_unref);
1409       }
1410       gst_structure_free (stats);
1411     }
1412   }
1413   return trans;
1414 }
1415
1416
1417 static void
1418 on_new_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1419 {
1420   GstRTSPStreamTransport *trans;
1421
1422   GST_INFO ("%p: new source %p", stream, source);
1423
1424   trans = check_transport (source, stream);
1425
1426   if (trans)
1427     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
1428 }
1429
1430 static void
1431 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPStream * stream)
1432 {
1433   GST_INFO ("%p: new SDES %p", stream, source);
1434 }
1435
1436 static void
1437 on_ssrc_active (GObject * session, GObject * source, GstRTSPStream * stream)
1438 {
1439   GstRTSPStreamTransport *trans;
1440
1441   trans = check_transport (source, stream);
1442
1443   if (trans) {
1444     GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
1445     gst_rtsp_stream_transport_keep_alive (trans);
1446   }
1447 #ifdef DUMP_STATS
1448   {
1449     GstStructure *stats;
1450     g_object_get (source, "stats", &stats, NULL);
1451     if (stats) {
1452       dump_structure (stats);
1453       gst_structure_free (stats);
1454     }
1455   }
1456 #endif
1457 }
1458
1459 static void
1460 on_bye_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1461 {
1462   GST_INFO ("%p: source %p bye", stream, source);
1463 }
1464
1465 static void
1466 on_bye_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1467 {
1468   GstRTSPStreamTransport *trans;
1469
1470   GST_INFO ("%p: source %p bye timeout", stream, source);
1471
1472   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1473     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1474     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1475   }
1476 }
1477
1478 static void
1479 on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1480 {
1481   GstRTSPStreamTransport *trans;
1482
1483   GST_INFO ("%p: source %p timeout", stream, source);
1484
1485   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1486     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1487     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1488   }
1489 }
1490
1491 static void
1492 clear_tr_cache (GstRTSPStreamPrivate * priv)
1493 {
1494   g_list_foreach (priv->tr_cache, (GFunc) g_object_unref, NULL);
1495   g_list_free (priv->tr_cache);
1496   priv->tr_cache = NULL;
1497 }
1498
1499 static GstFlowReturn
1500 handle_new_sample (GstAppSink * sink, gpointer user_data)
1501 {
1502   GstRTSPStreamPrivate *priv;
1503   GList *walk;
1504   GstSample *sample;
1505   GstBuffer *buffer;
1506   GstRTSPStream *stream;
1507   gboolean is_rtp;
1508
1509   sample = gst_app_sink_pull_sample (sink);
1510   if (!sample)
1511     return GST_FLOW_OK;
1512
1513   stream = (GstRTSPStream *) user_data;
1514   priv = stream->priv;
1515   buffer = gst_sample_get_buffer (sample);
1516
1517   is_rtp = GST_ELEMENT_CAST (sink) == priv->appsink[0];
1518
1519   g_mutex_lock (&priv->lock);
1520   if (priv->tr_cache_cookie != priv->transports_cookie) {
1521     clear_tr_cache (priv);
1522     for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1523       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1524       priv->tr_cache = g_list_prepend (priv->tr_cache, g_object_ref (tr));
1525     }
1526     priv->tr_cache_cookie = priv->transports_cookie;
1527   }
1528   g_mutex_unlock (&priv->lock);
1529
1530   for (walk = priv->tr_cache; walk; walk = g_list_next (walk)) {
1531     GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1532
1533     if (is_rtp) {
1534       gst_rtsp_stream_transport_send_rtp (tr, buffer);
1535     } else {
1536       gst_rtsp_stream_transport_send_rtcp (tr, buffer);
1537     }
1538   }
1539   gst_sample_unref (sample);
1540
1541   return GST_FLOW_OK;
1542 }
1543
1544 static GstAppSinkCallbacks sink_cb = {
1545   NULL,                         /* not interested in EOS */
1546   NULL,                         /* not interested in preroll samples */
1547   handle_new_sample,
1548 };
1549
1550 static GstElement *
1551 get_rtp_encoder (GstRTSPStream * stream, guint session)
1552 {
1553   GstRTSPStreamPrivate *priv = stream->priv;
1554
1555   if (priv->srtpenc == NULL) {
1556     gchar *name;
1557
1558     name = g_strdup_printf ("srtpenc_%u", session);
1559     priv->srtpenc = gst_element_factory_make ("srtpenc", name);
1560     g_free (name);
1561
1562     g_object_set (priv->srtpenc, "random-key", TRUE, NULL);
1563   }
1564   return gst_object_ref (priv->srtpenc);
1565 }
1566
1567 static GstElement *
1568 request_rtp_encoder (GstElement * rtpbin, guint session, GstRTSPStream * stream)
1569 {
1570   GstRTSPStreamPrivate *priv = stream->priv;
1571   GstElement *oldenc, *enc;
1572   GstPad *pad;
1573   gchar *name;
1574
1575   if (priv->idx != session)
1576     return NULL;
1577
1578   GST_DEBUG_OBJECT (stream, "make RTP encoder for session %u", session);
1579
1580   oldenc = priv->srtpenc;
1581   enc = get_rtp_encoder (stream, session);
1582   name = g_strdup_printf ("rtp_sink_%d", session);
1583   pad = gst_element_get_request_pad (enc, name);
1584   g_free (name);
1585   gst_object_unref (pad);
1586
1587   if (oldenc == NULL)
1588     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER], 0,
1589         enc);
1590
1591   return enc;
1592 }
1593
1594 static GstElement *
1595 request_rtcp_encoder (GstElement * rtpbin, guint session,
1596     GstRTSPStream * stream)
1597 {
1598   GstRTSPStreamPrivate *priv = stream->priv;
1599   GstElement *oldenc, *enc;
1600   GstPad *pad;
1601   gchar *name;
1602
1603   if (priv->idx != session)
1604     return NULL;
1605
1606   GST_DEBUG_OBJECT (stream, "make RTCP encoder for session %u", session);
1607
1608   oldenc = priv->srtpenc;
1609   enc = get_rtp_encoder (stream, session);
1610   name = g_strdup_printf ("rtcp_sink_%d", session);
1611   pad = gst_element_get_request_pad (enc, name);
1612   g_free (name);
1613   gst_object_unref (pad);
1614
1615   if (oldenc == NULL)
1616     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER], 0,
1617         enc);
1618
1619   return enc;
1620 }
1621
1622 static GstCaps *
1623 request_key (GstElement * srtpdec, guint ssrc, GstRTSPStream * stream)
1624 {
1625   GstRTSPStreamPrivate *priv = stream->priv;
1626   GstCaps *caps;
1627
1628   GST_DEBUG ("request key %08x", ssrc);
1629
1630   g_mutex_lock (&priv->lock);
1631   if ((caps = g_hash_table_lookup (priv->keys, GINT_TO_POINTER (ssrc))))
1632     gst_caps_ref (caps);
1633   g_mutex_unlock (&priv->lock);
1634
1635   return caps;
1636 }
1637
1638 static GstElement *
1639 request_rtcp_decoder (GstElement * rtpbin, guint session,
1640     GstRTSPStream * stream)
1641 {
1642   GstRTSPStreamPrivate *priv = stream->priv;
1643
1644   if (priv->idx != session)
1645     return NULL;
1646
1647   if (priv->srtpdec == NULL) {
1648     gchar *name;
1649
1650     name = g_strdup_printf ("srtpdec_%u", session);
1651     priv->srtpdec = gst_element_factory_make ("srtpdec", name);
1652     g_free (name);
1653
1654     g_signal_connect (priv->srtpdec, "request-key",
1655         (GCallback) request_key, stream);
1656   }
1657   return gst_object_ref (priv->srtpdec);
1658 }
1659
1660 /**
1661  * gst_rtsp_stream_join_bin:
1662  * @stream: a #GstRTSPStream
1663  * @bin: (transfer none): a #GstBin to join
1664  * @rtpbin: (transfer none): a rtpbin element in @bin
1665  * @state: the target state of the new elements
1666  *
1667  * Join the #GstBin @bin that contains the element @rtpbin.
1668  *
1669  * @stream will link to @rtpbin, which must be inside @bin. The elements
1670  * added to @bin will be set to the state given in @state.
1671  *
1672  * Returns: %TRUE on success.
1673  */
1674 gboolean
1675 gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
1676     GstElement * rtpbin, GstState state)
1677 {
1678   GstRTSPStreamPrivate *priv;
1679   gint i;
1680   guint idx;
1681   gchar *name;
1682   GstPad *pad, *sinkpad, *selpad;
1683   GstPadLinkReturn ret;
1684
1685   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1686   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
1687   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
1688
1689   priv = stream->priv;
1690
1691   g_mutex_lock (&priv->lock);
1692   if (priv->is_joined)
1693     goto was_joined;
1694
1695   /* create a session with the same index as the stream */
1696   idx = priv->idx;
1697
1698   GST_INFO ("stream %p joining bin as session %u", stream, idx);
1699
1700   if (!alloc_ports (stream))
1701     goto no_ports;
1702
1703   /* update the dscp qos field in the sinks */
1704   update_dscp_qos (stream);
1705
1706   if (priv->profiles & GST_RTSP_PROFILE_SAVP
1707       || priv->profiles & GST_RTSP_PROFILE_SAVPF) {
1708     /* For SRTP */
1709     g_signal_connect (rtpbin, "request-rtp-encoder",
1710         (GCallback) request_rtp_encoder, stream);
1711     g_signal_connect (rtpbin, "request-rtcp-encoder",
1712         (GCallback) request_rtcp_encoder, stream);
1713     g_signal_connect (rtpbin, "request-rtcp-decoder",
1714         (GCallback) request_rtcp_decoder, stream);
1715   }
1716
1717   /* get a pad for sending RTP */
1718   name = g_strdup_printf ("send_rtp_sink_%u", idx);
1719   priv->send_rtp_sink = gst_element_get_request_pad (rtpbin, name);
1720   g_free (name);
1721   /* link the RTP pad to the session manager, it should not really fail unless
1722    * this is not really an RTP pad */
1723   ret = gst_pad_link (priv->srcpad, priv->send_rtp_sink);
1724   if (ret != GST_PAD_LINK_OK)
1725     goto link_failed;
1726
1727   /* get pads from the RTP session element for sending and receiving
1728    * RTP/RTCP*/
1729   name = g_strdup_printf ("send_rtp_src_%u", idx);
1730   priv->send_src[0] = gst_element_get_static_pad (rtpbin, name);
1731   g_free (name);
1732   name = g_strdup_printf ("send_rtcp_src_%u", idx);
1733   priv->send_src[1] = gst_element_get_request_pad (rtpbin, name);
1734   g_free (name);
1735   name = g_strdup_printf ("recv_rtp_sink_%u", idx);
1736   priv->recv_sink[0] = gst_element_get_request_pad (rtpbin, name);
1737   g_free (name);
1738   name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
1739   priv->recv_sink[1] = gst_element_get_request_pad (rtpbin, name);
1740   g_free (name);
1741
1742   /* get the session */
1743   g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &priv->session);
1744
1745   g_signal_connect (priv->session, "on-new-ssrc", (GCallback) on_new_ssrc,
1746       stream);
1747   g_signal_connect (priv->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
1748       stream);
1749   g_signal_connect (priv->session, "on-ssrc-active",
1750       (GCallback) on_ssrc_active, stream);
1751   g_signal_connect (priv->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
1752       stream);
1753   g_signal_connect (priv->session, "on-bye-timeout",
1754       (GCallback) on_bye_timeout, stream);
1755   g_signal_connect (priv->session, "on-timeout", (GCallback) on_timeout,
1756       stream);
1757
1758   for (i = 0; i < 2; i++) {
1759     GstPad *teepad, *queuepad;
1760     /* For the sender we create this bit of pipeline for both
1761      * RTP and RTCP. Sync and preroll are enabled on udpsink so
1762      * we need to add a queue before appsink to make the pipeline
1763      * not block. For the TCP case, we want to pump data to the
1764      * client as fast as possible anyway.
1765      *
1766      * .--------.      .-----.    .---------.
1767      * | rtpbin |      | tee |    | udpsink |
1768      * |       send->sink   src->sink       |
1769      * '--------'      |     |    '---------'
1770      *                 |     |    .---------.    .---------.
1771      *                 |     |    |  queue  |    | appsink |
1772      *                 |    src->sink      src->sink       |
1773      *                 '-----'    '---------'    '---------'
1774      *
1775      * When only UDP is allowed, we skip the tee, queue and appsink and link the
1776      * udpsink directly to the session.
1777      */
1778     /* add udpsink */
1779     gst_bin_add (bin, priv->udpsink[i]);
1780     sinkpad = gst_element_get_static_pad (priv->udpsink[i], "sink");
1781
1782     if (priv->protocols & GST_RTSP_LOWER_TRANS_TCP) {
1783       /* make tee for RTP/RTCP */
1784       priv->tee[i] = gst_element_factory_make ("tee", NULL);
1785       gst_bin_add (bin, priv->tee[i]);
1786
1787       /* and link to rtpbin send pad */
1788       pad = gst_element_get_static_pad (priv->tee[i], "sink");
1789       gst_pad_link (priv->send_src[i], pad);
1790       gst_object_unref (pad);
1791
1792       /* link tee to udpsink */
1793       teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
1794       gst_pad_link (teepad, sinkpad);
1795       gst_object_unref (teepad);
1796
1797       /* make queue */
1798       priv->appqueue[i] = gst_element_factory_make ("queue", NULL);
1799       gst_bin_add (bin, priv->appqueue[i]);
1800       /* and link to tee */
1801       teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
1802       pad = gst_element_get_static_pad (priv->appqueue[i], "sink");
1803       gst_pad_link (teepad, pad);
1804       gst_object_unref (pad);
1805       gst_object_unref (teepad);
1806
1807       /* make appsink */
1808       priv->appsink[i] = gst_element_factory_make ("appsink", NULL);
1809       g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
1810       g_object_set (priv->appsink[i], "emit-signals", FALSE, NULL);
1811       gst_bin_add (bin, priv->appsink[i]);
1812       gst_app_sink_set_callbacks (GST_APP_SINK_CAST (priv->appsink[i]),
1813           &sink_cb, stream, NULL);
1814       /* and link to queue */
1815       queuepad = gst_element_get_static_pad (priv->appqueue[i], "src");
1816       pad = gst_element_get_static_pad (priv->appsink[i], "sink");
1817       gst_pad_link (queuepad, pad);
1818       gst_object_unref (pad);
1819       gst_object_unref (queuepad);
1820     } else {
1821       /* else only udpsink needed, link it to the session */
1822       gst_pad_link (priv->send_src[i], sinkpad);
1823     }
1824     gst_object_unref (sinkpad);
1825
1826     /* For the receiver we create this bit of pipeline for both
1827      * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc
1828      * and it is all funneled into the rtpbin receive pad.
1829      *
1830      * .--------.     .--------.    .--------.
1831      * | udpsrc |     | funnel |    | rtpbin |
1832      * |       src->sink      src->sink      |
1833      * '--------'     |        |    '--------'
1834      * .--------.     |        |
1835      * | appsrc |     |        |
1836      * |       src->sink       |
1837      * '--------'     '--------'
1838      */
1839     /* make funnel for the RTP/RTCP receivers */
1840     priv->funnel[i] = gst_element_factory_make ("funnel", NULL);
1841     gst_bin_add (bin, priv->funnel[i]);
1842
1843     pad = gst_element_get_static_pad (priv->funnel[i], "src");
1844     gst_pad_link (pad, priv->recv_sink[i]);
1845     gst_object_unref (pad);
1846
1847     if (priv->udpsrc_v4[i]) {
1848       /* we set and keep these to playing so that they don't cause NO_PREROLL return
1849        * values */
1850       gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_PLAYING);
1851       gst_element_set_locked_state (priv->udpsrc_v4[i], TRUE);
1852       /* add udpsrc */
1853       gst_bin_add (bin, priv->udpsrc_v4[i]);
1854
1855       /* and link to the funnel v4 */
1856       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1857       pad = gst_element_get_static_pad (priv->udpsrc_v4[i], "src");
1858       gst_pad_link (pad, selpad);
1859       gst_object_unref (pad);
1860       gst_object_unref (selpad);
1861     }
1862
1863     if (priv->udpsrc_v6[i]) {
1864       gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_PLAYING);
1865       gst_element_set_locked_state (priv->udpsrc_v6[i], TRUE);
1866       gst_bin_add (bin, priv->udpsrc_v6[i]);
1867
1868       /* and link to the funnel v6 */
1869       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1870       pad = gst_element_get_static_pad (priv->udpsrc_v6[i], "src");
1871       gst_pad_link (pad, selpad);
1872       gst_object_unref (pad);
1873       gst_object_unref (selpad);
1874     }
1875
1876     if (priv->protocols & GST_RTSP_LOWER_TRANS_TCP) {
1877       /* make and add appsrc */
1878       priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
1879       gst_bin_add (bin, priv->appsrc[i]);
1880       /* and link to the funnel */
1881       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1882       pad = gst_element_get_static_pad (priv->appsrc[i], "src");
1883       gst_pad_link (pad, selpad);
1884       gst_object_unref (pad);
1885       gst_object_unref (selpad);
1886     }
1887
1888     /* check if we need to set to a special state */
1889     if (state != GST_STATE_NULL) {
1890       if (priv->udpsink[i])
1891         gst_element_set_state (priv->udpsink[i], state);
1892       if (priv->appsink[i])
1893         gst_element_set_state (priv->appsink[i], state);
1894       if (priv->appqueue[i])
1895         gst_element_set_state (priv->appqueue[i], state);
1896       if (priv->tee[i])
1897         gst_element_set_state (priv->tee[i], state);
1898       if (priv->funnel[i])
1899         gst_element_set_state (priv->funnel[i], state);
1900       if (priv->appsrc[i])
1901         gst_element_set_state (priv->appsrc[i], state);
1902     }
1903   }
1904
1905   /* be notified of caps changes */
1906   priv->caps_sig = g_signal_connect (priv->send_src[0], "notify::caps",
1907       (GCallback) caps_notify, stream);
1908
1909   priv->is_joined = TRUE;
1910   g_mutex_unlock (&priv->lock);
1911
1912   return TRUE;
1913
1914   /* ERRORS */
1915 was_joined:
1916   {
1917     g_mutex_unlock (&priv->lock);
1918     return TRUE;
1919   }
1920 no_ports:
1921   {
1922     g_mutex_unlock (&priv->lock);
1923     GST_WARNING ("failed to allocate ports %u", idx);
1924     return FALSE;
1925   }
1926 link_failed:
1927   {
1928     GST_WARNING ("failed to link stream %u", idx);
1929     gst_object_unref (priv->send_rtp_sink);
1930     priv->send_rtp_sink = NULL;
1931     g_mutex_unlock (&priv->lock);
1932     return FALSE;
1933   }
1934 }
1935
1936 /**
1937  * gst_rtsp_stream_leave_bin:
1938  * @stream: a #GstRTSPStream
1939  * @bin: (transfer none): a #GstBin
1940  * @rtpbin: (transfer none): a rtpbin #GstElement
1941  *
1942  * Remove the elements of @stream from @bin.
1943  *
1944  * Return: %TRUE on success.
1945  */
1946 gboolean
1947 gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
1948     GstElement * rtpbin)
1949 {
1950   GstRTSPStreamPrivate *priv;
1951   gint i;
1952
1953   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1954   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
1955   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
1956
1957   priv = stream->priv;
1958
1959   g_mutex_lock (&priv->lock);
1960   if (!priv->is_joined)
1961     goto was_not_joined;
1962
1963   /* all transports must be removed by now */
1964   g_return_val_if_fail (priv->transports == NULL, FALSE);
1965
1966   clear_tr_cache (priv);
1967
1968   GST_INFO ("stream %p leaving bin", stream);
1969
1970   gst_pad_unlink (priv->srcpad, priv->send_rtp_sink);
1971   g_signal_handler_disconnect (priv->send_src[0], priv->caps_sig);
1972   gst_element_release_request_pad (rtpbin, priv->send_rtp_sink);
1973   gst_object_unref (priv->send_rtp_sink);
1974   priv->send_rtp_sink = NULL;
1975
1976   for (i = 0; i < 2; i++) {
1977     if (priv->udpsink[i])
1978       gst_element_set_state (priv->udpsink[i], GST_STATE_NULL);
1979     if (priv->appsink[i])
1980       gst_element_set_state (priv->appsink[i], GST_STATE_NULL);
1981     if (priv->appqueue[i])
1982       gst_element_set_state (priv->appqueue[i], GST_STATE_NULL);
1983     if (priv->tee[i])
1984       gst_element_set_state (priv->tee[i], GST_STATE_NULL);
1985     if (priv->funnel[i])
1986       gst_element_set_state (priv->funnel[i], GST_STATE_NULL);
1987     if (priv->appsrc[i])
1988       gst_element_set_state (priv->appsrc[i], GST_STATE_NULL);
1989     if (priv->udpsrc_v4[i]) {
1990       /* and set udpsrc to NULL now before removing */
1991       gst_element_set_locked_state (priv->udpsrc_v4[i], FALSE);
1992       gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_NULL);
1993       /* removing them should also nicely release the request
1994        * pads when they finalize */
1995       gst_bin_remove (bin, priv->udpsrc_v4[i]);
1996     }
1997     if (priv->udpsrc_v6[i]) {
1998       gst_element_set_locked_state (priv->udpsrc_v6[i], FALSE);
1999       gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_NULL);
2000       gst_bin_remove (bin, priv->udpsrc_v6[i]);
2001     }
2002     if (priv->udpsink[i])
2003       gst_bin_remove (bin, priv->udpsink[i]);
2004     if (priv->appsrc[i])
2005       gst_bin_remove (bin, priv->appsrc[i]);
2006     if (priv->appsink[i])
2007       gst_bin_remove (bin, priv->appsink[i]);
2008     if (priv->appqueue[i])
2009       gst_bin_remove (bin, priv->appqueue[i]);
2010     if (priv->tee[i])
2011       gst_bin_remove (bin, priv->tee[i]);
2012     if (priv->funnel[i])
2013       gst_bin_remove (bin, priv->funnel[i]);
2014
2015     gst_element_release_request_pad (rtpbin, priv->recv_sink[i]);
2016     gst_object_unref (priv->recv_sink[i]);
2017     priv->recv_sink[i] = NULL;
2018
2019     priv->udpsrc_v4[i] = NULL;
2020     priv->udpsrc_v6[i] = NULL;
2021     priv->udpsink[i] = NULL;
2022     priv->appsrc[i] = NULL;
2023     priv->appsink[i] = NULL;
2024     priv->appqueue[i] = NULL;
2025     priv->tee[i] = NULL;
2026     priv->funnel[i] = NULL;
2027   }
2028   gst_object_unref (priv->send_src[0]);
2029   priv->send_src[0] = NULL;
2030
2031   gst_element_release_request_pad (rtpbin, priv->send_src[1]);
2032   gst_object_unref (priv->send_src[1]);
2033   priv->send_src[1] = NULL;
2034
2035   g_object_unref (priv->session);
2036   priv->session = NULL;
2037   if (priv->caps)
2038     gst_caps_unref (priv->caps);
2039   priv->caps = NULL;
2040
2041   if (priv->srtpenc)
2042     gst_object_unref (priv->srtpenc);
2043
2044   priv->is_joined = FALSE;
2045   g_mutex_unlock (&priv->lock);
2046
2047   return TRUE;
2048
2049 was_not_joined:
2050   {
2051     g_mutex_unlock (&priv->lock);
2052     return TRUE;
2053   }
2054 }
2055
2056 /**
2057  * gst_rtsp_stream_get_rtpinfo:
2058  * @stream: a #GstRTSPStream
2059  * @rtptime: (allow-none): result RTP timestamp
2060  * @seq: (allow-none): result RTP seqnum
2061  * @clock_rate: (allow-none): the clock rate
2062  * @running_time: (allow-none): result running-time
2063  *
2064  * Retrieve the current rtptime, seq and running-time. This is used to
2065  * construct a RTPInfo reply header.
2066  *
2067  * Returns: %TRUE when rtptime, seq and running-time could be determined.
2068  */
2069 gboolean
2070 gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream,
2071     guint * rtptime, guint * seq, guint * clock_rate,
2072     GstClockTime * running_time)
2073 {
2074   GstRTSPStreamPrivate *priv;
2075   GstStructure *stats;
2076   GObjectClass *payobjclass;
2077
2078   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2079
2080   priv = stream->priv;
2081
2082   payobjclass = G_OBJECT_GET_CLASS (priv->payloader);
2083
2084   g_mutex_lock (&priv->lock);
2085
2086   if (g_object_class_find_property (payobjclass, "stats")) {
2087     g_object_get (priv->payloader, "stats", &stats, NULL);
2088     if (stats == NULL)
2089       goto no_stats;
2090
2091     if (seq)
2092       gst_structure_get_uint (stats, "seqnum", seq);
2093
2094     if (rtptime)
2095       gst_structure_get_uint (stats, "timestamp", rtptime);
2096
2097     if (running_time)
2098       gst_structure_get_clock_time (stats, "running-time", running_time);
2099
2100     if (clock_rate) {
2101       gst_structure_get_uint (stats, "clock-rate", clock_rate);
2102       if (*clock_rate == 0 && running_time)
2103         *running_time = GST_CLOCK_TIME_NONE;
2104     }
2105     gst_structure_free (stats);
2106   } else {
2107     if (!g_object_class_find_property (payobjclass, "seqnum") ||
2108         !g_object_class_find_property (payobjclass, "timestamp"))
2109       goto no_stats;
2110
2111     if (seq)
2112       g_object_get (priv->payloader, "seqnum", seq, NULL);
2113
2114     if (rtptime)
2115       g_object_get (priv->payloader, "timestamp", rtptime, NULL);
2116
2117     if (running_time)
2118       *running_time = GST_CLOCK_TIME_NONE;
2119   }
2120   g_mutex_unlock (&priv->lock);
2121
2122   return TRUE;
2123
2124   /* ERRORS */
2125 no_stats:
2126   {
2127     GST_WARNING ("Could not get payloader stats");
2128     g_mutex_unlock (&priv->lock);
2129     return FALSE;
2130   }
2131 }
2132
2133 /**
2134  * gst_rtsp_stream_get_caps:
2135  * @stream: a #GstRTSPStream
2136  *
2137  * Retrieve the current caps of @stream.
2138  *
2139  * Returns: (transfer full): the #GstCaps of @stream. use gst_caps_unref()
2140  * after usage.
2141  */
2142 GstCaps *
2143 gst_rtsp_stream_get_caps (GstRTSPStream * stream)
2144 {
2145   GstRTSPStreamPrivate *priv;
2146   GstCaps *result;
2147
2148   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2149
2150   priv = stream->priv;
2151
2152   g_mutex_lock (&priv->lock);
2153   if ((result = priv->caps))
2154     gst_caps_ref (result);
2155   g_mutex_unlock (&priv->lock);
2156
2157   return result;
2158 }
2159
2160 /**
2161  * gst_rtsp_stream_recv_rtp:
2162  * @stream: a #GstRTSPStream
2163  * @buffer: (transfer full): a #GstBuffer
2164  *
2165  * Handle an RTP buffer for the stream. This method is usually called when a
2166  * message has been received from a client using the TCP transport.
2167  *
2168  * This function takes ownership of @buffer.
2169  *
2170  * Returns: a GstFlowReturn.
2171  */
2172 GstFlowReturn
2173 gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer)
2174 {
2175   GstRTSPStreamPrivate *priv;
2176   GstFlowReturn ret;
2177   GstElement *element;
2178
2179   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
2180   priv = stream->priv;
2181   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
2182   g_return_val_if_fail (priv->is_joined, FALSE);
2183
2184   g_mutex_lock (&priv->lock);
2185   if (priv->appsrc[0])
2186     element = gst_object_ref (priv->appsrc[0]);
2187   else
2188     element = NULL;
2189   g_mutex_unlock (&priv->lock);
2190
2191   if (element) {
2192     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
2193     gst_object_unref (element);
2194   } else {
2195     ret = GST_FLOW_OK;
2196   }
2197   return ret;
2198 }
2199
2200 /**
2201  * gst_rtsp_stream_recv_rtcp:
2202  * @stream: a #GstRTSPStream
2203  * @buffer: (transfer full): a #GstBuffer
2204  *
2205  * Handle an RTCP buffer for the stream. This method is usually called when a
2206  * message has been received from a client using the TCP transport.
2207  *
2208  * This function takes ownership of @buffer.
2209  *
2210  * Returns: a GstFlowReturn.
2211  */
2212 GstFlowReturn
2213 gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
2214 {
2215   GstRTSPStreamPrivate *priv;
2216   GstFlowReturn ret;
2217   GstElement *element;
2218
2219   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
2220   priv = stream->priv;
2221   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
2222   g_return_val_if_fail (priv->is_joined, FALSE);
2223
2224   g_mutex_lock (&priv->lock);
2225   if (priv->appsrc[1])
2226     element = gst_object_ref (priv->appsrc[1]);
2227   else
2228     element = NULL;
2229   g_mutex_unlock (&priv->lock);
2230
2231   if (element) {
2232     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
2233     gst_object_unref (element);
2234   } else {
2235     ret = GST_FLOW_OK;
2236     gst_buffer_unref (buffer);
2237   }
2238   return ret;
2239 }
2240
2241 /* must be called with lock */
2242 static gboolean
2243 update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
2244     gboolean add)
2245 {
2246   GstRTSPStreamPrivate *priv = stream->priv;
2247   const GstRTSPTransport *tr;
2248
2249   tr = gst_rtsp_stream_transport_get_transport (trans);
2250
2251   switch (tr->lower_transport) {
2252     case GST_RTSP_LOWER_TRANS_UDP_MCAST:
2253     {
2254       GstRTSPMulticastTransportSource *source;
2255       GstBin *bin;
2256
2257       bin = GST_BIN (gst_object_get_parent (GST_OBJECT (priv->funnel[0])));
2258
2259       if (add) {
2260         gchar *host;
2261         gint i;
2262         GstPad *selpad, *pad;
2263
2264         source = g_slice_new0 (GstRTSPMulticastTransportSource);
2265         source->transport = trans;
2266
2267         for (i = 0; i < 2; i++) {
2268           host =
2269               g_strdup_printf ("udp://%s:%d", tr->destination,
2270               (i == 0) ? tr->port.min : tr->port.max);
2271           source->udpsrc[i] =
2272               gst_element_make_from_uri (GST_URI_SRC, host, NULL, NULL);
2273           g_free (host);
2274
2275           /* we set and keep these to playing so that they don't cause NO_PREROLL return
2276            * values */
2277           gst_element_set_state (source->udpsrc[i], GST_STATE_PLAYING);
2278           gst_element_set_locked_state (source->udpsrc[i], TRUE);
2279           /* add udpsrc */
2280           gst_bin_add (bin, source->udpsrc[i]);
2281
2282           /* and link to the funnel v4 */
2283           source->selpad[i] = selpad =
2284               gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2285           pad = gst_element_get_static_pad (source->udpsrc[i], "src");
2286           gst_pad_link (pad, selpad);
2287           gst_object_unref (pad);
2288           gst_object_unref (selpad);
2289         }
2290         gst_object_unref (bin);
2291
2292         priv->transport_sources =
2293             g_list_prepend (priv->transport_sources, source);
2294       } else {
2295         GList *l;
2296
2297         for (l = priv->transport_sources; l; l = l->next) {
2298           source = l->data;
2299
2300           if (source->transport == trans) {
2301             priv->transport_sources =
2302                 g_list_delete_link (priv->transport_sources, l);
2303             break;
2304           }
2305         }
2306
2307         if (l != NULL) {
2308           gint i;
2309
2310           for (i = 0; i < 2; i++) {
2311             /* Will automatically unlink everything */
2312             gst_bin_remove (bin,
2313                 GST_ELEMENT (gst_object_ref (source->udpsrc[i])));
2314
2315             gst_element_set_state (source->udpsrc[i], GST_STATE_NULL);
2316             gst_object_unref (source->udpsrc[i]);
2317
2318             gst_element_release_request_pad (priv->funnel[i],
2319                 source->selpad[i]);
2320           }
2321
2322           g_slice_free (GstRTSPMulticastTransportSource, source);
2323         }
2324       }
2325
2326       /* fall through for the generic case */
2327     }
2328     case GST_RTSP_LOWER_TRANS_UDP:
2329     {
2330       gchar *dest;
2331       gint min, max;
2332       guint ttl = 0;
2333
2334       dest = tr->destination;
2335       if (tr->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
2336         min = tr->port.min;
2337         max = tr->port.max;
2338         ttl = tr->ttl;
2339       } else {
2340         min = tr->client_port.min;
2341         max = tr->client_port.max;
2342       }
2343
2344       if (add) {
2345         if (ttl > 0) {
2346           GST_INFO ("setting ttl-mc %d", ttl);
2347           g_object_set (G_OBJECT (priv->udpsink[0]), "ttl-mc", ttl, NULL);
2348           g_object_set (G_OBJECT (priv->udpsink[1]), "ttl-mc", ttl, NULL);
2349         }
2350         GST_INFO ("adding %s:%d-%d", dest, min, max);
2351         g_signal_emit_by_name (priv->udpsink[0], "add", dest, min, NULL);
2352         g_signal_emit_by_name (priv->udpsink[1], "add", dest, max, NULL);
2353         priv->transports = g_list_prepend (priv->transports, trans);
2354       } else {
2355         GST_INFO ("removing %s:%d-%d", dest, min, max);
2356         g_signal_emit_by_name (priv->udpsink[0], "remove", dest, min, NULL);
2357         g_signal_emit_by_name (priv->udpsink[1], "remove", dest, max, NULL);
2358         priv->transports = g_list_remove (priv->transports, trans);
2359       }
2360       priv->transports_cookie++;
2361       break;
2362     }
2363     case GST_RTSP_LOWER_TRANS_TCP:
2364       if (add) {
2365         GST_INFO ("adding TCP %s", tr->destination);
2366         priv->transports = g_list_prepend (priv->transports, trans);
2367       } else {
2368         GST_INFO ("removing TCP %s", tr->destination);
2369         priv->transports = g_list_remove (priv->transports, trans);
2370       }
2371       priv->transports_cookie++;
2372       break;
2373     default:
2374       goto unknown_transport;
2375   }
2376   return TRUE;
2377
2378   /* ERRORS */
2379 unknown_transport:
2380   {
2381     GST_INFO ("Unknown transport %d", tr->lower_transport);
2382     return FALSE;
2383   }
2384 }
2385
2386
2387 /**
2388  * gst_rtsp_stream_add_transport:
2389  * @stream: a #GstRTSPStream
2390  * @trans: (transfer none): a #GstRTSPStreamTransport
2391  *
2392  * Add the transport in @trans to @stream. The media of @stream will
2393  * then also be send to the values configured in @trans.
2394  *
2395  * @stream must be joined to a bin.
2396  *
2397  * @trans must contain a valid #GstRTSPTransport.
2398  *
2399  * Returns: %TRUE if @trans was added
2400  */
2401 gboolean
2402 gst_rtsp_stream_add_transport (GstRTSPStream * stream,
2403     GstRTSPStreamTransport * trans)
2404 {
2405   GstRTSPStreamPrivate *priv;
2406   gboolean res;
2407
2408   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2409   priv = stream->priv;
2410   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
2411   g_return_val_if_fail (priv->is_joined, FALSE);
2412
2413   g_mutex_lock (&priv->lock);
2414   res = update_transport (stream, trans, TRUE);
2415   g_mutex_unlock (&priv->lock);
2416
2417   return res;
2418 }
2419
2420 /**
2421  * gst_rtsp_stream_remove_transport:
2422  * @stream: a #GstRTSPStream
2423  * @trans: (transfer none): a #GstRTSPStreamTransport
2424  *
2425  * Remove the transport in @trans from @stream. The media of @stream will
2426  * not be sent to the values configured in @trans.
2427  *
2428  * @stream must be joined to a bin.
2429  *
2430  * @trans must contain a valid #GstRTSPTransport.
2431  *
2432  * Returns: %TRUE if @trans was removed
2433  */
2434 gboolean
2435 gst_rtsp_stream_remove_transport (GstRTSPStream * stream,
2436     GstRTSPStreamTransport * trans)
2437 {
2438   GstRTSPStreamPrivate *priv;
2439   gboolean res;
2440
2441   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2442   priv = stream->priv;
2443   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
2444   g_return_val_if_fail (priv->is_joined, FALSE);
2445
2446   g_mutex_lock (&priv->lock);
2447   res = update_transport (stream, trans, FALSE);
2448   g_mutex_unlock (&priv->lock);
2449
2450   return res;
2451 }
2452
2453 /**
2454  * gst_rtsp_stream_update_crypto:
2455  * @stream: a #GstRTSPStream
2456  * @ssrc: the SSRC
2457  * @crypto: (transfer none) (allow-none): a #GstCaps with crypto info
2458  *
2459  * Update the new crypto information for @ssrc in @stream. If information
2460  * for @ssrc did not exist, it will be added. If information
2461  * for @ssrc existed, it will be replaced. If @crypto is %NULL, it will
2462  * be removed from @stream.
2463  *
2464  * Returns: %TRUE if @crypto could be updated
2465  */
2466 gboolean
2467 gst_rtsp_stream_update_crypto (GstRTSPStream * stream,
2468     guint ssrc, GstCaps * crypto)
2469 {
2470   GstRTSPStreamPrivate *priv;
2471
2472   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2473   g_return_val_if_fail (crypto == NULL || GST_IS_CAPS (crypto), FALSE);
2474
2475   priv = stream->priv;
2476
2477   GST_DEBUG_OBJECT (stream, "update key for %08x", ssrc);
2478
2479   g_mutex_lock (&priv->lock);
2480   if (crypto)
2481     g_hash_table_insert (priv->keys, GINT_TO_POINTER (ssrc),
2482         gst_caps_ref (crypto));
2483   else
2484     g_hash_table_remove (priv->keys, GINT_TO_POINTER (ssrc));
2485   g_mutex_unlock (&priv->lock);
2486
2487   return TRUE;
2488 }
2489
2490 /**
2491  * gst_rtsp_stream_get_rtp_socket:
2492  * @stream: a #GstRTSPStream
2493  * @family: the socket family
2494  *
2495  * Get the RTP socket from @stream for a @family.
2496  *
2497  * @stream must be joined to a bin.
2498  *
2499  * Returns: (transfer full) (nullable): the RTP socket or %NULL if no
2500  * socket could be allocated for @family. Unref after usage
2501  */
2502 GSocket *
2503 gst_rtsp_stream_get_rtp_socket (GstRTSPStream * stream, GSocketFamily family)
2504 {
2505   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
2506   GSocket *socket;
2507   const gchar *name;
2508
2509   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2510   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
2511       family == G_SOCKET_FAMILY_IPV6, NULL);
2512   g_return_val_if_fail (priv->udpsink[0], NULL);
2513
2514   if (family == G_SOCKET_FAMILY_IPV6)
2515     name = "socket-v6";
2516   else
2517     name = "socket";
2518
2519   g_object_get (priv->udpsink[0], name, &socket, NULL);
2520
2521   return socket;
2522 }
2523
2524 /**
2525  * gst_rtsp_stream_get_rtcp_socket:
2526  * @stream: a #GstRTSPStream
2527  * @family: the socket family
2528  *
2529  * Get the RTCP socket from @stream for a @family.
2530  *
2531  * @stream must be joined to a bin.
2532  *
2533  * Returns: (transfer full) (nullable): the RTCP socket or %NULL if no
2534  * socket could be allocated for @family. Unref after usage
2535  */
2536 GSocket *
2537 gst_rtsp_stream_get_rtcp_socket (GstRTSPStream * stream, GSocketFamily family)
2538 {
2539   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
2540   GSocket *socket;
2541   const gchar *name;
2542
2543   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2544   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
2545       family == G_SOCKET_FAMILY_IPV6, NULL);
2546   g_return_val_if_fail (priv->udpsink[1], NULL);
2547
2548   if (family == G_SOCKET_FAMILY_IPV6)
2549     name = "socket-v6";
2550   else
2551     name = "socket";
2552
2553   g_object_get (priv->udpsink[1], name, &socket, NULL);
2554
2555   return socket;
2556 }
2557
2558 /**
2559  * gst_rtsp_stream_transport_filter:
2560  * @stream: a #GstRTSPStream
2561  * @func: (scope call) (allow-none): a callback
2562  * @user_data: (closure): user data passed to @func
2563  *
2564  * Call @func for each transport managed by @stream. The result value of @func
2565  * determines what happens to the transport. @func will be called with @stream
2566  * locked so no further actions on @stream can be performed from @func.
2567  *
2568  * If @func returns #GST_RTSP_FILTER_REMOVE, the transport will be removed from
2569  * @stream.
2570  *
2571  * If @func returns #GST_RTSP_FILTER_KEEP, the transport will remain in @stream.
2572  *
2573  * If @func returns #GST_RTSP_FILTER_REF, the transport will remain in @stream but
2574  * will also be added with an additional ref to the result #GList of this
2575  * function..
2576  *
2577  * When @func is %NULL, #GST_RTSP_FILTER_REF will be assumed for each transport.
2578  *
2579  * Returns: (element-type GstRTSPStreamTransport) (transfer full): a #GList with all
2580  * transports for which @func returned #GST_RTSP_FILTER_REF. After usage, each
2581  * element in the #GList should be unreffed before the list is freed.
2582  */
2583 GList *
2584 gst_rtsp_stream_transport_filter (GstRTSPStream * stream,
2585     GstRTSPStreamTransportFilterFunc func, gpointer user_data)
2586 {
2587   GstRTSPStreamPrivate *priv;
2588   GList *result, *walk, *next;
2589   GHashTable *visited;
2590   guint cookie;
2591
2592   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2593
2594   priv = stream->priv;
2595
2596   result = NULL;
2597   if (func)
2598     visited = g_hash_table_new_full (NULL, NULL, g_object_unref, NULL);
2599
2600   g_mutex_lock (&priv->lock);
2601 restart:
2602   cookie = priv->transports_cookie;
2603   for (walk = priv->transports; walk; walk = next) {
2604     GstRTSPStreamTransport *trans = walk->data;
2605     GstRTSPFilterResult res;
2606     gboolean changed;
2607
2608     next = g_list_next (walk);
2609
2610     if (func) {
2611       /* only visit each transport once */
2612       if (g_hash_table_contains (visited, trans))
2613         continue;
2614
2615       g_hash_table_add (visited, g_object_ref (trans));
2616       g_mutex_unlock (&priv->lock);
2617
2618       res = func (stream, trans, user_data);
2619
2620       g_mutex_lock (&priv->lock);
2621     } else
2622       res = GST_RTSP_FILTER_REF;
2623
2624     changed = (cookie != priv->transports_cookie);
2625
2626     switch (res) {
2627       case GST_RTSP_FILTER_REMOVE:
2628         update_transport (stream, trans, FALSE);
2629         break;
2630       case GST_RTSP_FILTER_REF:
2631         result = g_list_prepend (result, g_object_ref (trans));
2632         break;
2633       case GST_RTSP_FILTER_KEEP:
2634       default:
2635         break;
2636     }
2637     if (changed)
2638       goto restart;
2639   }
2640   g_mutex_unlock (&priv->lock);
2641
2642   if (func)
2643     g_hash_table_unref (visited);
2644
2645   return result;
2646 }
2647
2648 static GstPadProbeReturn
2649 pad_blocking (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
2650 {
2651   GstRTSPStreamPrivate *priv;
2652   GstRTSPStream *stream;
2653
2654   stream = user_data;
2655   priv = stream->priv;
2656
2657   GST_DEBUG_OBJECT (pad, "now blocking");
2658
2659   g_mutex_lock (&priv->lock);
2660   priv->blocking = TRUE;
2661   g_mutex_unlock (&priv->lock);
2662
2663   gst_element_post_message (priv->payloader,
2664       gst_message_new_element (GST_OBJECT_CAST (priv->payloader),
2665           gst_structure_new_empty ("GstRTSPStreamBlocking")));
2666
2667   return GST_PAD_PROBE_OK;
2668 }
2669
2670 /**
2671  * gst_rtsp_stream_set_blocked:
2672  * @stream: a #GstRTSPStream
2673  * @blocked: boolean indicating we should block or unblock
2674  *
2675  * Blocks or unblocks the dataflow on @stream.
2676  *
2677  * Returns: %TRUE on success
2678  */
2679 gboolean
2680 gst_rtsp_stream_set_blocked (GstRTSPStream * stream, gboolean blocked)
2681 {
2682   GstRTSPStreamPrivate *priv;
2683
2684   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2685
2686   priv = stream->priv;
2687
2688   g_mutex_lock (&priv->lock);
2689   if (blocked) {
2690     priv->blocking = FALSE;
2691     if (priv->blocked_id == 0) {
2692       priv->blocked_id = gst_pad_add_probe (priv->srcpad,
2693           GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER |
2694           GST_PAD_PROBE_TYPE_BUFFER_LIST, pad_blocking,
2695           g_object_ref (stream), g_object_unref);
2696     }
2697   } else {
2698     if (priv->blocked_id != 0) {
2699       gst_pad_remove_probe (priv->srcpad, priv->blocked_id);
2700       priv->blocked_id = 0;
2701       priv->blocking = FALSE;
2702     }
2703   }
2704   g_mutex_unlock (&priv->lock);
2705
2706   return TRUE;
2707 }
2708
2709 /**
2710  * gst_rtsp_stream_is_blocking:
2711  * @stream: a #GstRTSPStream
2712  *
2713  * Check if @stream is blocking on a #GstBuffer.
2714  *
2715  * Returns: %TRUE if @stream is blocking
2716  */
2717 gboolean
2718 gst_rtsp_stream_is_blocking (GstRTSPStream * stream)
2719 {
2720   GstRTSPStreamPrivate *priv;
2721   gboolean result;
2722
2723   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2724
2725   priv = stream->priv;
2726
2727   g_mutex_lock (&priv->lock);
2728   result = priv->blocking;
2729   g_mutex_unlock (&priv->lock);
2730
2731   return result;
2732 }