sdp: add rollover counters for all sender SSRC
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-stream.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  * Copyright (C) 2015 Centricular Ltd
4  *     Author: Sebastian Dröge <sebastian@centricular.com>
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19  * Boston, MA 02110-1301, USA.
20  */
21 /**
22  * SECTION:rtsp-stream
23  * @short_description: A media stream
24  * @see_also: #GstRTSPMedia
25  *
26  * The #GstRTSPStream object manages the data transport for one stream. It
27  * is created from a payloader element and a source pad that produce the RTP
28  * packets for the stream.
29  *
30  * With gst_rtsp_stream_join_bin() the streaming elements are added to the bin
31  * and rtpbin. gst_rtsp_stream_leave_bin() removes the elements again.
32  *
33  * The #GstRTSPStream will use the configured addresspool, as set with
34  * gst_rtsp_stream_set_address_pool(), to allocate multicast addresses for the
35  * stream. With gst_rtsp_stream_get_multicast_address() you can get the
36  * configured address.
37  *
38  * With gst_rtsp_stream_get_server_port () you can get the port that the server
39  * will use to receive RTCP. This is the part that the clients will use to send
40  * RTCP to.
41  *
42  * With gst_rtsp_stream_add_transport() destinations can be added where the
43  * stream should be sent to. Use gst_rtsp_stream_remove_transport() to remove
44  * the destination again.
45  *
46  * Last reviewed on 2013-07-16 (1.0.0)
47  */
48
49 #include <stdlib.h>
50 #include <stdio.h>
51 #include <string.h>
52
53 #include <gio/gio.h>
54
55 #include <gst/app/gstappsrc.h>
56 #include <gst/app/gstappsink.h>
57
58 #include <gst/rtp/gstrtpbuffer.h>
59
60 #include "rtsp-stream.h"
61
62 #define GST_RTSP_STREAM_GET_PRIVATE(obj)  \
63      (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTSP_STREAM, GstRTSPStreamPrivate))
64
65 /* Container for udpsrc elements created for a specific RTSPTransport. */
66 typedef struct
67 {
68   GstElement *udpsrc[2];
69 } GstRTSPStreamUDPSrcs;
70
71 static void
72 destroy_udp_srcs_func (gpointer data)
73 {
74   g_slice_free (GstRTSPStreamUDPSrcs, (GstRTSPStreamUDPSrcs *) data);
75 }
76
77 struct _GstRTSPStreamPrivate
78 {
79   GMutex lock;
80   guint idx;
81   /* Only one pad is ever set */
82   GstPad *srcpad, *sinkpad;
83   GstElement *payloader;
84   guint buffer_size;
85   gboolean is_joined;
86   GstBin *joined_bin;
87
88   /* TRUE if this stream is running on
89    * the client side of an RTSP link (for RECORD) */
90   gboolean client_side;
91   gchar *control;
92
93   GstRTSPProfile profiles;
94   GstRTSPLowerTrans protocols;
95
96   /* pads on the rtpbin */
97   GstPad *send_rtp_sink;
98   GstPad *recv_rtp_src;
99   GstPad *recv_sink[2];
100   GstPad *send_src[2];
101
102   /* the RTPSession object */
103   GObject *session;
104
105   /* SRTP encoder/decoder */
106   GstElement *srtpenc;
107   GstElement *srtpdec;
108   GHashTable *keys;
109
110   /* Unicast UDP sources associated with RTSPTransports */
111   GHashTable *udpsrcs;
112
113   /* Only allow one set of IPV4 and IPV6 multicast udpsrcs */
114   GstElement *udpsrc_mcast_v4[2];
115   GstElement *udpsrc_mcast_v6[2];
116
117   GstElement *udpqueue[2];
118   GstElement *udpsink[2];
119
120   /* for TCP transport */
121   GstElement *appsrc[2];
122   GstClockTime appsrc_base_time[2];
123   GstElement *appqueue[2];
124   GstElement *appsink[2];
125
126   GstElement *tee[2];
127   GstElement *funnel[2];
128
129   /* retransmission */
130   GstElement *rtxsend;
131   guint rtx_pt;
132   GstClockTime rtx_time;
133
134   /* server ports for sending/receiving over ipv4 */
135   GstRTSPRange server_port_v4;
136   GstRTSPAddress *server_addr_v4;
137
138   /* server ports for sending/receiving over ipv6 */
139   GstRTSPRange server_port_v6;
140   GstRTSPAddress *server_addr_v6;
141
142   /* multicast addresses */
143   GstRTSPAddressPool *pool;
144   GstRTSPAddress *addr_v4;
145   GstRTSPAddress *addr_v6;
146   gboolean have_ipv4_mcast;
147   gboolean have_ipv6_mcast;
148
149   gchar *multicast_iface;
150
151   /* the caps of the stream */
152   gulong caps_sig;
153   GstCaps *caps;
154
155   /* transports we stream to */
156   guint n_active;
157   GList *transports;
158   guint transports_cookie;
159   GList *tr_cache_rtp;
160   GList *tr_cache_rtcp;
161   guint tr_cache_cookie_rtp;
162   guint tr_cache_cookie_rtcp;
163
164
165   gint dscp_qos;
166
167   /* stream blocking */
168   gulong blocked_id;
169   gboolean blocking;
170
171   /* pt->caps map for RECORD streams */
172   GHashTable *ptmap;
173
174   GstRTSPPublishClockMode publish_clock_mode;
175 };
176
177 #define DEFAULT_CONTROL         NULL
178 #define DEFAULT_PROFILES        GST_RTSP_PROFILE_AVP
179 #define DEFAULT_PROTOCOLS       GST_RTSP_LOWER_TRANS_UDP | GST_RTSP_LOWER_TRANS_UDP_MCAST | \
180                                         GST_RTSP_LOWER_TRANS_TCP
181
182 enum
183 {
184   PROP_0,
185   PROP_CONTROL,
186   PROP_PROFILES,
187   PROP_PROTOCOLS,
188   PROP_LAST
189 };
190
191 enum
192 {
193   SIGNAL_NEW_RTP_ENCODER,
194   SIGNAL_NEW_RTCP_ENCODER,
195   SIGNAL_LAST
196 };
197
198 GST_DEBUG_CATEGORY_STATIC (rtsp_stream_debug);
199 #define GST_CAT_DEFAULT rtsp_stream_debug
200
201 static GQuark ssrc_stream_map_key;
202
203 static void gst_rtsp_stream_get_property (GObject * object, guint propid,
204     GValue * value, GParamSpec * pspec);
205 static void gst_rtsp_stream_set_property (GObject * object, guint propid,
206     const GValue * value, GParamSpec * pspec);
207
208 static void gst_rtsp_stream_finalize (GObject * obj);
209
210 static guint gst_rtsp_stream_signals[SIGNAL_LAST] = { 0 };
211
212 G_DEFINE_TYPE (GstRTSPStream, gst_rtsp_stream, G_TYPE_OBJECT);
213
214 static void
215 gst_rtsp_stream_class_init (GstRTSPStreamClass * klass)
216 {
217   GObjectClass *gobject_class;
218
219   g_type_class_add_private (klass, sizeof (GstRTSPStreamPrivate));
220
221   gobject_class = G_OBJECT_CLASS (klass);
222
223   gobject_class->get_property = gst_rtsp_stream_get_property;
224   gobject_class->set_property = gst_rtsp_stream_set_property;
225   gobject_class->finalize = gst_rtsp_stream_finalize;
226
227   g_object_class_install_property (gobject_class, PROP_CONTROL,
228       g_param_spec_string ("control", "Control",
229           "The control string for this stream", DEFAULT_CONTROL,
230           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
231
232   g_object_class_install_property (gobject_class, PROP_PROFILES,
233       g_param_spec_flags ("profiles", "Profiles",
234           "Allowed transfer profiles", GST_TYPE_RTSP_PROFILE,
235           DEFAULT_PROFILES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
236
237   g_object_class_install_property (gobject_class, PROP_PROTOCOLS,
238       g_param_spec_flags ("protocols", "Protocols",
239           "Allowed lower transport protocols", GST_TYPE_RTSP_LOWER_TRANS,
240           DEFAULT_PROTOCOLS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
241
242   gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER] =
243       g_signal_new ("new-rtp-encoder", G_TYPE_FROM_CLASS (klass),
244       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
245       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
246
247   gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER] =
248       g_signal_new ("new-rtcp-encoder", G_TYPE_FROM_CLASS (klass),
249       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
250       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
251
252   GST_DEBUG_CATEGORY_INIT (rtsp_stream_debug, "rtspstream", 0, "GstRTSPStream");
253
254   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
255 }
256
257 static void
258 gst_rtsp_stream_init (GstRTSPStream * stream)
259 {
260   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
261
262   GST_DEBUG ("new stream %p", stream);
263
264   stream->priv = priv;
265
266   priv->dscp_qos = -1;
267   priv->control = g_strdup (DEFAULT_CONTROL);
268   priv->profiles = DEFAULT_PROFILES;
269   priv->protocols = DEFAULT_PROTOCOLS;
270   priv->publish_clock_mode = GST_RTSP_PUBLISH_CLOCK_MODE_CLOCK;
271
272   g_mutex_init (&priv->lock);
273
274   priv->keys = g_hash_table_new_full (g_direct_hash, g_direct_equal,
275       NULL, (GDestroyNotify) gst_caps_unref);
276   priv->ptmap = g_hash_table_new_full (NULL, NULL, NULL,
277       (GDestroyNotify) gst_caps_unref);
278   priv->udpsrcs = g_hash_table_new_full (g_direct_hash, g_direct_equal,
279       NULL, (GDestroyNotify) destroy_udp_srcs_func);
280 }
281
282 static void
283 gst_rtsp_stream_finalize (GObject * obj)
284 {
285   GstRTSPStream *stream;
286   GstRTSPStreamPrivate *priv;
287
288   stream = GST_RTSP_STREAM (obj);
289   priv = stream->priv;
290
291   GST_DEBUG ("finalize stream %p", stream);
292
293   /* we really need to be unjoined now */
294   g_return_if_fail (!priv->is_joined);
295
296   if (priv->addr_v4)
297     gst_rtsp_address_free (priv->addr_v4);
298   if (priv->addr_v6)
299     gst_rtsp_address_free (priv->addr_v6);
300   if (priv->server_addr_v4)
301     gst_rtsp_address_free (priv->server_addr_v4);
302   if (priv->server_addr_v6)
303     gst_rtsp_address_free (priv->server_addr_v6);
304   if (priv->pool)
305     g_object_unref (priv->pool);
306   if (priv->rtxsend)
307     g_object_unref (priv->rtxsend);
308
309   g_free (priv->multicast_iface);
310
311   gst_object_unref (priv->payloader);
312   if (priv->srcpad)
313     gst_object_unref (priv->srcpad);
314   if (priv->sinkpad)
315     gst_object_unref (priv->sinkpad);
316   g_free (priv->control);
317   g_mutex_clear (&priv->lock);
318
319   g_hash_table_unref (priv->keys);
320   g_hash_table_destroy (priv->ptmap);
321
322   /* We expect all udpsrcs to be cleaned up by this point. */
323   if (g_hash_table_size (priv->udpsrcs) > 0)
324     g_critical ("Unreffing udpsrcs hash table that contains elements.");
325   g_hash_table_unref (priv->udpsrcs);
326
327   G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
328 }
329
330 static void
331 gst_rtsp_stream_get_property (GObject * object, guint propid,
332     GValue * value, GParamSpec * pspec)
333 {
334   GstRTSPStream *stream = GST_RTSP_STREAM (object);
335
336   switch (propid) {
337     case PROP_CONTROL:
338       g_value_take_string (value, gst_rtsp_stream_get_control (stream));
339       break;
340     case PROP_PROFILES:
341       g_value_set_flags (value, gst_rtsp_stream_get_profiles (stream));
342       break;
343     case PROP_PROTOCOLS:
344       g_value_set_flags (value, gst_rtsp_stream_get_protocols (stream));
345       break;
346     default:
347       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
348   }
349 }
350
351 static void
352 gst_rtsp_stream_set_property (GObject * object, guint propid,
353     const GValue * value, GParamSpec * pspec)
354 {
355   GstRTSPStream *stream = GST_RTSP_STREAM (object);
356
357   switch (propid) {
358     case PROP_CONTROL:
359       gst_rtsp_stream_set_control (stream, g_value_get_string (value));
360       break;
361     case PROP_PROFILES:
362       gst_rtsp_stream_set_profiles (stream, g_value_get_flags (value));
363       break;
364     case PROP_PROTOCOLS:
365       gst_rtsp_stream_set_protocols (stream, g_value_get_flags (value));
366       break;
367     default:
368       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
369   }
370 }
371
372 /**
373  * gst_rtsp_stream_new:
374  * @idx: an index
375  * @pad: a #GstPad
376  * @payloader: a #GstElement
377  *
378  * Create a new media stream with index @idx that handles RTP data on
379  * @pad and has a payloader element @payloader if @pad is a source pad
380  * or a depayloader element @payloader if @pad is a sink pad.
381  *
382  * Returns: (transfer full): a new #GstRTSPStream
383  */
384 GstRTSPStream *
385 gst_rtsp_stream_new (guint idx, GstElement * payloader, GstPad * pad)
386 {
387   GstRTSPStreamPrivate *priv;
388   GstRTSPStream *stream;
389
390   g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL);
391   g_return_val_if_fail (GST_IS_PAD (pad), NULL);
392
393   stream = g_object_new (GST_TYPE_RTSP_STREAM, NULL);
394   priv = stream->priv;
395   priv->idx = idx;
396   priv->payloader = gst_object_ref (payloader);
397   if (GST_PAD_IS_SRC (pad))
398     priv->srcpad = gst_object_ref (pad);
399   else
400     priv->sinkpad = gst_object_ref (pad);
401
402   return stream;
403 }
404
405 /**
406  * gst_rtsp_stream_get_index:
407  * @stream: a #GstRTSPStream
408  *
409  * Get the stream index.
410  *
411  * Return: the stream index.
412  */
413 guint
414 gst_rtsp_stream_get_index (GstRTSPStream * stream)
415 {
416   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
417
418   return stream->priv->idx;
419 }
420
421 /**
422  * gst_rtsp_stream_get_pt:
423  * @stream: a #GstRTSPStream
424  *
425  * Get the stream payload type.
426  *
427  * Return: the stream payload type.
428  */
429 guint
430 gst_rtsp_stream_get_pt (GstRTSPStream * stream)
431 {
432   GstRTSPStreamPrivate *priv;
433   guint pt;
434
435   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
436
437   priv = stream->priv;
438
439   g_object_get (G_OBJECT (priv->payloader), "pt", &pt, NULL);
440
441   return pt;
442 }
443
444 /**
445  * gst_rtsp_stream_get_srcpad:
446  * @stream: a #GstRTSPStream
447  *
448  * Get the srcpad associated with @stream.
449  *
450  * Returns: (transfer full): the srcpad. Unref after usage.
451  */
452 GstPad *
453 gst_rtsp_stream_get_srcpad (GstRTSPStream * stream)
454 {
455   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
456
457   if (!stream->priv->srcpad)
458     return NULL;
459
460   return gst_object_ref (stream->priv->srcpad);
461 }
462
463 /**
464  * gst_rtsp_stream_get_sinkpad:
465  * @stream: a #GstRTSPStream
466  *
467  * Get the sinkpad associated with @stream.
468  *
469  * Returns: (transfer full): the sinkpad. Unref after usage.
470  */
471 GstPad *
472 gst_rtsp_stream_get_sinkpad (GstRTSPStream * stream)
473 {
474   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
475
476   if (!stream->priv->sinkpad)
477     return NULL;
478
479   return gst_object_ref (stream->priv->sinkpad);
480 }
481
482 /**
483  * gst_rtsp_stream_get_control:
484  * @stream: a #GstRTSPStream
485  *
486  * Get the control string to identify this stream.
487  *
488  * Returns: (transfer full): the control string. g_free() after usage.
489  */
490 gchar *
491 gst_rtsp_stream_get_control (GstRTSPStream * stream)
492 {
493   GstRTSPStreamPrivate *priv;
494   gchar *result;
495
496   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
497
498   priv = stream->priv;
499
500   g_mutex_lock (&priv->lock);
501   if ((result = g_strdup (priv->control)) == NULL)
502     result = g_strdup_printf ("stream=%u", priv->idx);
503   g_mutex_unlock (&priv->lock);
504
505   return result;
506 }
507
508 /**
509  * gst_rtsp_stream_set_control:
510  * @stream: a #GstRTSPStream
511  * @control: a control string
512  *
513  * Set the control string in @stream.
514  */
515 void
516 gst_rtsp_stream_set_control (GstRTSPStream * stream, const gchar * control)
517 {
518   GstRTSPStreamPrivate *priv;
519
520   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
521
522   priv = stream->priv;
523
524   g_mutex_lock (&priv->lock);
525   g_free (priv->control);
526   priv->control = g_strdup (control);
527   g_mutex_unlock (&priv->lock);
528 }
529
530 /**
531  * gst_rtsp_stream_has_control:
532  * @stream: a #GstRTSPStream
533  * @control: a control string
534  *
535  * Check if @stream has the control string @control.
536  *
537  * Returns: %TRUE is @stream has @control as the control string
538  */
539 gboolean
540 gst_rtsp_stream_has_control (GstRTSPStream * stream, const gchar * control)
541 {
542   GstRTSPStreamPrivate *priv;
543   gboolean res;
544
545   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
546
547   priv = stream->priv;
548
549   g_mutex_lock (&priv->lock);
550   if (priv->control)
551     res = (g_strcmp0 (priv->control, control) == 0);
552   else {
553     guint streamid;
554
555     if (sscanf (control, "stream=%u", &streamid) > 0)
556       res = (streamid == priv->idx);
557     else
558       res = FALSE;
559   }
560   g_mutex_unlock (&priv->lock);
561
562   return res;
563 }
564
565 /**
566  * gst_rtsp_stream_set_mtu:
567  * @stream: a #GstRTSPStream
568  * @mtu: a new MTU
569  *
570  * Configure the mtu in the payloader of @stream to @mtu.
571  */
572 void
573 gst_rtsp_stream_set_mtu (GstRTSPStream * stream, guint mtu)
574 {
575   GstRTSPStreamPrivate *priv;
576
577   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
578
579   priv = stream->priv;
580
581   GST_LOG_OBJECT (stream, "set MTU %u", mtu);
582
583   g_object_set (G_OBJECT (priv->payloader), "mtu", mtu, NULL);
584 }
585
586 /**
587  * gst_rtsp_stream_get_mtu:
588  * @stream: a #GstRTSPStream
589  *
590  * Get the configured MTU in the payloader of @stream.
591  *
592  * Returns: the MTU of the payloader.
593  */
594 guint
595 gst_rtsp_stream_get_mtu (GstRTSPStream * stream)
596 {
597   GstRTSPStreamPrivate *priv;
598   guint mtu;
599
600   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
601
602   priv = stream->priv;
603
604   g_object_get (G_OBJECT (priv->payloader), "mtu", &mtu, NULL);
605
606   return mtu;
607 }
608
609 /* Update the dscp qos property on the udp sinks */
610 static void
611 update_dscp_qos (GstRTSPStream * stream)
612 {
613   GstRTSPStreamPrivate *priv;
614
615   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
616
617   priv = stream->priv;
618
619   if (priv->udpsink[0]) {
620     g_object_set (G_OBJECT (priv->udpsink[0]), "qos-dscp", priv->dscp_qos,
621         NULL);
622   }
623
624   if (priv->udpsink[1]) {
625     g_object_set (G_OBJECT (priv->udpsink[1]), "qos-dscp", priv->dscp_qos,
626         NULL);
627   }
628 }
629
630 /**
631  * gst_rtsp_stream_set_dscp_qos:
632  * @stream: a #GstRTSPStream
633  * @dscp_qos: a new dscp qos value (0-63, or -1 to disable)
634  *
635  * Configure the dscp qos of the outgoing sockets to @dscp_qos.
636  */
637 void
638 gst_rtsp_stream_set_dscp_qos (GstRTSPStream * stream, gint dscp_qos)
639 {
640   GstRTSPStreamPrivate *priv;
641
642   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
643
644   priv = stream->priv;
645
646   GST_LOG_OBJECT (stream, "set DSCP QoS %d", dscp_qos);
647
648   if (dscp_qos < -1 || dscp_qos > 63) {
649     GST_WARNING_OBJECT (stream, "trying to set illegal dscp qos %d", dscp_qos);
650     return;
651   }
652
653   priv->dscp_qos = dscp_qos;
654
655   update_dscp_qos (stream);
656 }
657
658 /**
659  * gst_rtsp_stream_get_dscp_qos:
660  * @stream: a #GstRTSPStream
661  *
662  * Get the configured DSCP QoS in of the outgoing sockets.
663  *
664  * Returns: the DSCP QoS value of the outgoing sockets, or -1 if disbled.
665  */
666 gint
667 gst_rtsp_stream_get_dscp_qos (GstRTSPStream * stream)
668 {
669   GstRTSPStreamPrivate *priv;
670
671   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
672
673   priv = stream->priv;
674
675   return priv->dscp_qos;
676 }
677
678 /**
679  * gst_rtsp_stream_is_transport_supported:
680  * @stream: a #GstRTSPStream
681  * @transport: (transfer none): a #GstRTSPTransport
682  *
683  * Check if @transport can be handled by stream
684  *
685  * Returns: %TRUE if @transport can be handled by @stream.
686  */
687 gboolean
688 gst_rtsp_stream_is_transport_supported (GstRTSPStream * stream,
689     GstRTSPTransport * transport)
690 {
691   GstRTSPStreamPrivate *priv;
692
693   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
694
695   priv = stream->priv;
696
697   g_mutex_lock (&priv->lock);
698   if (transport->trans != GST_RTSP_TRANS_RTP)
699     goto unsupported_transmode;
700
701   if (!(transport->profile & priv->profiles))
702     goto unsupported_profile;
703
704   if (!(transport->lower_transport & priv->protocols))
705     goto unsupported_ltrans;
706
707   g_mutex_unlock (&priv->lock);
708
709   return TRUE;
710
711   /* ERRORS */
712 unsupported_transmode:
713   {
714     GST_DEBUG ("unsupported transport mode %d", transport->trans);
715     g_mutex_unlock (&priv->lock);
716     return FALSE;
717   }
718 unsupported_profile:
719   {
720     GST_DEBUG ("unsupported profile %d", transport->profile);
721     g_mutex_unlock (&priv->lock);
722     return FALSE;
723   }
724 unsupported_ltrans:
725   {
726     GST_DEBUG ("unsupported lower transport %d", transport->lower_transport);
727     g_mutex_unlock (&priv->lock);
728     return FALSE;
729   }
730 }
731
732 /**
733  * gst_rtsp_stream_set_profiles:
734  * @stream: a #GstRTSPStream
735  * @profiles: the new profiles
736  *
737  * Configure the allowed profiles for @stream.
738  */
739 void
740 gst_rtsp_stream_set_profiles (GstRTSPStream * stream, GstRTSPProfile profiles)
741 {
742   GstRTSPStreamPrivate *priv;
743
744   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
745
746   priv = stream->priv;
747
748   g_mutex_lock (&priv->lock);
749   priv->profiles = profiles;
750   g_mutex_unlock (&priv->lock);
751 }
752
753 /**
754  * gst_rtsp_stream_get_profiles:
755  * @stream: a #GstRTSPStream
756  *
757  * Get the allowed profiles of @stream.
758  *
759  * Returns: a #GstRTSPProfile
760  */
761 GstRTSPProfile
762 gst_rtsp_stream_get_profiles (GstRTSPStream * stream)
763 {
764   GstRTSPStreamPrivate *priv;
765   GstRTSPProfile res;
766
767   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_RTSP_PROFILE_UNKNOWN);
768
769   priv = stream->priv;
770
771   g_mutex_lock (&priv->lock);
772   res = priv->profiles;
773   g_mutex_unlock (&priv->lock);
774
775   return res;
776 }
777
778 /**
779  * gst_rtsp_stream_set_protocols:
780  * @stream: a #GstRTSPStream
781  * @protocols: the new flags
782  *
783  * Configure the allowed lower transport for @stream.
784  */
785 void
786 gst_rtsp_stream_set_protocols (GstRTSPStream * stream,
787     GstRTSPLowerTrans protocols)
788 {
789   GstRTSPStreamPrivate *priv;
790
791   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
792
793   priv = stream->priv;
794
795   g_mutex_lock (&priv->lock);
796   priv->protocols = protocols;
797   g_mutex_unlock (&priv->lock);
798 }
799
800 /**
801  * gst_rtsp_stream_get_protocols:
802  * @stream: a #GstRTSPStream
803  *
804  * Get the allowed protocols of @stream.
805  *
806  * Returns: a #GstRTSPLowerTrans
807  */
808 GstRTSPLowerTrans
809 gst_rtsp_stream_get_protocols (GstRTSPStream * stream)
810 {
811   GstRTSPStreamPrivate *priv;
812   GstRTSPLowerTrans res;
813
814   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream),
815       GST_RTSP_LOWER_TRANS_UNKNOWN);
816
817   priv = stream->priv;
818
819   g_mutex_lock (&priv->lock);
820   res = priv->protocols;
821   g_mutex_unlock (&priv->lock);
822
823   return res;
824 }
825
826 /**
827  * gst_rtsp_stream_set_address_pool:
828  * @stream: a #GstRTSPStream
829  * @pool: (transfer none): a #GstRTSPAddressPool
830  *
831  * configure @pool to be used as the address pool of @stream.
832  */
833 void
834 gst_rtsp_stream_set_address_pool (GstRTSPStream * stream,
835     GstRTSPAddressPool * pool)
836 {
837   GstRTSPStreamPrivate *priv;
838   GstRTSPAddressPool *old;
839
840   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
841
842   priv = stream->priv;
843
844   GST_LOG_OBJECT (stream, "set address pool %p", pool);
845
846   g_mutex_lock (&priv->lock);
847   if ((old = priv->pool) != pool)
848     priv->pool = pool ? g_object_ref (pool) : NULL;
849   else
850     old = NULL;
851   g_mutex_unlock (&priv->lock);
852
853   if (old)
854     g_object_unref (old);
855 }
856
857 /**
858  * gst_rtsp_stream_get_address_pool:
859  * @stream: a #GstRTSPStream
860  *
861  * Get the #GstRTSPAddressPool used as the address pool of @stream.
862  *
863  * Returns: (transfer full): the #GstRTSPAddressPool of @stream. g_object_unref() after
864  * usage.
865  */
866 GstRTSPAddressPool *
867 gst_rtsp_stream_get_address_pool (GstRTSPStream * stream)
868 {
869   GstRTSPStreamPrivate *priv;
870   GstRTSPAddressPool *result;
871
872   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
873
874   priv = stream->priv;
875
876   g_mutex_lock (&priv->lock);
877   if ((result = priv->pool))
878     g_object_ref (result);
879   g_mutex_unlock (&priv->lock);
880
881   return result;
882 }
883
884 /**
885  * gst_rtsp_stream_set_multicast_iface:
886  * @stream: a #GstRTSPStream
887  * @multicast_iface: (transfer none): a multicast interface
888  *
889  * configure @multicast_iface to be used for @stream.
890  */
891 void
892 gst_rtsp_stream_set_multicast_iface (GstRTSPStream * stream,
893     const gchar * multicast_iface)
894 {
895   GstRTSPStreamPrivate *priv;
896   gchar *old;
897
898   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
899
900   priv = stream->priv;
901
902   GST_LOG_OBJECT (stream, "set multicast iface %s",
903       GST_STR_NULL (multicast_iface));
904
905   g_mutex_lock (&priv->lock);
906   if ((old = priv->multicast_iface) != multicast_iface)
907     priv->multicast_iface = multicast_iface ? g_strdup (multicast_iface) : NULL;
908   else
909     old = NULL;
910   g_mutex_unlock (&priv->lock);
911
912   if (old)
913     g_free (old);
914 }
915
916 /**
917  * gst_rtsp_stream_get_multicast_iface:
918  * @stream: a #GstRTSPStream
919  *
920  * Get the multicast interface used for @stream.
921  *
922  * Returns: (transfer full): the multicast interface for @stream. g_free() after
923  * usage.
924  */
925 gchar *
926 gst_rtsp_stream_get_multicast_iface (GstRTSPStream * stream)
927 {
928   GstRTSPStreamPrivate *priv;
929   gchar *result;
930
931   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
932
933   priv = stream->priv;
934
935   g_mutex_lock (&priv->lock);
936   if ((result = priv->multicast_iface))
937     result = g_strdup (result);
938   g_mutex_unlock (&priv->lock);
939
940   return result;
941 }
942
943 /**
944  * gst_rtsp_stream_get_multicast_address:
945  * @stream: a #GstRTSPStream
946  * @family: the #GSocketFamily
947  *
948  * Get the multicast address of @stream for @family.
949  *
950  * Returns: (transfer full) (nullable): the #GstRTSPAddress of @stream
951  * or %NULL when no address could be allocated. gst_rtsp_address_free()
952  * after usage.
953  */
954 GstRTSPAddress *
955 gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream,
956     GSocketFamily family)
957 {
958   GstRTSPStreamPrivate *priv;
959   GstRTSPAddress *result;
960   GstRTSPAddress **addrp;
961   GstRTSPAddressFlags flags;
962
963   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
964
965   priv = stream->priv;
966
967   if (family == G_SOCKET_FAMILY_IPV6) {
968     flags = GST_RTSP_ADDRESS_FLAG_IPV6;
969     addrp = &priv->addr_v6;
970   } else {
971     flags = GST_RTSP_ADDRESS_FLAG_IPV4;
972     addrp = &priv->addr_v4;
973   }
974
975   g_mutex_lock (&priv->lock);
976   if (*addrp == NULL) {
977     if (priv->pool == NULL)
978       goto no_pool;
979
980     flags |= GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_MULTICAST;
981
982     *addrp = gst_rtsp_address_pool_acquire_address (priv->pool, flags, 2);
983     if (*addrp == NULL)
984       goto no_address;
985   }
986   result = gst_rtsp_address_copy (*addrp);
987   g_mutex_unlock (&priv->lock);
988
989   return result;
990
991   /* ERRORS */
992 no_pool:
993   {
994     GST_ERROR_OBJECT (stream, "no address pool specified");
995     g_mutex_unlock (&priv->lock);
996     return NULL;
997   }
998 no_address:
999   {
1000     GST_ERROR_OBJECT (stream, "failed to acquire address from pool");
1001     g_mutex_unlock (&priv->lock);
1002     return NULL;
1003   }
1004 }
1005
1006 /**
1007  * gst_rtsp_stream_reserve_address:
1008  * @stream: a #GstRTSPStream
1009  * @address: an address
1010  * @port: a port
1011  * @n_ports: n_ports
1012  * @ttl: a TTL
1013  *
1014  * Reserve @address and @port as the address and port of @stream.
1015  *
1016  * Returns: (nullable): the #GstRTSPAddress of @stream or %NULL when
1017  * the address could be reserved. gst_rtsp_address_free() after usage.
1018  */
1019 GstRTSPAddress *
1020 gst_rtsp_stream_reserve_address (GstRTSPStream * stream,
1021     const gchar * address, guint port, guint n_ports, guint ttl)
1022 {
1023   GstRTSPStreamPrivate *priv;
1024   GstRTSPAddress *result;
1025   GInetAddress *addr;
1026   GSocketFamily family;
1027   GstRTSPAddress **addrp;
1028
1029   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1030   g_return_val_if_fail (address != NULL, NULL);
1031   g_return_val_if_fail (port > 0, NULL);
1032   g_return_val_if_fail (n_ports > 0, NULL);
1033   g_return_val_if_fail (ttl > 0, NULL);
1034
1035   priv = stream->priv;
1036
1037   addr = g_inet_address_new_from_string (address);
1038   if (!addr) {
1039     GST_ERROR ("failed to get inet addr from %s", address);
1040     family = G_SOCKET_FAMILY_IPV4;
1041   } else {
1042     family = g_inet_address_get_family (addr);
1043     g_object_unref (addr);
1044   }
1045
1046   if (family == G_SOCKET_FAMILY_IPV6)
1047     addrp = &priv->addr_v6;
1048   else
1049     addrp = &priv->addr_v4;
1050
1051   g_mutex_lock (&priv->lock);
1052   if (*addrp == NULL) {
1053     GstRTSPAddressPoolResult res;
1054
1055     if (priv->pool == NULL)
1056       goto no_pool;
1057
1058     res = gst_rtsp_address_pool_reserve_address (priv->pool, address,
1059         port, n_ports, ttl, addrp);
1060     if (res != GST_RTSP_ADDRESS_POOL_OK)
1061       goto no_address;
1062   } else {
1063     if (strcmp ((*addrp)->address, address) ||
1064         (*addrp)->port != port || (*addrp)->n_ports != n_ports ||
1065         (*addrp)->ttl != ttl)
1066       goto different_address;
1067   }
1068   result = gst_rtsp_address_copy (*addrp);
1069   g_mutex_unlock (&priv->lock);
1070
1071   return result;
1072
1073   /* ERRORS */
1074 no_pool:
1075   {
1076     GST_ERROR_OBJECT (stream, "no address pool specified");
1077     g_mutex_unlock (&priv->lock);
1078     return NULL;
1079   }
1080 no_address:
1081   {
1082     GST_ERROR_OBJECT (stream, "failed to acquire address %s from pool",
1083         address);
1084     g_mutex_unlock (&priv->lock);
1085     return NULL;
1086   }
1087 different_address:
1088   {
1089     GST_ERROR_OBJECT (stream, "address %s is not the same that was already"
1090         " reserved", address);
1091     g_mutex_unlock (&priv->lock);
1092     return NULL;
1093   }
1094 }
1095
1096 /* must be called with lock */
1097 static void
1098 set_sockets_for_udpsinks (GstRTSPStream * stream, GSocket * rtp_socket,
1099     GSocket * rtcp_socket, GSocketFamily family)
1100 {
1101   GstRTSPStreamPrivate *priv = stream->priv;
1102   const gchar *multisink_socket;
1103
1104   if (family == G_SOCKET_FAMILY_IPV6)
1105     multisink_socket = "socket-v6";
1106   else
1107     multisink_socket = "socket";
1108
1109   g_object_set (G_OBJECT (priv->udpsink[0]), multisink_socket, rtp_socket,
1110       NULL);
1111   g_object_set (G_OBJECT (priv->udpsink[1]), multisink_socket, rtcp_socket,
1112       NULL);
1113 }
1114
1115 /* must be called with lock */
1116 static gboolean
1117 create_and_configure_udpsinks (GstRTSPStream * stream)
1118 {
1119   GstRTSPStreamPrivate *priv = stream->priv;
1120   GstElement *udpsink0, *udpsink1;
1121
1122   udpsink0 = NULL;
1123   udpsink1 = NULL;
1124
1125   if (priv->udpsink[0])
1126     udpsink0 = priv->udpsink[0];
1127   else
1128     udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
1129
1130   if (!udpsink0)
1131     goto no_udp_protocol;
1132
1133   if (priv->udpsink[1])
1134     udpsink1 = priv->udpsink[1];
1135   else
1136     udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
1137
1138   if (!udpsink1)
1139     goto no_udp_protocol;
1140
1141   /* configure sinks */
1142
1143   g_object_set (G_OBJECT (udpsink0), "close-socket", FALSE, NULL);
1144   g_object_set (G_OBJECT (udpsink1), "close-socket", FALSE, NULL);
1145
1146   g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
1147   g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
1148
1149   g_object_set (G_OBJECT (udpsink0), "buffer-size", priv->buffer_size, NULL);
1150
1151   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
1152   /* Needs to be async for RECORD streams, otherwise we will never go to
1153    * PLAYING because the sinks will wait for data while the udpsrc can't
1154    * provide data with timestamps in PAUSED. */
1155   if (priv->sinkpad)
1156     g_object_set (G_OBJECT (udpsink0), "async", FALSE, NULL);
1157   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
1158
1159   g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL);
1160   g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL);
1161
1162   g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
1163   g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
1164
1165   /* update the dscp qos field in the sinks */
1166   update_dscp_qos (stream);
1167
1168   priv->udpsink[0] = udpsink0;
1169   priv->udpsink[1] = udpsink1;
1170
1171   return TRUE;
1172
1173   /* ERRORS */
1174 no_udp_protocol:
1175   {
1176     return FALSE;
1177   }
1178 }
1179
1180 /* must be called with lock */
1181 static void
1182 play_udpsources_one_family (GstRTSPStream * stream, GstElement * udpsrc_out[2],
1183     GSocketFamily family)
1184 {
1185   GstRTSPStreamPrivate *priv;
1186   GstPad *pad, *selpad;
1187   guint i;
1188   GstBin *bin;
1189
1190   priv = stream->priv;
1191   bin = GST_BIN (gst_object_get_parent (GST_OBJECT (priv->funnel[1])));
1192
1193   for (i = 0; i < 2; i++) {
1194     if (priv->sinkpad || i == 1) {
1195       if (priv->srcpad) {
1196         /* we set and keep these to playing so that they don't cause NO_PREROLL return
1197          * values. This is only relevant for PLAY pipelines */
1198         gst_element_set_state (udpsrc_out[i], GST_STATE_PLAYING);
1199         gst_element_set_locked_state (udpsrc_out[i], TRUE);
1200       }
1201       /* add udpsrc */
1202       gst_bin_add (bin, udpsrc_out[i]);
1203
1204       /* and link to the funnel */
1205       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1206       pad = gst_element_get_static_pad (udpsrc_out[i], "src");
1207       gst_pad_link (pad, selpad);
1208       gst_object_unref (pad);
1209       gst_object_unref (selpad);
1210
1211       /* otherwise sync state with parent in case it's running already
1212        * at this point */
1213       if (!priv->srcpad) {
1214         gst_element_sync_state_with_parent (udpsrc_out[i]);
1215       }
1216     }
1217   }
1218
1219   gst_object_unref (bin);
1220 }
1221
1222 /* must be called with lock */
1223 static gboolean
1224 create_and_configure_udpsources_one_family (GstElement * udpsrc_out[2],
1225     GSocket * rtp_socket, GSocket * rtcp_socket, GSocketFamily family,
1226     const gchar * address, gint rtpport, gint rtcpport,
1227     const gchar * multicast_iface, GstRTSPLowerTrans transport)
1228 {
1229   GstStateChangeReturn ret;
1230
1231   udpsrc_out[0] = gst_element_factory_make ("udpsrc", NULL);
1232   udpsrc_out[1] = gst_element_factory_make ("udpsrc", NULL);
1233
1234   if (udpsrc_out[0] == NULL || udpsrc_out[1] == NULL)
1235     goto error;
1236
1237   if (transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
1238     g_object_set (G_OBJECT (udpsrc_out[0]), "address", address, NULL);
1239     g_object_set (G_OBJECT (udpsrc_out[1]), "address", address, NULL);
1240     g_object_set (G_OBJECT (udpsrc_out[0]), "port", rtpport, NULL);
1241     g_object_set (G_OBJECT (udpsrc_out[1]), "port", rtcpport, NULL);
1242     g_object_set (G_OBJECT (udpsrc_out[0]), "multicast-iface", multicast_iface,
1243         NULL);
1244     g_object_set (G_OBJECT (udpsrc_out[1]), "multicast-iface", multicast_iface,
1245         NULL);
1246     g_object_set (G_OBJECT (udpsrc_out[0]), "loop", FALSE, NULL);
1247     g_object_set (G_OBJECT (udpsrc_out[1]), "loop", FALSE, NULL);
1248   }
1249
1250   g_object_set (G_OBJECT (udpsrc_out[0]), "socket", rtp_socket, NULL);
1251   g_object_set (G_OBJECT (udpsrc_out[1]), "socket", rtcp_socket, NULL);
1252
1253   ret = gst_element_set_state (udpsrc_out[0], GST_STATE_READY);
1254   if (ret == GST_STATE_CHANGE_FAILURE)
1255     goto error;
1256   ret = gst_element_set_state (udpsrc_out[1], GST_STATE_READY);
1257   if (ret == GST_STATE_CHANGE_FAILURE)
1258     goto error;
1259
1260   return TRUE;
1261   return TRUE;
1262
1263   /* ERRORS */
1264 error:
1265   {
1266     if (udpsrc_out[0])
1267       gst_object_unref (udpsrc_out[0]);
1268     if (udpsrc_out[1])
1269       gst_object_unref (udpsrc_out[1]);
1270     return FALSE;
1271   }
1272 }
1273
1274 static gboolean
1275 alloc_ports_one_family (GstRTSPStream * stream, GSocketFamily family,
1276     GstElement * udpsrc_out[2], GstRTSPRange * server_port_out,
1277     GstRTSPTransport * ct, GstRTSPAddress ** server_addr_out,
1278     gboolean use_client_settings)
1279 {
1280   GstRTSPStreamPrivate *priv = stream->priv;
1281   GSocket *rtp_socket = NULL;
1282   GSocket *rtcp_socket;
1283   gint tmp_rtp, tmp_rtcp;
1284   guint count;
1285   gint rtpport, rtcpport;
1286   GList *rejected_addresses = NULL;
1287   GstRTSPAddress *addr = NULL;
1288   GInetAddress *inetaddr = NULL;
1289   gchar *addr_str;
1290   GSocketAddress *rtp_sockaddr = NULL;
1291   GSocketAddress *rtcp_sockaddr = NULL;
1292   GstRTSPAddressPool *pool;
1293   GstRTSPLowerTrans transport;
1294   const gchar *multicast_iface = priv->multicast_iface;
1295
1296   pool = priv->pool;
1297   count = 0;
1298   transport = ct->lower_transport;
1299
1300   /* Start with random port */
1301   tmp_rtp = 0;
1302
1303   rtcp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1304       G_SOCKET_PROTOCOL_UDP, NULL);
1305   if (!rtcp_socket)
1306     goto no_udp_protocol;
1307   g_socket_set_multicast_loopback (rtcp_socket, FALSE);
1308
1309   if (*server_addr_out)
1310     gst_rtsp_address_free (*server_addr_out);
1311
1312   /* try to allocate 2 UDP ports, the RTP port should be an even
1313    * number and the RTCP port should be the next (uneven) port */
1314 again:
1315
1316   if (rtp_socket == NULL) {
1317     rtp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1318         G_SOCKET_PROTOCOL_UDP, NULL);
1319     if (!rtp_socket)
1320       goto no_udp_protocol;
1321     g_socket_set_multicast_loopback (rtp_socket, FALSE);
1322   }
1323
1324   if (pool && ((transport == GST_RTSP_LOWER_TRANS_UDP &&
1325               gst_rtsp_address_pool_has_unicast_addresses (pool))
1326           || transport == GST_RTSP_LOWER_TRANS_UDP_MCAST)) {
1327     GstRTSPAddressFlags flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT;
1328
1329     if (transport == GST_RTSP_LOWER_TRANS_UDP)
1330       flags |= GST_RTSP_ADDRESS_FLAG_UNICAST;
1331     else
1332       flags |= GST_RTSP_ADDRESS_FLAG_MULTICAST;
1333
1334     if (addr)
1335       rejected_addresses = g_list_prepend (rejected_addresses, addr);
1336
1337     if (family == G_SOCKET_FAMILY_IPV6)
1338       flags |= GST_RTSP_ADDRESS_FLAG_IPV6;
1339     else
1340       flags |= GST_RTSP_ADDRESS_FLAG_IPV4;
1341
1342     if (ct->destination && transport == GST_RTSP_LOWER_TRANS_UDP_MCAST
1343         && use_client_settings)
1344       gst_rtsp_address_pool_reserve_address (pool, ct->destination,
1345           ct->port.min, 2, ct->ttl, &addr);
1346     else
1347       addr = gst_rtsp_address_pool_acquire_address (pool, flags, 2);
1348
1349     if (addr == NULL)
1350       goto no_ports;
1351
1352     tmp_rtp = addr->port;
1353
1354     g_clear_object (&inetaddr);
1355     inetaddr = g_inet_address_new_from_string (addr->address);
1356
1357     /* If we're supposed to bind to a multicast address, instead bind
1358      * to ANY and let udpsrc later join the relevant multicast group
1359      */
1360     if (g_inet_address_get_is_multicast (inetaddr)) {
1361       g_object_unref (inetaddr);
1362       inetaddr = g_inet_address_new_any (family);
1363     }
1364   } else {
1365     if (tmp_rtp != 0) {
1366       tmp_rtp += 2;
1367       if (++count > 20)
1368         goto no_ports;
1369     }
1370
1371     if (inetaddr == NULL)
1372       inetaddr = g_inet_address_new_any (family);
1373   }
1374
1375   rtp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtp);
1376   if (!g_socket_bind (rtp_socket, rtp_sockaddr, FALSE, NULL)) {
1377     g_object_unref (rtp_sockaddr);
1378     goto again;
1379   }
1380   g_object_unref (rtp_sockaddr);
1381
1382   rtp_sockaddr = g_socket_get_local_address (rtp_socket, NULL);
1383   if (rtp_sockaddr == NULL || !G_IS_INET_SOCKET_ADDRESS (rtp_sockaddr)) {
1384     g_clear_object (&rtp_sockaddr);
1385     goto socket_error;
1386   }
1387
1388   tmp_rtp =
1389       g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (rtp_sockaddr));
1390   g_object_unref (rtp_sockaddr);
1391
1392   /* check if port is even */
1393   if ((tmp_rtp & 1) != 0) {
1394     /* port not even, close and allocate another */
1395     tmp_rtp++;
1396     g_clear_object (&rtp_socket);
1397     goto again;
1398   }
1399
1400   /* set port */
1401   tmp_rtcp = tmp_rtp + 1;
1402
1403   rtcp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtcp);
1404   if (!g_socket_bind (rtcp_socket, rtcp_sockaddr, FALSE, NULL)) {
1405     g_object_unref (rtcp_sockaddr);
1406     g_clear_object (&rtp_socket);
1407     goto again;
1408   }
1409   g_object_unref (rtcp_sockaddr);
1410
1411   if (addr == NULL)
1412     addr_str = g_inet_address_to_string (inetaddr);
1413   else
1414     addr_str = addr->address;
1415   g_clear_object (&inetaddr);
1416
1417   if (!create_and_configure_udpsources_one_family (udpsrc_out, rtp_socket,
1418           rtcp_socket, family, addr_str, tmp_rtp, tmp_rtcp, multicast_iface,
1419           transport)) {
1420     if (addr == NULL)
1421       g_free (addr_str);
1422     goto no_udp_protocol;
1423   }
1424
1425   if (addr == NULL)
1426     g_free (addr_str);
1427
1428   play_udpsources_one_family (stream, udpsrc_out, family);
1429
1430   g_object_get (G_OBJECT (udpsrc_out[0]), "port", &rtpport, NULL);
1431   g_object_get (G_OBJECT (udpsrc_out[1]), "port", &rtcpport, NULL);
1432
1433   /* this should not happen... */
1434   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
1435     goto port_error;
1436
1437   /* set RTP and RTCP sockets */
1438   set_sockets_for_udpsinks (stream, rtp_socket, rtcp_socket, family);
1439
1440   server_port_out->min = rtpport;
1441   server_port_out->max = rtcpport;
1442
1443   *server_addr_out = addr;
1444   g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free);
1445
1446   g_object_unref (rtp_socket);
1447   g_object_unref (rtcp_socket);
1448
1449   return TRUE;
1450
1451   /* ERRORS */
1452 no_udp_protocol:
1453   {
1454     goto cleanup;
1455   }
1456 no_ports:
1457   {
1458     goto cleanup;
1459   }
1460 port_error:
1461   {
1462     goto cleanup;
1463   }
1464 socket_error:
1465   {
1466     goto cleanup;
1467   }
1468 cleanup:
1469   {
1470     if (inetaddr)
1471       g_object_unref (inetaddr);
1472     g_list_free_full (rejected_addresses,
1473         (GDestroyNotify) gst_rtsp_address_free);
1474     if (addr)
1475       gst_rtsp_address_free (addr);
1476     if (rtp_socket)
1477       g_object_unref (rtp_socket);
1478     if (rtcp_socket)
1479       g_object_unref (rtcp_socket);
1480     return FALSE;
1481   }
1482 }
1483
1484 /**
1485  * gst_rtsp_stream_allocate_udp_sockets:
1486  * @stream: a #GstRTSPStream
1487  * @family: protocol family
1488  * @transport_method: transport method
1489  *
1490  * Allocates RTP and RTCP ports.
1491  *
1492  * Returns: %TRUE if the RTP and RTCP sockets have been succeccully allocated.
1493  */
1494 gboolean
1495 gst_rtsp_stream_allocate_udp_sockets (GstRTSPStream * stream,
1496     GSocketFamily family, GstRTSPTransport * ct, gboolean use_client_settings)
1497 {
1498   GstRTSPStreamPrivate *priv;
1499   gboolean result = FALSE;
1500   GstRTSPLowerTrans transport = ct->lower_transport;
1501
1502   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1503   priv = stream->priv;
1504   g_return_val_if_fail (priv->is_joined, FALSE);
1505
1506   g_mutex_lock (&priv->lock);
1507
1508   if (transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
1509     if (family == G_SOCKET_FAMILY_IPV4) {
1510       /* Multicast IPV4 */
1511       if (priv->have_ipv4_mcast) {
1512         result = TRUE;
1513         goto done;
1514       }
1515
1516       priv->have_ipv4_mcast =
1517           alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4,
1518           priv->udpsrc_mcast_v4, &priv->server_port_v4, ct, &priv->addr_v4,
1519           use_client_settings);
1520       result = priv->have_ipv4_mcast;
1521
1522     } else {
1523       /* Multicast IPV6 */
1524       if (priv->have_ipv6_mcast) {
1525         result = TRUE;
1526         goto done;
1527       }
1528
1529       priv->have_ipv6_mcast =
1530           alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6,
1531           priv->udpsrc_mcast_v6, &priv->server_port_v6, ct, &priv->addr_v6,
1532           use_client_settings);
1533       result = priv->have_ipv6_mcast;
1534     }
1535   } else {
1536     /* We allow multiple unicast transports, so we must maintain a table of the 
1537      * udpsrcs created for them. */
1538     GstRTSPStreamUDPSrcs *transport_udpsrcs =
1539         g_slice_new0 (GstRTSPStreamUDPSrcs);
1540
1541     if (family == G_SOCKET_FAMILY_IPV4) {
1542       /* Unicast IPV4 */
1543       result =
1544           alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4,
1545           transport_udpsrcs->udpsrc, &priv->server_port_v4, ct,
1546           &priv->server_addr_v4, use_client_settings);
1547     } else {
1548       /* Unicast IPV6 */
1549       result =
1550           alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6,
1551           transport_udpsrcs->udpsrc, &priv->server_port_v6, ct,
1552           &priv->server_addr_v6, use_client_settings);
1553     }
1554
1555     /* If we didn't create any unicast udpsrcs, free the transport_udpsrcs struct. 
1556      * Otherwise, add it to the hash table */
1557     if (transport_udpsrcs->udpsrc[0] == NULL
1558         && transport_udpsrcs->udpsrc[1] == NULL)
1559       g_slice_free (GstRTSPStreamUDPSrcs, transport_udpsrcs);
1560     else
1561       g_hash_table_insert (priv->udpsrcs, ct, transport_udpsrcs);
1562   }
1563
1564 done:
1565   g_mutex_unlock (&priv->lock);
1566
1567   return result;
1568 }
1569
1570 /**
1571  * gst_rtsp_stream_set_client_side:
1572  * @stream: a #GstRTSPStream
1573  * @client_side: TRUE if this #GstRTSPStream is running on the 'client' side of
1574  * an RTSP connection.
1575  *
1576  * Sets the #GstRTSPStream as a 'client side' stream - used for sending
1577  * streams to an RTSP server via RECORD. This has the practical effect
1578  * of changing which UDP port numbers are used when setting up the local
1579  * side of the stream sending to be either the 'server' or 'client' pair
1580  * of a configured UDP transport.
1581  */
1582 void
1583 gst_rtsp_stream_set_client_side (GstRTSPStream * stream, gboolean client_side)
1584 {
1585   GstRTSPStreamPrivate *priv;
1586
1587   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1588   priv = stream->priv;
1589   g_mutex_lock (&priv->lock);
1590   priv->client_side = client_side;
1591   g_mutex_unlock (&priv->lock);
1592 }
1593
1594 /**
1595  * gst_rtsp_stream_is_client_side:
1596  * @stream: a #GstRTSPStream
1597  *
1598  * See gst_rtsp_stream_set_client_side()
1599  *
1600  * Returns: TRUE if this #GstRTSPStream is client-side.
1601  */
1602 gboolean
1603 gst_rtsp_stream_is_client_side (GstRTSPStream * stream)
1604 {
1605   GstRTSPStreamPrivate *priv;
1606   gboolean ret;
1607
1608   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1609
1610   priv = stream->priv;
1611   g_mutex_lock (&priv->lock);
1612   ret = priv->client_side;
1613   g_mutex_unlock (&priv->lock);
1614
1615   return ret;
1616 }
1617
1618 /**
1619  * gst_rtsp_stream_get_server_port:
1620  * @stream: a #GstRTSPStream
1621  * @server_port: (out): result server port
1622  * @family: the port family to get
1623  *
1624  * Fill @server_port with the port pair used by the server. This function can
1625  * only be called when @stream has been joined.
1626  */
1627 void
1628 gst_rtsp_stream_get_server_port (GstRTSPStream * stream,
1629     GstRTSPRange * server_port, GSocketFamily family)
1630 {
1631   GstRTSPStreamPrivate *priv;
1632
1633   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1634   priv = stream->priv;
1635   g_return_if_fail (priv->is_joined);
1636
1637   g_mutex_lock (&priv->lock);
1638   if (family == G_SOCKET_FAMILY_IPV4) {
1639     if (server_port)
1640       *server_port = priv->server_port_v4;
1641   } else {
1642     if (server_port)
1643       *server_port = priv->server_port_v6;
1644   }
1645   g_mutex_unlock (&priv->lock);
1646 }
1647
1648 /**
1649  * gst_rtsp_stream_get_rtpsession:
1650  * @stream: a #GstRTSPStream
1651  *
1652  * Get the RTP session of this stream.
1653  *
1654  * Returns: (transfer full): The RTP session of this stream. Unref after usage.
1655  */
1656 GObject *
1657 gst_rtsp_stream_get_rtpsession (GstRTSPStream * stream)
1658 {
1659   GstRTSPStreamPrivate *priv;
1660   GObject *session;
1661
1662   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1663
1664   priv = stream->priv;
1665
1666   g_mutex_lock (&priv->lock);
1667   if ((session = priv->session))
1668     g_object_ref (session);
1669   g_mutex_unlock (&priv->lock);
1670
1671   return session;
1672 }
1673
1674 /**
1675  * gst_rtsp_stream_get_encoder:
1676  * @stream: a #GstRTSPStream
1677  *
1678  * Get the SRTP encoder for this stream.
1679  *
1680  * Returns: (transfer full): The SRTP encoder for this stream. Unref after usage.
1681  */
1682 GstElement *
1683 gst_rtsp_stream_get_srtp_encoder (GstRTSPStream * stream)
1684 {
1685   GstRTSPStreamPrivate *priv;
1686   GstElement *encoder;
1687
1688   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1689
1690   priv = stream->priv;
1691
1692   g_mutex_lock (&priv->lock);
1693   if ((encoder = priv->srtpenc))
1694     g_object_ref (encoder);
1695   g_mutex_unlock (&priv->lock);
1696
1697   return encoder;
1698 }
1699
1700 /**
1701  * gst_rtsp_stream_get_ssrc:
1702  * @stream: a #GstRTSPStream
1703  * @ssrc: (out): result ssrc
1704  *
1705  * Get the SSRC used by the RTP session of this stream. This function can only
1706  * be called when @stream has been joined.
1707  */
1708 void
1709 gst_rtsp_stream_get_ssrc (GstRTSPStream * stream, guint * ssrc)
1710 {
1711   GstRTSPStreamPrivate *priv;
1712
1713   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1714   priv = stream->priv;
1715   g_return_if_fail (priv->is_joined);
1716
1717   g_mutex_lock (&priv->lock);
1718   if (ssrc && priv->session)
1719     g_object_get (priv->session, "internal-ssrc", ssrc, NULL);
1720   g_mutex_unlock (&priv->lock);
1721 }
1722
1723 /**
1724  * gst_rtsp_stream_set_retransmission_time:
1725  * @stream: a #GstRTSPStream
1726  * @time: a #GstClockTime
1727  *
1728  * Set the amount of time to store retransmission packets.
1729  */
1730 void
1731 gst_rtsp_stream_set_retransmission_time (GstRTSPStream * stream,
1732     GstClockTime time)
1733 {
1734   GST_DEBUG_OBJECT (stream, "set retransmission time %" G_GUINT64_FORMAT, time);
1735
1736   g_mutex_lock (&stream->priv->lock);
1737   stream->priv->rtx_time = time;
1738   if (stream->priv->rtxsend)
1739     g_object_set (stream->priv->rtxsend, "max-size-time",
1740         GST_TIME_AS_MSECONDS (time), NULL);
1741   g_mutex_unlock (&stream->priv->lock);
1742 }
1743
1744 /**
1745  * gst_rtsp_stream_get_retransmission_time:
1746  * @stream: a #GstRTSPStream
1747  *
1748  * Get the amount of time to store retransmission data.
1749  *
1750  * Returns: the amount of time to store retransmission data.
1751  */
1752 GstClockTime
1753 gst_rtsp_stream_get_retransmission_time (GstRTSPStream * stream)
1754 {
1755   GstClockTime ret;
1756
1757   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
1758
1759   g_mutex_lock (&stream->priv->lock);
1760   ret = stream->priv->rtx_time;
1761   g_mutex_unlock (&stream->priv->lock);
1762
1763   return ret;
1764 }
1765
1766 /**
1767  * gst_rtsp_stream_set_retransmission_pt:
1768  * @stream: a #GstRTSPStream
1769  * @rtx_pt: a #guint
1770  *
1771  * Set the payload type (pt) for retransmission of this stream.
1772  */
1773 void
1774 gst_rtsp_stream_set_retransmission_pt (GstRTSPStream * stream, guint rtx_pt)
1775 {
1776   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1777
1778   GST_DEBUG_OBJECT (stream, "set retransmission pt %u", rtx_pt);
1779
1780   g_mutex_lock (&stream->priv->lock);
1781   stream->priv->rtx_pt = rtx_pt;
1782   if (stream->priv->rtxsend) {
1783     guint pt = gst_rtsp_stream_get_pt (stream);
1784     gchar *pt_s = g_strdup_printf ("%d", pt);
1785     GstStructure *rtx_pt_map = gst_structure_new ("application/x-rtp-pt-map",
1786         pt_s, G_TYPE_UINT, rtx_pt, NULL);
1787     g_object_set (stream->priv->rtxsend, "payload-type-map", rtx_pt_map, NULL);
1788     g_free (pt_s);
1789     gst_structure_free (rtx_pt_map);
1790   }
1791   g_mutex_unlock (&stream->priv->lock);
1792 }
1793
1794 /**
1795  * gst_rtsp_stream_get_retransmission_pt:
1796  * @stream: a #GstRTSPStream
1797  *
1798  * Get the payload-type used for retransmission of this stream
1799  *
1800  * Returns: The retransmission PT.
1801  */
1802 guint
1803 gst_rtsp_stream_get_retransmission_pt (GstRTSPStream * stream)
1804 {
1805   guint rtx_pt;
1806
1807   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
1808
1809   g_mutex_lock (&stream->priv->lock);
1810   rtx_pt = stream->priv->rtx_pt;
1811   g_mutex_unlock (&stream->priv->lock);
1812
1813   return rtx_pt;
1814 }
1815
1816 /**
1817  * gst_rtsp_stream_set_buffer_size:
1818  * @stream: a #GstRTSPStream
1819  * @size: the buffer size
1820  *
1821  * Set the size of the UDP transmission buffer (in bytes)
1822  * Needs to be set before the stream is joined to a bin.
1823  *
1824  * Since: 1.6
1825  */
1826 void
1827 gst_rtsp_stream_set_buffer_size (GstRTSPStream * stream, guint size)
1828 {
1829   g_mutex_lock (&stream->priv->lock);
1830   stream->priv->buffer_size = size;
1831   g_mutex_unlock (&stream->priv->lock);
1832 }
1833
1834 /**
1835  * gst_rtsp_stream_get_buffer_size:
1836  * @stream: a #GstRTSPStream
1837  *
1838  * Get the size of the UDP transmission buffer (in bytes)
1839  *
1840  * Returns: the size of the UDP TX buffer
1841  *
1842  * Since: 1.6
1843  */
1844 guint
1845 gst_rtsp_stream_get_buffer_size (GstRTSPStream * stream)
1846 {
1847   guint buffer_size;
1848
1849   g_mutex_lock (&stream->priv->lock);
1850   buffer_size = stream->priv->buffer_size;
1851   g_mutex_unlock (&stream->priv->lock);
1852
1853   return buffer_size;
1854 }
1855
1856 /* executed from streaming thread */
1857 static void
1858 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream)
1859 {
1860   GstRTSPStreamPrivate *priv = stream->priv;
1861   GstCaps *newcaps, *oldcaps;
1862
1863   newcaps = gst_pad_get_current_caps (pad);
1864
1865   GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps,
1866       newcaps);
1867
1868   g_mutex_lock (&priv->lock);
1869   oldcaps = priv->caps;
1870   priv->caps = newcaps;
1871   g_mutex_unlock (&priv->lock);
1872
1873   if (oldcaps)
1874     gst_caps_unref (oldcaps);
1875 }
1876
1877 static void
1878 dump_structure (const GstStructure * s)
1879 {
1880   gchar *sstr;
1881
1882   sstr = gst_structure_to_string (s);
1883   GST_INFO ("structure: %s", sstr);
1884   g_free (sstr);
1885 }
1886
1887 static GstRTSPStreamTransport *
1888 find_transport (GstRTSPStream * stream, const gchar * rtcp_from)
1889 {
1890   GstRTSPStreamPrivate *priv = stream->priv;
1891   GList *walk;
1892   GstRTSPStreamTransport *result = NULL;
1893   const gchar *tmp;
1894   gchar *dest;
1895   guint port;
1896
1897   if (rtcp_from == NULL)
1898     return NULL;
1899
1900   tmp = g_strrstr (rtcp_from, ":");
1901   if (tmp == NULL)
1902     return NULL;
1903
1904   port = atoi (tmp + 1);
1905   dest = g_strndup (rtcp_from, tmp - rtcp_from);
1906
1907   g_mutex_lock (&priv->lock);
1908   GST_INFO ("finding %s:%d in %d transports", dest, port,
1909       g_list_length (priv->transports));
1910
1911   for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1912     GstRTSPStreamTransport *trans = walk->data;
1913     const GstRTSPTransport *tr;
1914     gint min, max;
1915
1916     tr = gst_rtsp_stream_transport_get_transport (trans);
1917
1918     if (priv->client_side) {
1919       /* In client side mode the 'destination' is the RTSP server, so send
1920        * to those ports */
1921       min = tr->server_port.min;
1922       max = tr->server_port.max;
1923     } else {
1924       min = tr->client_port.min;
1925       max = tr->client_port.max;
1926     }
1927
1928     if ((strcmp (tr->destination, dest) == 0) && (min == port || max == port)) {
1929       result = trans;
1930       break;
1931     }
1932   }
1933   if (result)
1934     g_object_ref (result);
1935   g_mutex_unlock (&priv->lock);
1936
1937   g_free (dest);
1938
1939   return result;
1940 }
1941
1942 static GstRTSPStreamTransport *
1943 check_transport (GObject * source, GstRTSPStream * stream)
1944 {
1945   GstStructure *stats;
1946   GstRTSPStreamTransport *trans;
1947
1948   /* see if we have a stream to match with the origin of the RTCP packet */
1949   trans = g_object_get_qdata (source, ssrc_stream_map_key);
1950   if (trans == NULL) {
1951     g_object_get (source, "stats", &stats, NULL);
1952     if (stats) {
1953       const gchar *rtcp_from;
1954
1955       dump_structure (stats);
1956
1957       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
1958       if ((trans = find_transport (stream, rtcp_from))) {
1959         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
1960             source);
1961         g_object_set_qdata_full (source, ssrc_stream_map_key, trans,
1962             g_object_unref);
1963       }
1964       gst_structure_free (stats);
1965     }
1966   }
1967   return trans;
1968 }
1969
1970
1971 static void
1972 on_new_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1973 {
1974   GstRTSPStreamTransport *trans;
1975
1976   GST_INFO ("%p: new source %p", stream, source);
1977
1978   trans = check_transport (source, stream);
1979
1980   if (trans)
1981     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
1982 }
1983
1984 static void
1985 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPStream * stream)
1986 {
1987   GST_INFO ("%p: new SDES %p", stream, source);
1988 }
1989
1990 static void
1991 on_ssrc_active (GObject * session, GObject * source, GstRTSPStream * stream)
1992 {
1993   GstRTSPStreamTransport *trans;
1994
1995   trans = check_transport (source, stream);
1996
1997   if (trans) {
1998     GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
1999     gst_rtsp_stream_transport_keep_alive (trans);
2000   }
2001 #ifdef DUMP_STATS
2002   {
2003     GstStructure *stats;
2004     g_object_get (source, "stats", &stats, NULL);
2005     if (stats) {
2006       dump_structure (stats);
2007       gst_structure_free (stats);
2008     }
2009   }
2010 #endif
2011 }
2012
2013 static void
2014 on_bye_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
2015 {
2016   GST_INFO ("%p: source %p bye", stream, source);
2017 }
2018
2019 static void
2020 on_bye_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
2021 {
2022   GstRTSPStreamTransport *trans;
2023
2024   GST_INFO ("%p: source %p bye timeout", stream, source);
2025
2026   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
2027     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
2028     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
2029   }
2030 }
2031
2032 static void
2033 on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
2034 {
2035   GstRTSPStreamTransport *trans;
2036
2037   GST_INFO ("%p: source %p timeout", stream, source);
2038
2039   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
2040     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
2041     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
2042   }
2043 }
2044
2045 static void
2046 on_new_sender_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
2047 {
2048   GST_INFO ("%p: new sender source %p", stream, source);
2049 #ifndef DUMP_STATS
2050   {
2051     GstStructure *stats;
2052     g_object_get (source, "stats", &stats, NULL);
2053     if (stats) {
2054       dump_structure (stats);
2055       gst_structure_free (stats);
2056     }
2057   }
2058 #endif
2059 }
2060
2061 static void
2062 on_sender_ssrc_active (GObject * session, GObject * source,
2063     GstRTSPStream * stream)
2064 {
2065 #ifndef DUMP_STATS
2066   {
2067     GstStructure *stats;
2068     g_object_get (source, "stats", &stats, NULL);
2069     if (stats) {
2070       dump_structure (stats);
2071       gst_structure_free (stats);
2072     }
2073   }
2074 #endif
2075 }
2076
2077 static void
2078 clear_tr_cache (GstRTSPStreamPrivate * priv, gboolean is_rtp)
2079 {
2080   if (is_rtp) {
2081     g_list_foreach (priv->tr_cache_rtp, (GFunc) g_object_unref, NULL);
2082     g_list_free (priv->tr_cache_rtp);
2083     priv->tr_cache_rtp = NULL;
2084   } else {
2085     g_list_foreach (priv->tr_cache_rtcp, (GFunc) g_object_unref, NULL);
2086     g_list_free (priv->tr_cache_rtcp);
2087     priv->tr_cache_rtcp = NULL;
2088   }
2089 }
2090
2091 static GstFlowReturn
2092 handle_new_sample (GstAppSink * sink, gpointer user_data)
2093 {
2094   GstRTSPStreamPrivate *priv;
2095   GList *walk;
2096   GstSample *sample;
2097   GstBuffer *buffer;
2098   GstRTSPStream *stream;
2099   gboolean is_rtp;
2100
2101   sample = gst_app_sink_pull_sample (sink);
2102   if (!sample)
2103     return GST_FLOW_OK;
2104
2105   stream = (GstRTSPStream *) user_data;
2106   priv = stream->priv;
2107   buffer = gst_sample_get_buffer (sample);
2108
2109   is_rtp = GST_ELEMENT_CAST (sink) == priv->appsink[0];
2110
2111   g_mutex_lock (&priv->lock);
2112   if (is_rtp) {
2113     if (priv->tr_cache_cookie_rtp != priv->transports_cookie) {
2114       clear_tr_cache (priv, is_rtp);
2115       for (walk = priv->transports; walk; walk = g_list_next (walk)) {
2116         GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2117         priv->tr_cache_rtp =
2118             g_list_prepend (priv->tr_cache_rtp, g_object_ref (tr));
2119       }
2120       priv->tr_cache_cookie_rtp = priv->transports_cookie;
2121     }
2122   } else {
2123     if (priv->tr_cache_cookie_rtcp != priv->transports_cookie) {
2124       clear_tr_cache (priv, is_rtp);
2125       for (walk = priv->transports; walk; walk = g_list_next (walk)) {
2126         GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2127         priv->tr_cache_rtcp =
2128             g_list_prepend (priv->tr_cache_rtcp, g_object_ref (tr));
2129       }
2130       priv->tr_cache_cookie_rtcp = priv->transports_cookie;
2131     }
2132   }
2133   g_mutex_unlock (&priv->lock);
2134
2135   if (is_rtp) {
2136     for (walk = priv->tr_cache_rtp; walk; walk = g_list_next (walk)) {
2137       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2138       gst_rtsp_stream_transport_send_rtp (tr, buffer);
2139     }
2140   } else {
2141     for (walk = priv->tr_cache_rtcp; walk; walk = g_list_next (walk)) {
2142       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2143       gst_rtsp_stream_transport_send_rtcp (tr, buffer);
2144     }
2145   }
2146   gst_sample_unref (sample);
2147
2148   return GST_FLOW_OK;
2149 }
2150
2151 static GstAppSinkCallbacks sink_cb = {
2152   NULL,                         /* not interested in EOS */
2153   NULL,                         /* not interested in preroll samples */
2154   handle_new_sample,
2155 };
2156
2157 static GstElement *
2158 get_rtp_encoder (GstRTSPStream * stream, guint session)
2159 {
2160   GstRTSPStreamPrivate *priv = stream->priv;
2161
2162   if (priv->srtpenc == NULL) {
2163     gchar *name;
2164
2165     name = g_strdup_printf ("srtpenc_%u", session);
2166     priv->srtpenc = gst_element_factory_make ("srtpenc", name);
2167     g_free (name);
2168
2169     g_object_set (priv->srtpenc, "random-key", TRUE, NULL);
2170   }
2171   return gst_object_ref (priv->srtpenc);
2172 }
2173
2174 static GstElement *
2175 request_rtp_encoder (GstElement * rtpbin, guint session, GstRTSPStream * stream)
2176 {
2177   GstRTSPStreamPrivate *priv = stream->priv;
2178   GstElement *oldenc, *enc;
2179   GstPad *pad;
2180   gchar *name;
2181
2182   if (priv->idx != session)
2183     return NULL;
2184
2185   GST_DEBUG_OBJECT (stream, "make RTP encoder for session %u", session);
2186
2187   oldenc = priv->srtpenc;
2188   enc = get_rtp_encoder (stream, session);
2189   name = g_strdup_printf ("rtp_sink_%d", session);
2190   pad = gst_element_get_request_pad (enc, name);
2191   g_free (name);
2192   gst_object_unref (pad);
2193
2194   if (oldenc == NULL)
2195     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER], 0,
2196         enc);
2197
2198   return enc;
2199 }
2200
2201 static GstElement *
2202 request_rtcp_encoder (GstElement * rtpbin, guint session,
2203     GstRTSPStream * stream)
2204 {
2205   GstRTSPStreamPrivate *priv = stream->priv;
2206   GstElement *oldenc, *enc;
2207   GstPad *pad;
2208   gchar *name;
2209
2210   if (priv->idx != session)
2211     return NULL;
2212
2213   GST_DEBUG_OBJECT (stream, "make RTCP encoder for session %u", session);
2214
2215   oldenc = priv->srtpenc;
2216   enc = get_rtp_encoder (stream, session);
2217   name = g_strdup_printf ("rtcp_sink_%d", session);
2218   pad = gst_element_get_request_pad (enc, name);
2219   g_free (name);
2220   gst_object_unref (pad);
2221
2222   if (oldenc == NULL)
2223     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER], 0,
2224         enc);
2225
2226   return enc;
2227 }
2228
2229 static GstCaps *
2230 request_key (GstElement * srtpdec, guint ssrc, GstRTSPStream * stream)
2231 {
2232   GstRTSPStreamPrivate *priv = stream->priv;
2233   GstCaps *caps;
2234
2235   GST_DEBUG ("request key %08x", ssrc);
2236
2237   g_mutex_lock (&priv->lock);
2238   if ((caps = g_hash_table_lookup (priv->keys, GINT_TO_POINTER (ssrc))))
2239     gst_caps_ref (caps);
2240   g_mutex_unlock (&priv->lock);
2241
2242   return caps;
2243 }
2244
2245 static GstElement *
2246 request_rtp_rtcp_decoder (GstElement * rtpbin, guint session,
2247     GstRTSPStream * stream)
2248 {
2249   GstRTSPStreamPrivate *priv = stream->priv;
2250
2251   if (priv->idx != session)
2252     return NULL;
2253
2254   if (priv->srtpdec == NULL) {
2255     gchar *name;
2256
2257     name = g_strdup_printf ("srtpdec_%u", session);
2258     priv->srtpdec = gst_element_factory_make ("srtpdec", name);
2259     g_free (name);
2260
2261     g_signal_connect (priv->srtpdec, "request-key",
2262         (GCallback) request_key, stream);
2263   }
2264   return gst_object_ref (priv->srtpdec);
2265 }
2266
2267 /**
2268  * gst_rtsp_stream_request_aux_sender:
2269  * @stream: a #GstRTSPStream
2270  * @sessid: the session id
2271  *
2272  * Creating a rtxsend bin
2273  *
2274  * Returns: (transfer full): a #GstElement.
2275  *
2276  * Since: 1.6
2277  */
2278 GstElement *
2279 gst_rtsp_stream_request_aux_sender (GstRTSPStream * stream, guint sessid)
2280 {
2281   GstElement *bin;
2282   GstPad *pad;
2283   GstStructure *pt_map;
2284   gchar *name;
2285   guint pt, rtx_pt;
2286   gchar *pt_s;
2287
2288   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2289
2290   pt = gst_rtsp_stream_get_pt (stream);
2291   pt_s = g_strdup_printf ("%u", pt);
2292   rtx_pt = stream->priv->rtx_pt;
2293
2294   GST_INFO ("creating rtxsend with pt %u to %u", pt, rtx_pt);
2295
2296   bin = gst_bin_new (NULL);
2297   stream->priv->rtxsend = gst_element_factory_make ("rtprtxsend", NULL);
2298   pt_map = gst_structure_new ("application/x-rtp-pt-map",
2299       pt_s, G_TYPE_UINT, rtx_pt, NULL);
2300   g_object_set (stream->priv->rtxsend, "payload-type-map", pt_map,
2301       "max-size-time", GST_TIME_AS_MSECONDS (stream->priv->rtx_time), NULL);
2302   g_free (pt_s);
2303   gst_structure_free (pt_map);
2304   gst_bin_add (GST_BIN (bin), gst_object_ref (stream->priv->rtxsend));
2305
2306   pad = gst_element_get_static_pad (stream->priv->rtxsend, "src");
2307   name = g_strdup_printf ("src_%u", sessid);
2308   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
2309   g_free (name);
2310   gst_object_unref (pad);
2311
2312   pad = gst_element_get_static_pad (stream->priv->rtxsend, "sink");
2313   name = g_strdup_printf ("sink_%u", sessid);
2314   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
2315   g_free (name);
2316   gst_object_unref (pad);
2317
2318   return bin;
2319 }
2320
2321 /**
2322  * gst_rtsp_stream_set_pt_map:
2323  * @stream: a #GstRTSPStream
2324  * @pt: the pt
2325  * @caps: a #GstCaps
2326  *
2327  * Configure a pt map between @pt and @caps.
2328  */
2329 void
2330 gst_rtsp_stream_set_pt_map (GstRTSPStream * stream, guint pt, GstCaps * caps)
2331 {
2332   GstRTSPStreamPrivate *priv = stream->priv;
2333
2334   g_mutex_lock (&priv->lock);
2335   g_hash_table_insert (priv->ptmap, GINT_TO_POINTER (pt), gst_caps_ref (caps));
2336   g_mutex_unlock (&priv->lock);
2337 }
2338
2339 /**
2340  * gst_rtsp_stream_set_publish_clock_mode:
2341  * @stream: a #GstRTSPStream
2342  * @mode: the clock publish mode
2343  *
2344  * Sets if and how the stream clock should be published according to RFC7273.
2345  *
2346  * Since: 1.8
2347  */
2348 void
2349 gst_rtsp_stream_set_publish_clock_mode (GstRTSPStream * stream,
2350     GstRTSPPublishClockMode mode)
2351 {
2352   GstRTSPStreamPrivate *priv;
2353
2354   priv = stream->priv;
2355   g_mutex_lock (&priv->lock);
2356   priv->publish_clock_mode = mode;
2357   g_mutex_unlock (&priv->lock);
2358 }
2359
2360 /**
2361  * gst_rtsp_stream_get_publish_clock_mode:
2362  * @factory: a #GstRTSPStream
2363  *
2364  * Gets if and how the stream clock should be published according to RFC7273.
2365  *
2366  * Returns: The GstRTSPPublishClockMode
2367  *
2368  * Since: 1.8
2369  */
2370 GstRTSPPublishClockMode
2371 gst_rtsp_stream_get_publish_clock_mode (GstRTSPStream * stream)
2372 {
2373   GstRTSPStreamPrivate *priv;
2374   GstRTSPPublishClockMode ret;
2375
2376   priv = stream->priv;
2377   g_mutex_lock (&priv->lock);
2378   ret = priv->publish_clock_mode;
2379   g_mutex_unlock (&priv->lock);
2380
2381   return ret;
2382 }
2383
2384 static GstCaps *
2385 request_pt_map (GstElement * rtpbin, guint session, guint pt,
2386     GstRTSPStream * stream)
2387 {
2388   GstRTSPStreamPrivate *priv = stream->priv;
2389   GstCaps *caps = NULL;
2390
2391   g_mutex_lock (&priv->lock);
2392
2393   if (priv->idx == session) {
2394     caps = g_hash_table_lookup (priv->ptmap, GINT_TO_POINTER (pt));
2395     if (caps) {
2396       GST_DEBUG ("Stream %p, pt %u: caps %" GST_PTR_FORMAT, stream, pt, caps);
2397       gst_caps_ref (caps);
2398     } else {
2399       GST_DEBUG ("Stream %p, pt %u: no caps", stream, pt);
2400     }
2401   }
2402
2403   g_mutex_unlock (&priv->lock);
2404
2405   return caps;
2406 }
2407
2408 static void
2409 pad_added (GstElement * rtpbin, GstPad * pad, GstRTSPStream * stream)
2410 {
2411   GstRTSPStreamPrivate *priv = stream->priv;
2412   gchar *name;
2413   GstPadLinkReturn ret;
2414   guint sessid;
2415
2416   GST_DEBUG ("Stream %p added pad %s:%s for pad %s:%s", stream,
2417       GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (priv->sinkpad));
2418
2419   name = gst_pad_get_name (pad);
2420   if (sscanf (name, "recv_rtp_src_%u", &sessid) != 1) {
2421     g_free (name);
2422     return;
2423   }
2424   g_free (name);
2425
2426   if (priv->idx != sessid)
2427     return;
2428
2429   if (gst_pad_is_linked (priv->sinkpad)) {
2430     GST_WARNING ("Stream %p: Pad %s:%s is linked already", stream,
2431         GST_DEBUG_PAD_NAME (priv->sinkpad));
2432     return;
2433   }
2434
2435   /* link the RTP pad to the session manager, it should not really fail unless
2436    * this is not really an RTP pad */
2437   ret = gst_pad_link (pad, priv->sinkpad);
2438   if (ret != GST_PAD_LINK_OK)
2439     goto link_failed;
2440   priv->recv_rtp_src = gst_object_ref (pad);
2441
2442   return;
2443
2444 /* ERRORS */
2445 link_failed:
2446   {
2447     GST_ERROR ("Stream %p: Failed to link pads %s:%s and %s:%s", stream,
2448         GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (priv->sinkpad));
2449   }
2450 }
2451
2452 static void
2453 on_npt_stop (GstElement * rtpbin, guint session, guint ssrc,
2454     GstRTSPStream * stream)
2455 {
2456   /* TODO: What to do here other than this? */
2457   GST_DEBUG ("Stream %p: Got EOS", stream);
2458   gst_pad_send_event (stream->priv->sinkpad, gst_event_new_eos ());
2459 }
2460
2461 /* must be called with lock */
2462 static gboolean
2463 create_sender_part (GstRTSPStream * stream, GstBin * bin, GstState state)
2464 {
2465   GstRTSPStreamPrivate *priv;
2466   GstPad *pad, *sinkpad = NULL;
2467   gboolean is_tcp = FALSE, is_udp = FALSE;
2468   gint i;
2469
2470   priv = stream->priv;
2471
2472   is_tcp = priv->protocols & GST_RTSP_LOWER_TRANS_TCP;
2473   is_udp = ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
2474       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST));
2475
2476   if (is_udp && !create_and_configure_udpsinks (stream))
2477     goto no_udp_protocol;
2478
2479   for (i = 0; i < 2; i++) {
2480     GstPad *teepad, *queuepad;
2481     /* For the sender we create this bit of pipeline for both
2482      * RTP and RTCP. Sync and preroll are enabled on udpsink so
2483      * we need to add a queue before appsink and udpsink to make
2484      * the pipeline not block. For the TCP case, we want to pump
2485      * client as fast as possible anyway. This pipeline is used
2486      * when both TCP and UDP are present.
2487      *
2488      * .--------.      .-----.    .---------.    .---------.
2489      * | rtpbin |      | tee |    |  queue  |    | udpsink |
2490      * |       send->sink   src->sink      src->sink       |
2491      * '--------'      |     |    '---------'    '---------'
2492      *                 |     |    .---------.    .---------.
2493      *                 |     |    |  queue  |    | appsink |
2494      *                 |    src->sink      src->sink       |
2495      *                 '-----'    '---------'    '---------'
2496      *
2497      * When only UDP or only TCP is allowed, we skip the tee and queue
2498      * and link the udpsink (for UDP) or appsink (for TCP) directly to
2499      * the session.
2500      */
2501     /* Only link the RTP send src if we're going to send RTP, link
2502      * the RTCP send src always */
2503     if (priv->srcpad || i == 1) {
2504       if (is_udp) {
2505         /* add udpsink */
2506         gst_bin_add (bin, priv->udpsink[i]);
2507         sinkpad = gst_element_get_static_pad (priv->udpsink[i], "sink");
2508       }
2509
2510       if (is_tcp) {
2511         /* make appsink */
2512         priv->appsink[i] = gst_element_factory_make ("appsink", NULL);
2513         g_object_set (priv->appsink[i], "emit-signals", FALSE, NULL);
2514         gst_bin_add (bin, priv->appsink[i]);
2515         gst_app_sink_set_callbacks (GST_APP_SINK_CAST (priv->appsink[i]),
2516             &sink_cb, stream, NULL);
2517       }
2518
2519       if (is_udp && is_tcp) {
2520         g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
2521
2522         /* make tee for RTP/RTCP */
2523         priv->tee[i] = gst_element_factory_make ("tee", NULL);
2524         gst_bin_add (bin, priv->tee[i]);
2525
2526         /* and link to rtpbin send pad */
2527         pad = gst_element_get_static_pad (priv->tee[i], "sink");
2528         gst_pad_link (priv->send_src[i], pad);
2529         gst_object_unref (pad);
2530
2531         priv->udpqueue[i] = gst_element_factory_make ("queue", NULL);
2532         g_object_set (priv->udpqueue[i], "max-size-buffers",
2533             1, "max-size-bytes", 0, "max-size-time", G_GINT64_CONSTANT (0),
2534             NULL);
2535         gst_bin_add (bin, priv->udpqueue[i]);
2536         /* link tee to udpqueue */
2537         teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
2538         pad = gst_element_get_static_pad (priv->udpqueue[i], "sink");
2539         gst_pad_link (teepad, pad);
2540         gst_object_unref (pad);
2541         gst_object_unref (teepad);
2542
2543         /* link udpqueue to udpsink */
2544         queuepad = gst_element_get_static_pad (priv->udpqueue[i], "src");
2545         gst_pad_link (queuepad, sinkpad);
2546         gst_object_unref (queuepad);
2547         gst_object_unref (sinkpad);
2548
2549         /* make appqueue */
2550         priv->appqueue[i] = gst_element_factory_make ("queue", NULL);
2551         g_object_set (priv->appqueue[i], "max-size-buffers",
2552             1, "max-size-bytes", 0, "max-size-time", G_GINT64_CONSTANT (0),
2553             NULL);
2554         gst_bin_add (bin, priv->appqueue[i]);
2555         /* and link tee to appqueue */
2556         teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
2557         pad = gst_element_get_static_pad (priv->appqueue[i], "sink");
2558         gst_pad_link (teepad, pad);
2559         gst_object_unref (pad);
2560         gst_object_unref (teepad);
2561
2562         /* and link appqueue to appsink */
2563         queuepad = gst_element_get_static_pad (priv->appqueue[i], "src");
2564         pad = gst_element_get_static_pad (priv->appsink[i], "sink");
2565         gst_pad_link (queuepad, pad);
2566         gst_object_unref (pad);
2567         gst_object_unref (queuepad);
2568       } else if (is_tcp) {
2569         /* only appsink needed, link it to the session */
2570         pad = gst_element_get_static_pad (priv->appsink[i], "sink");
2571         gst_pad_link (priv->send_src[i], pad);
2572         gst_object_unref (pad);
2573
2574         /* when its only TCP, we need to set sync and preroll to FALSE
2575          * for the sink to avoid deadlock. And this is only needed for
2576          * sink used for RTCP data, not the RTP data. */
2577         if (i == 1)
2578           g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
2579       } else {
2580         /* else only udpsink needed, link it to the session */
2581         gst_pad_link (priv->send_src[i], sinkpad);
2582         gst_object_unref (sinkpad);
2583       }
2584     }
2585
2586     /* check if we need to set to a special state */
2587     if (state != GST_STATE_NULL) {
2588       if (priv->udpsink[i] && (priv->srcpad || i == 1))
2589         gst_element_set_state (priv->udpsink[i], state);
2590       if (priv->appsink[i] && (priv->srcpad || i == 1))
2591         gst_element_set_state (priv->appsink[i], state);
2592       if (priv->appqueue[i] && (priv->srcpad || i == 1))
2593         gst_element_set_state (priv->appqueue[i], state);
2594       if (priv->udpqueue[i] && (priv->srcpad || i == 1))
2595         gst_element_set_state (priv->udpqueue[i], state);
2596       if (priv->tee[i] && (priv->srcpad || i == 1))
2597         gst_element_set_state (priv->tee[i], state);
2598     }
2599   }
2600
2601   return TRUE;
2602
2603   /* ERRORS */
2604 no_udp_protocol:
2605   {
2606     return FALSE;
2607   }
2608 }
2609
2610 /* must be called with lock */
2611 static void
2612 create_receiver_part (GstRTSPStream * stream, GstBin * bin, GstState state)
2613 {
2614   GstRTSPStreamPrivate *priv;
2615   GstPad *pad, *selpad;
2616   gboolean is_tcp;
2617   gint i;
2618
2619   priv = stream->priv;
2620
2621   is_tcp = priv->protocols & GST_RTSP_LOWER_TRANS_TCP;
2622
2623   for (i = 0; i < 2; i++) {
2624     /* Only connect recv RTP sink if we expect to receive RTP. Connect recv
2625      * RTCP sink always */
2626     if (priv->sinkpad || i == 1) {
2627       /* For the receiver we create this bit of pipeline for both
2628        * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc
2629        * and it is all funneled into the rtpbin receive pad.
2630        *
2631        * .--------.     .--------.    .--------.
2632        * | udpsrc |     | funnel |    | rtpbin |
2633        * |       src->sink      src->sink      |
2634        * '--------'     |        |    '--------'
2635        * .--------.     |        |
2636        * | appsrc |     |        |
2637        * |       src->sink       |
2638        * '--------'     '--------'
2639        */
2640       /* make funnel for the RTP/RTCP receivers */
2641       priv->funnel[i] = gst_element_factory_make ("funnel", NULL);
2642       gst_bin_add (bin, priv->funnel[i]);
2643
2644       pad = gst_element_get_static_pad (priv->funnel[i], "src");
2645       gst_pad_link (pad, priv->recv_sink[i]);
2646       gst_object_unref (pad);
2647
2648       if (is_tcp) {
2649         /* make and add appsrc */
2650         priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
2651         priv->appsrc_base_time[i] = -1;
2652         if (priv->srcpad) {
2653           gst_element_set_state (priv->appsrc[i], GST_STATE_PLAYING);
2654           gst_element_set_locked_state (priv->appsrc[i], TRUE);
2655         }
2656         g_object_set (priv->appsrc[i], "format", GST_FORMAT_TIME, "is-live",
2657             TRUE, NULL);
2658         gst_bin_add (bin, priv->appsrc[i]);
2659         /* and link to the funnel */
2660         selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2661         pad = gst_element_get_static_pad (priv->appsrc[i], "src");
2662         gst_pad_link (pad, selpad);
2663         gst_object_unref (pad);
2664         gst_object_unref (selpad);
2665       }
2666     }
2667
2668     /* check if we need to set to a special state */
2669     if (state != GST_STATE_NULL) {
2670       if (priv->funnel[i] && (priv->sinkpad || i == 1))
2671         gst_element_set_state (priv->funnel[i], state);
2672     }
2673   }
2674 }
2675
2676 /**
2677  * gst_rtsp_stream_join_bin:
2678  * @stream: a #GstRTSPStream
2679  * @bin: (transfer none): a #GstBin to join
2680  * @rtpbin: (transfer none): a rtpbin element in @bin
2681  * @state: the target state of the new elements
2682  *
2683  * Join the #GstBin @bin that contains the element @rtpbin.
2684  *
2685  * @stream will link to @rtpbin, which must be inside @bin. The elements
2686  * added to @bin will be set to the state given in @state.
2687  *
2688  * Returns: %TRUE on success.
2689  */
2690 gboolean
2691 gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
2692     GstElement * rtpbin, GstState state)
2693 {
2694   GstRTSPStreamPrivate *priv;
2695   guint idx;
2696   gchar *name;
2697   GstPadLinkReturn ret;
2698
2699   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2700   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
2701   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
2702
2703   priv = stream->priv;
2704
2705   g_mutex_lock (&priv->lock);
2706   if (priv->is_joined)
2707     goto was_joined;
2708
2709   /* create a session with the same index as the stream */
2710   idx = priv->idx;
2711
2712   GST_INFO ("stream %p joining bin as session %u", stream, idx);
2713
2714   if (priv->profiles & GST_RTSP_PROFILE_SAVP
2715       || priv->profiles & GST_RTSP_PROFILE_SAVPF) {
2716     /* For SRTP */
2717     g_signal_connect (rtpbin, "request-rtp-encoder",
2718         (GCallback) request_rtp_encoder, stream);
2719     g_signal_connect (rtpbin, "request-rtcp-encoder",
2720         (GCallback) request_rtcp_encoder, stream);
2721     g_signal_connect (rtpbin, "request-rtp-decoder",
2722         (GCallback) request_rtp_rtcp_decoder, stream);
2723     g_signal_connect (rtpbin, "request-rtcp-decoder",
2724         (GCallback) request_rtp_rtcp_decoder, stream);
2725   }
2726
2727   if (priv->sinkpad) {
2728     g_signal_connect (rtpbin, "request-pt-map",
2729         (GCallback) request_pt_map, stream);
2730   }
2731
2732   /* get pads from the RTP session element for sending and receiving
2733    * RTP/RTCP*/
2734   if (priv->srcpad) {
2735     /* get a pad for sending RTP */
2736     name = g_strdup_printf ("send_rtp_sink_%u", idx);
2737     priv->send_rtp_sink = gst_element_get_request_pad (rtpbin, name);
2738     g_free (name);
2739
2740     /* link the RTP pad to the session manager, it should not really fail unless
2741      * this is not really an RTP pad */
2742     ret = gst_pad_link (priv->srcpad, priv->send_rtp_sink);
2743     if (ret != GST_PAD_LINK_OK)
2744       goto link_failed;
2745
2746     name = g_strdup_printf ("send_rtp_src_%u", idx);
2747     priv->send_src[0] = gst_element_get_static_pad (rtpbin, name);
2748     g_free (name);
2749   } else {
2750     /* Need to connect our sinkpad from here */
2751     g_signal_connect (rtpbin, "pad-added", (GCallback) pad_added, stream);
2752     /* EOS */
2753     g_signal_connect (rtpbin, "on-npt-stop", (GCallback) on_npt_stop, stream);
2754
2755     name = g_strdup_printf ("recv_rtp_sink_%u", idx);
2756     priv->recv_sink[0] = gst_element_get_request_pad (rtpbin, name);
2757     g_free (name);
2758   }
2759
2760   name = g_strdup_printf ("send_rtcp_src_%u", idx);
2761   priv->send_src[1] = gst_element_get_request_pad (rtpbin, name);
2762   g_free (name);
2763   name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
2764   priv->recv_sink[1] = gst_element_get_request_pad (rtpbin, name);
2765   g_free (name);
2766
2767   /* get the session */
2768   g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &priv->session);
2769
2770   g_signal_connect (priv->session, "on-new-ssrc", (GCallback) on_new_ssrc,
2771       stream);
2772   g_signal_connect (priv->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
2773       stream);
2774   g_signal_connect (priv->session, "on-ssrc-active",
2775       (GCallback) on_ssrc_active, stream);
2776   g_signal_connect (priv->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
2777       stream);
2778   g_signal_connect (priv->session, "on-bye-timeout",
2779       (GCallback) on_bye_timeout, stream);
2780   g_signal_connect (priv->session, "on-timeout", (GCallback) on_timeout,
2781       stream);
2782
2783   /* signal for sender ssrc */
2784   g_signal_connect (priv->session, "on-new-sender-ssrc",
2785       (GCallback) on_new_sender_ssrc, stream);
2786   g_signal_connect (priv->session, "on-sender-ssrc-active",
2787       (GCallback) on_sender_ssrc_active, stream);
2788
2789   if (!create_sender_part (stream, bin, state))
2790     goto no_udp_protocol;
2791
2792   create_receiver_part (stream, bin, state);
2793
2794   if (priv->srcpad) {
2795     /* be notified of caps changes */
2796     priv->caps_sig = g_signal_connect (priv->send_src[0], "notify::caps",
2797         (GCallback) caps_notify, stream);
2798   }
2799
2800   priv->joined_bin = bin;
2801   priv->is_joined = TRUE;
2802   g_mutex_unlock (&priv->lock);
2803
2804   return TRUE;
2805
2806   /* ERRORS */
2807 was_joined:
2808   {
2809     g_mutex_unlock (&priv->lock);
2810     return TRUE;
2811   }
2812 link_failed:
2813   {
2814     GST_WARNING ("failed to link stream %u", idx);
2815     gst_object_unref (priv->send_rtp_sink);
2816     priv->send_rtp_sink = NULL;
2817     g_mutex_unlock (&priv->lock);
2818     return FALSE;
2819   }
2820 no_udp_protocol:
2821   {
2822     GST_WARNING ("failed to allocate ports %u", idx);
2823     gst_object_unref (priv->send_rtp_sink);
2824     priv->send_rtp_sink = NULL;
2825     gst_object_unref (priv->send_src[0]);
2826     priv->send_src[0] = NULL;
2827     gst_object_unref (priv->send_src[1]);
2828     priv->send_src[1] = NULL;
2829     gst_object_unref (priv->recv_sink[0]);
2830     priv->recv_sink[0] = NULL;
2831     gst_object_unref (priv->recv_sink[1]);
2832     priv->recv_sink[1] = NULL;
2833     if (priv->udpsink[0])
2834       gst_element_set_state (priv->udpsink[0], GST_STATE_NULL);
2835     if (priv->udpsink[1])
2836       gst_element_set_state (priv->udpsink[1], GST_STATE_NULL);
2837
2838     g_mutex_unlock (&priv->lock);
2839     return FALSE;
2840   }
2841 }
2842
2843 /* Must be called with priv->lock. */
2844 static void
2845 remove_all_unicast_udpsrcs (GstRTSPStream * stream, GstBin * bin)
2846 {
2847   GstRTSPStreamPrivate *priv;
2848   GHashTableIter iter;
2849   gpointer iter_key, iter_value;
2850
2851   priv = stream->priv;
2852
2853   /* Remove all of the unicast udpsrcs */
2854   g_hash_table_iter_init (&iter, priv->udpsrcs);
2855   while (g_hash_table_iter_next (&iter, &iter_key, &iter_value)) {
2856     GstRTSPStreamUDPSrcs *transport_udpsrcs =
2857         (GstRTSPStreamUDPSrcs *) iter_value;
2858
2859     for (int i = 0; i < 2; i++) {
2860       if (transport_udpsrcs->udpsrc[i]) {
2861         if (priv->sinkpad || i == 1) {
2862           /* Set udpsrc to NULL now before removing */
2863           gst_element_set_locked_state (transport_udpsrcs->udpsrc[i], FALSE);
2864           gst_element_set_state (transport_udpsrcs->udpsrc[i], GST_STATE_NULL);
2865
2866           /* removing them should also nicely release the request
2867            * pads when they finalize */
2868           gst_bin_remove (bin, transport_udpsrcs->udpsrc[i]);
2869         } else {
2870           /* we need to set the state to NULL before unref */
2871           gst_element_set_state (transport_udpsrcs->udpsrc[i], GST_STATE_NULL);
2872           gst_object_unref (transport_udpsrcs->udpsrc[i]);
2873         }
2874       }
2875     }
2876   }
2877
2878   g_hash_table_remove_all (priv->udpsrcs);
2879 }
2880
2881 /**
2882  * gst_rtsp_stream_leave_bin:
2883  * @stream: a #GstRTSPStream
2884  * @bin: (transfer none): a #GstBin
2885  * @rtpbin: (transfer none): a rtpbin #GstElement
2886  *
2887  * Remove the elements of @stream from @bin.
2888  *
2889  * Return: %TRUE on success.
2890  */
2891 gboolean
2892 gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
2893     GstElement * rtpbin)
2894 {
2895   GstRTSPStreamPrivate *priv;
2896   gint i;
2897   gboolean is_tcp, is_udp;
2898
2899   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2900   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
2901   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
2902
2903   priv = stream->priv;
2904
2905   g_mutex_lock (&priv->lock);
2906   if (!priv->is_joined)
2907     goto was_not_joined;
2908
2909   priv->joined_bin = NULL;
2910
2911   /* all transports must be removed by now */
2912   if (priv->transports != NULL)
2913     goto transports_not_removed;
2914
2915   clear_tr_cache (priv, TRUE);
2916   clear_tr_cache (priv, FALSE);
2917
2918   GST_INFO ("stream %p leaving bin", stream);
2919
2920   if (priv->srcpad) {
2921     gst_pad_unlink (priv->srcpad, priv->send_rtp_sink);
2922
2923     g_signal_handler_disconnect (priv->send_src[0], priv->caps_sig);
2924     gst_element_release_request_pad (rtpbin, priv->send_rtp_sink);
2925     gst_object_unref (priv->send_rtp_sink);
2926     priv->send_rtp_sink = NULL;
2927   } else if (priv->recv_rtp_src) {
2928     gst_pad_unlink (priv->recv_rtp_src, priv->sinkpad);
2929     gst_object_unref (priv->recv_rtp_src);
2930     priv->recv_rtp_src = NULL;
2931   }
2932
2933   is_tcp = priv->protocols & GST_RTSP_LOWER_TRANS_TCP;
2934
2935   is_udp = ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
2936       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST));
2937
2938   remove_all_unicast_udpsrcs (stream, bin);
2939
2940   for (i = 0; i < 2; i++) {
2941     if (priv->udpsink[i])
2942       gst_element_set_state (priv->udpsink[i], GST_STATE_NULL);
2943     if (priv->appsink[i])
2944       gst_element_set_state (priv->appsink[i], GST_STATE_NULL);
2945     if (priv->appqueue[i])
2946       gst_element_set_state (priv->appqueue[i], GST_STATE_NULL);
2947     if (priv->udpqueue[i])
2948       gst_element_set_state (priv->udpqueue[i], GST_STATE_NULL);
2949     if (priv->tee[i])
2950       gst_element_set_state (priv->tee[i], GST_STATE_NULL);
2951     if (priv->funnel[i])
2952       gst_element_set_state (priv->funnel[i], GST_STATE_NULL);
2953     if (priv->appsrc[i])
2954       gst_element_set_state (priv->appsrc[i], GST_STATE_NULL);
2955
2956     if (priv->udpsrc_mcast_v4[i]) {
2957       if (priv->sinkpad || i == 1) {
2958         /* and set udpsrc to NULL now before removing */
2959         gst_element_set_locked_state (priv->udpsrc_mcast_v4[i], FALSE);
2960         gst_element_set_state (priv->udpsrc_mcast_v4[i], GST_STATE_NULL);
2961         /* removing them should also nicely release the request
2962          * pads when they finalize */
2963         gst_bin_remove (bin, priv->udpsrc_mcast_v4[i]);
2964       } else {
2965         gst_element_set_state (priv->udpsrc_mcast_v4[i], GST_STATE_NULL);
2966         gst_object_unref (priv->udpsrc_mcast_v4[i]);
2967       }
2968     }
2969
2970     if (priv->udpsrc_mcast_v6[i]) {
2971       if (priv->sinkpad || i == 1) {
2972         gst_element_set_locked_state (priv->udpsrc_mcast_v6[i], FALSE);
2973         gst_element_set_state (priv->udpsrc_mcast_v6[i], GST_STATE_NULL);
2974         gst_bin_remove (bin, priv->udpsrc_mcast_v6[i]);
2975       } else {
2976         gst_element_set_state (priv->udpsrc_mcast_v6[i], GST_STATE_NULL);
2977         gst_object_unref (priv->udpsrc_mcast_v6[i]);
2978       }
2979     }
2980
2981     if (priv->udpsink[i] && is_udp && (priv->srcpad || i == 1))
2982       gst_bin_remove (bin, priv->udpsink[i]);
2983     if (priv->appsrc[i]) {
2984       if (priv->sinkpad || i == 1) {
2985         gst_element_set_locked_state (priv->appsrc[i], FALSE);
2986         gst_element_set_state (priv->appsrc[i], GST_STATE_NULL);
2987         gst_bin_remove (bin, priv->appsrc[i]);
2988       } else {
2989         gst_element_set_state (priv->appsrc[i], GST_STATE_NULL);
2990         gst_object_unref (priv->appsrc[i]);
2991       }
2992     }
2993     if (priv->appsink[i] && is_tcp && (priv->srcpad || i == 1))
2994       gst_bin_remove (bin, priv->appsink[i]);
2995     if (priv->appqueue[i] && is_tcp && is_udp && (priv->srcpad || i == 1))
2996       gst_bin_remove (bin, priv->appqueue[i]);
2997     if (priv->udpqueue[i] && is_tcp && is_udp && (priv->srcpad || i == 1))
2998       gst_bin_remove (bin, priv->udpqueue[i]);
2999     if (priv->tee[i] && is_tcp && is_udp && (priv->srcpad || i == 1))
3000       gst_bin_remove (bin, priv->tee[i]);
3001     if (priv->funnel[i] && (priv->sinkpad || i == 1))
3002       gst_bin_remove (bin, priv->funnel[i]);
3003
3004     if (priv->sinkpad || i == 1) {
3005       gst_element_release_request_pad (rtpbin, priv->recv_sink[i]);
3006       gst_object_unref (priv->recv_sink[i]);
3007       priv->recv_sink[i] = NULL;
3008     }
3009
3010     priv->udpsrc_mcast_v4[i] = NULL;
3011     priv->udpsrc_mcast_v6[i] = NULL;
3012     priv->udpsink[i] = NULL;
3013     priv->appsrc[i] = NULL;
3014     priv->appsink[i] = NULL;
3015     priv->appqueue[i] = NULL;
3016     priv->udpqueue[i] = NULL;
3017     priv->tee[i] = NULL;
3018     priv->funnel[i] = NULL;
3019   }
3020
3021   if (priv->srcpad) {
3022     gst_object_unref (priv->send_src[0]);
3023     priv->send_src[0] = NULL;
3024   }
3025
3026   gst_element_release_request_pad (rtpbin, priv->send_src[1]);
3027   gst_object_unref (priv->send_src[1]);
3028   priv->send_src[1] = NULL;
3029
3030   g_object_unref (priv->session);
3031   priv->session = NULL;
3032   if (priv->caps)
3033     gst_caps_unref (priv->caps);
3034   priv->caps = NULL;
3035
3036   if (priv->srtpenc)
3037     gst_object_unref (priv->srtpenc);
3038   if (priv->srtpdec)
3039     gst_object_unref (priv->srtpdec);
3040
3041   priv->is_joined = FALSE;
3042   g_mutex_unlock (&priv->lock);
3043
3044   return TRUE;
3045
3046 was_not_joined:
3047   {
3048     g_mutex_unlock (&priv->lock);
3049     return TRUE;
3050   }
3051 transports_not_removed:
3052   {
3053     GST_ERROR_OBJECT (stream, "can't leave bin (transports not removed)");
3054     g_mutex_unlock (&priv->lock);
3055     return FALSE;
3056   }
3057 }
3058
3059 /**
3060  * gst_rtsp_stream_get_joined_bin:
3061  * @stream: a #GstRTSPStream
3062  *
3063  * Get the previous joined bin with gst_rtsp_stream_join_bin() or NULL.
3064  *
3065  * Return: (transfer full): the joined bin or NULL.
3066  */
3067 GstBin *
3068 gst_rtsp_stream_get_joined_bin (GstRTSPStream * stream)
3069 {
3070   GstRTSPStreamPrivate *priv;
3071   GstBin *bin = NULL;
3072
3073   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3074
3075   priv = stream->priv;
3076
3077   g_mutex_lock (&priv->lock);
3078   bin = priv->joined_bin ? gst_object_ref (priv->joined_bin) : NULL;
3079   g_mutex_unlock (&priv->lock);
3080
3081   return bin;
3082 }
3083
3084 /**
3085  * gst_rtsp_stream_get_rtpinfo:
3086  * @stream: a #GstRTSPStream
3087  * @rtptime: (allow-none): result RTP timestamp
3088  * @seq: (allow-none): result RTP seqnum
3089  * @clock_rate: (allow-none): the clock rate
3090  * @running_time: (allow-none): result running-time
3091  *
3092  * Retrieve the current rtptime, seq and running-time. This is used to
3093  * construct a RTPInfo reply header.
3094  *
3095  * Returns: %TRUE when rtptime, seq and running-time could be determined.
3096  */
3097 gboolean
3098 gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream,
3099     guint * rtptime, guint * seq, guint * clock_rate,
3100     GstClockTime * running_time)
3101 {
3102   GstRTSPStreamPrivate *priv;
3103   GstStructure *stats;
3104   GObjectClass *payobjclass;
3105
3106   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3107
3108   priv = stream->priv;
3109
3110   payobjclass = G_OBJECT_GET_CLASS (priv->payloader);
3111
3112   g_mutex_lock (&priv->lock);
3113
3114   /* First try to extract the information from the last buffer on the sinks.
3115    * This will have a more accurate sequence number and timestamp, as between
3116    * the payloader and the sink there can be some queues
3117    */
3118   if (priv->udpsink[0] || priv->appsink[0]) {
3119     GstSample *last_sample;
3120
3121     if (priv->udpsink[0])
3122       g_object_get (priv->udpsink[0], "last-sample", &last_sample, NULL);
3123     else
3124       g_object_get (priv->appsink[0], "last-sample", &last_sample, NULL);
3125
3126     if (last_sample) {
3127       GstCaps *caps;
3128       GstBuffer *buffer;
3129       GstSegment *segment;
3130       GstRTPBuffer rtp_buffer = GST_RTP_BUFFER_INIT;
3131
3132       caps = gst_sample_get_caps (last_sample);
3133       buffer = gst_sample_get_buffer (last_sample);
3134       segment = gst_sample_get_segment (last_sample);
3135
3136       if (gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp_buffer)) {
3137         if (seq) {
3138           *seq = gst_rtp_buffer_get_seq (&rtp_buffer);
3139         }
3140
3141         if (rtptime) {
3142           *rtptime = gst_rtp_buffer_get_timestamp (&rtp_buffer);
3143         }
3144
3145         gst_rtp_buffer_unmap (&rtp_buffer);
3146
3147         if (running_time) {
3148           *running_time =
3149               gst_segment_to_running_time (segment, GST_FORMAT_TIME,
3150               GST_BUFFER_TIMESTAMP (buffer));
3151         }
3152
3153         if (clock_rate) {
3154           GstStructure *s = gst_caps_get_structure (caps, 0);
3155
3156           gst_structure_get_int (s, "clock-rate", (gint *) clock_rate);
3157
3158           if (*clock_rate == 0 && running_time)
3159             *running_time = GST_CLOCK_TIME_NONE;
3160         }
3161         gst_sample_unref (last_sample);
3162
3163         goto done;
3164       } else {
3165         gst_sample_unref (last_sample);
3166       }
3167     }
3168   }
3169
3170   if (g_object_class_find_property (payobjclass, "stats")) {
3171     g_object_get (priv->payloader, "stats", &stats, NULL);
3172     if (stats == NULL)
3173       goto no_stats;
3174
3175     if (seq)
3176       gst_structure_get_uint (stats, "seqnum", seq);
3177
3178     if (rtptime)
3179       gst_structure_get_uint (stats, "timestamp", rtptime);
3180
3181     if (running_time)
3182       gst_structure_get_clock_time (stats, "running-time", running_time);
3183
3184     if (clock_rate) {
3185       gst_structure_get_uint (stats, "clock-rate", clock_rate);
3186       if (*clock_rate == 0 && running_time)
3187         *running_time = GST_CLOCK_TIME_NONE;
3188     }
3189     gst_structure_free (stats);
3190   } else {
3191     if (!g_object_class_find_property (payobjclass, "seqnum") ||
3192         !g_object_class_find_property (payobjclass, "timestamp"))
3193       goto no_stats;
3194
3195     if (seq)
3196       g_object_get (priv->payloader, "seqnum", seq, NULL);
3197
3198     if (rtptime)
3199       g_object_get (priv->payloader, "timestamp", rtptime, NULL);
3200
3201     if (running_time)
3202       *running_time = GST_CLOCK_TIME_NONE;
3203   }
3204
3205 done:
3206   g_mutex_unlock (&priv->lock);
3207
3208   return TRUE;
3209
3210   /* ERRORS */
3211 no_stats:
3212   {
3213     GST_WARNING ("Could not get payloader stats");
3214     g_mutex_unlock (&priv->lock);
3215     return FALSE;
3216   }
3217 }
3218
3219 /**
3220  * gst_rtsp_stream_get_caps:
3221  * @stream: a #GstRTSPStream
3222  *
3223  * Retrieve the current caps of @stream.
3224  *
3225  * Returns: (transfer full): the #GstCaps of @stream. use gst_caps_unref()
3226  * after usage.
3227  */
3228 GstCaps *
3229 gst_rtsp_stream_get_caps (GstRTSPStream * stream)
3230 {
3231   GstRTSPStreamPrivate *priv;
3232   GstCaps *result;
3233
3234   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3235
3236   priv = stream->priv;
3237
3238   g_mutex_lock (&priv->lock);
3239   if ((result = priv->caps))
3240     gst_caps_ref (result);
3241   g_mutex_unlock (&priv->lock);
3242
3243   return result;
3244 }
3245
3246 /**
3247  * gst_rtsp_stream_recv_rtp:
3248  * @stream: a #GstRTSPStream
3249  * @buffer: (transfer full): a #GstBuffer
3250  *
3251  * Handle an RTP buffer for the stream. This method is usually called when a
3252  * message has been received from a client using the TCP transport.
3253  *
3254  * This function takes ownership of @buffer.
3255  *
3256  * Returns: a GstFlowReturn.
3257  */
3258 GstFlowReturn
3259 gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer)
3260 {
3261   GstRTSPStreamPrivate *priv;
3262   GstFlowReturn ret;
3263   GstElement *element;
3264
3265   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
3266   priv = stream->priv;
3267   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
3268   g_return_val_if_fail (priv->is_joined, FALSE);
3269
3270   g_mutex_lock (&priv->lock);
3271   if (priv->appsrc[0])
3272     element = gst_object_ref (priv->appsrc[0]);
3273   else
3274     element = NULL;
3275   g_mutex_unlock (&priv->lock);
3276
3277   if (element) {
3278     if (priv->appsrc_base_time[0] == -1) {
3279       /* Take current running_time. This timestamp will be put on
3280        * the first buffer of each stream because we are a live source and so we
3281        * timestamp with the running_time. When we are dealing with TCP, we also
3282        * only timestamp the first buffer (using the DISCONT flag) because a server
3283        * typically bursts data, for which we don't want to compensate by speeding
3284        * up the media. The other timestamps will be interpollated from this one
3285        * using the RTP timestamps. */
3286       GST_OBJECT_LOCK (element);
3287       if (GST_ELEMENT_CLOCK (element)) {
3288         GstClockTime now;
3289         GstClockTime base_time;
3290
3291         now = gst_clock_get_time (GST_ELEMENT_CLOCK (element));
3292         base_time = GST_ELEMENT_CAST (element)->base_time;
3293
3294         priv->appsrc_base_time[0] = now - base_time;
3295         GST_BUFFER_TIMESTAMP (buffer) = priv->appsrc_base_time[0];
3296         GST_DEBUG ("stream %p: first buffer at time %" GST_TIME_FORMAT
3297             ", base %" GST_TIME_FORMAT, stream, GST_TIME_ARGS (now),
3298             GST_TIME_ARGS (base_time));
3299       }
3300       GST_OBJECT_UNLOCK (element);
3301     }
3302
3303     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
3304     gst_object_unref (element);
3305   } else {
3306     ret = GST_FLOW_OK;
3307   }
3308   return ret;
3309 }
3310
3311 /**
3312  * gst_rtsp_stream_recv_rtcp:
3313  * @stream: a #GstRTSPStream
3314  * @buffer: (transfer full): a #GstBuffer
3315  *
3316  * Handle an RTCP buffer for the stream. This method is usually called when a
3317  * message has been received from a client using the TCP transport.
3318  *
3319  * This function takes ownership of @buffer.
3320  *
3321  * Returns: a GstFlowReturn.
3322  */
3323 GstFlowReturn
3324 gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
3325 {
3326   GstRTSPStreamPrivate *priv;
3327   GstFlowReturn ret;
3328   GstElement *element;
3329
3330   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
3331   priv = stream->priv;
3332   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
3333
3334   if (!priv->is_joined) {
3335     gst_buffer_unref (buffer);
3336     return GST_FLOW_NOT_LINKED;
3337   }
3338   g_mutex_lock (&priv->lock);
3339   if (priv->appsrc[1])
3340     element = gst_object_ref (priv->appsrc[1]);
3341   else
3342     element = NULL;
3343   g_mutex_unlock (&priv->lock);
3344
3345   if (element) {
3346     if (priv->appsrc_base_time[1] == -1) {
3347       /* Take current running_time. This timestamp will be put on
3348        * the first buffer of each stream because we are a live source and so we
3349        * timestamp with the running_time. When we are dealing with TCP, we also
3350        * only timestamp the first buffer (using the DISCONT flag) because a server
3351        * typically bursts data, for which we don't want to compensate by speeding
3352        * up the media. The other timestamps will be interpollated from this one
3353        * using the RTP timestamps. */
3354       GST_OBJECT_LOCK (element);
3355       if (GST_ELEMENT_CLOCK (element)) {
3356         GstClockTime now;
3357         GstClockTime base_time;
3358
3359         now = gst_clock_get_time (GST_ELEMENT_CLOCK (element));
3360         base_time = GST_ELEMENT_CAST (element)->base_time;
3361
3362         priv->appsrc_base_time[1] = now - base_time;
3363         GST_BUFFER_TIMESTAMP (buffer) = priv->appsrc_base_time[1];
3364         GST_DEBUG ("stream %p: first buffer at time %" GST_TIME_FORMAT
3365             ", base %" GST_TIME_FORMAT, stream, GST_TIME_ARGS (now),
3366             GST_TIME_ARGS (base_time));
3367       }
3368       GST_OBJECT_UNLOCK (element);
3369     }
3370
3371     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
3372     gst_object_unref (element);
3373   } else {
3374     ret = GST_FLOW_OK;
3375     gst_buffer_unref (buffer);
3376   }
3377   return ret;
3378 }
3379
3380 /* Properly dispose udpsrcs that were created for a given transport. */
3381 /* Must be called with priv->lock. */
3382 static void
3383 remove_transport_udpsrcs (GstRTSPStreamPrivate * priv,
3384     const GstRTSPTransport * tr)
3385 {
3386   /* Remove the udpsrcs associated with this transport. */
3387   GstRTSPStreamUDPSrcs *transport_udpsrcs =
3388       g_hash_table_lookup (priv->udpsrcs, tr);
3389   if (transport_udpsrcs != NULL) {
3390     for (int i = 0; i < 2; i++) {
3391       if (transport_udpsrcs->udpsrc[i]) {
3392         if (priv->sinkpad || i == 1) {
3393           GstBin *bin;
3394           GstPad *udpsrc_srcpad, *funnel_sinkpad;
3395
3396           /* We know these udpsrcs are all linked to funnels. Explicitely 
3397            * get the funnel src pads so we can properly release them. */
3398           udpsrc_srcpad =
3399               gst_element_get_static_pad (transport_udpsrcs->udpsrc[i], "src");
3400           funnel_sinkpad = gst_pad_get_peer (udpsrc_srcpad);
3401
3402           if (funnel_sinkpad != NULL) {
3403             /* Unlink pads and release funnel's request pad. */
3404             gst_pad_unlink (udpsrc_srcpad, funnel_sinkpad);
3405             gst_element_release_request_pad (priv->funnel[i], funnel_sinkpad);
3406             gst_object_unref (funnel_sinkpad);
3407           }
3408           gst_object_unref (udpsrc_srcpad);
3409
3410           /* Set udpsrc to NULL now before removing */
3411           gst_element_set_locked_state (transport_udpsrcs->udpsrc[i], FALSE);
3412           gst_element_set_state (transport_udpsrcs->udpsrc[i], GST_STATE_NULL);
3413
3414           /* This udpsrc is expected to be owned by a bin. Get the bin and 
3415            * remove our element. */
3416           bin = GST_BIN (gst_element_get_parent (transport_udpsrcs->udpsrc[i]));
3417           if (bin != NULL) {
3418             gst_bin_remove (bin, transport_udpsrcs->udpsrc[i]);
3419             gst_object_unref (bin);
3420           } else {
3421             GST_ERROR ("Expected this udpsrc element to be part of a bin.");
3422             gst_object_unref (transport_udpsrcs->udpsrc[i]);
3423           }
3424
3425         } else {
3426           /* we need to set the state to NULL before unref */
3427           gst_element_set_state (transport_udpsrcs->udpsrc[i], GST_STATE_NULL);
3428           gst_object_unref (transport_udpsrcs->udpsrc[i]);
3429         }
3430       }
3431     }
3432
3433     /* The udpsrcs are now properly cleaned up. Remove them from the table */
3434     g_hash_table_remove (priv->udpsrcs, tr);
3435
3436   } else {
3437     /* This can happen if we're dealing with a multicast transport. */
3438     GST_INFO ("Could not find udpsrcs associated with this transport.");
3439   }
3440 }
3441
3442 /* must be called with lock */
3443 static gboolean
3444 update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
3445     gboolean add)
3446 {
3447   GstRTSPStreamPrivate *priv = stream->priv;
3448   const GstRTSPTransport *tr;
3449
3450   tr = gst_rtsp_stream_transport_get_transport (trans);
3451
3452   switch (tr->lower_transport) {
3453     case GST_RTSP_LOWER_TRANS_UDP_MCAST:
3454     case GST_RTSP_LOWER_TRANS_UDP:
3455     {
3456       gchar *dest;
3457       gint min, max;
3458       guint ttl = 0;
3459
3460       dest = tr->destination;
3461       if (tr->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
3462         min = tr->port.min;
3463         max = tr->port.max;
3464         ttl = tr->ttl;
3465       } else if (priv->client_side) {
3466         /* In client side mode the 'destination' is the RTSP server, so send
3467          * to those ports */
3468         min = tr->server_port.min;
3469         max = tr->server_port.max;
3470       } else {
3471         min = tr->client_port.min;
3472         max = tr->client_port.max;
3473       }
3474
3475       if (add) {
3476         if (ttl > 0) {
3477           GST_INFO ("setting ttl-mc %d", ttl);
3478           g_object_set (G_OBJECT (priv->udpsink[0]), "ttl-mc", ttl, NULL);
3479           g_object_set (G_OBJECT (priv->udpsink[1]), "ttl-mc", ttl, NULL);
3480         }
3481         GST_INFO ("adding %s:%d-%d", dest, min, max);
3482         g_signal_emit_by_name (priv->udpsink[0], "add", dest, min, NULL);
3483         g_signal_emit_by_name (priv->udpsink[1], "add", dest, max, NULL);
3484         priv->transports = g_list_prepend (priv->transports, trans);
3485       } else {
3486         GST_INFO ("removing %s:%d-%d", dest, min, max);
3487         g_signal_emit_by_name (priv->udpsink[0], "remove", dest, min, NULL);
3488         g_signal_emit_by_name (priv->udpsink[1], "remove", dest, max, NULL);
3489         priv->transports = g_list_remove (priv->transports, trans);
3490
3491         remove_transport_udpsrcs (priv, tr);
3492       }
3493       priv->transports_cookie++;
3494       break;
3495     }
3496     case GST_RTSP_LOWER_TRANS_TCP:
3497       if (add) {
3498         GST_INFO ("adding TCP %s", tr->destination);
3499         priv->transports = g_list_prepend (priv->transports, trans);
3500       } else {
3501         GST_INFO ("removing TCP %s", tr->destination);
3502         priv->transports = g_list_remove (priv->transports, trans);
3503       }
3504       priv->transports_cookie++;
3505       break;
3506     default:
3507       goto unknown_transport;
3508   }
3509   return TRUE;
3510
3511   /* ERRORS */
3512 unknown_transport:
3513   {
3514     GST_INFO ("Unknown transport %d", tr->lower_transport);
3515     return FALSE;
3516   }
3517 }
3518
3519
3520 /**
3521  * gst_rtsp_stream_add_transport:
3522  * @stream: a #GstRTSPStream
3523  * @trans: (transfer none): a #GstRTSPStreamTransport
3524  *
3525  * Add the transport in @trans to @stream. The media of @stream will
3526  * then also be send to the values configured in @trans.
3527  *
3528  * @stream must be joined to a bin.
3529  *
3530  * @trans must contain a valid #GstRTSPTransport.
3531  *
3532  * Returns: %TRUE if @trans was added
3533  */
3534 gboolean
3535 gst_rtsp_stream_add_transport (GstRTSPStream * stream,
3536     GstRTSPStreamTransport * trans)
3537 {
3538   GstRTSPStreamPrivate *priv;
3539   gboolean res;
3540
3541   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3542   priv = stream->priv;
3543   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
3544   g_return_val_if_fail (priv->is_joined, FALSE);
3545
3546   g_mutex_lock (&priv->lock);
3547   res = update_transport (stream, trans, TRUE);
3548   g_mutex_unlock (&priv->lock);
3549
3550   return res;
3551 }
3552
3553 /**
3554  * gst_rtsp_stream_remove_transport:
3555  * @stream: a #GstRTSPStream
3556  * @trans: (transfer none): a #GstRTSPStreamTransport
3557  *
3558  * Remove the transport in @trans from @stream. The media of @stream will
3559  * not be sent to the values configured in @trans.
3560  *
3561  * @stream must be joined to a bin.
3562  *
3563  * @trans must contain a valid #GstRTSPTransport.
3564  *
3565  * Returns: %TRUE if @trans was removed
3566  */
3567 gboolean
3568 gst_rtsp_stream_remove_transport (GstRTSPStream * stream,
3569     GstRTSPStreamTransport * trans)
3570 {
3571   GstRTSPStreamPrivate *priv;
3572   gboolean res;
3573
3574   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3575   priv = stream->priv;
3576   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
3577   g_return_val_if_fail (priv->is_joined, FALSE);
3578
3579   g_mutex_lock (&priv->lock);
3580   res = update_transport (stream, trans, FALSE);
3581   g_mutex_unlock (&priv->lock);
3582
3583   return res;
3584 }
3585
3586 /**
3587  * gst_rtsp_stream_update_crypto:
3588  * @stream: a #GstRTSPStream
3589  * @ssrc: the SSRC
3590  * @crypto: (transfer none) (allow-none): a #GstCaps with crypto info
3591  *
3592  * Update the new crypto information for @ssrc in @stream. If information
3593  * for @ssrc did not exist, it will be added. If information
3594  * for @ssrc existed, it will be replaced. If @crypto is %NULL, it will
3595  * be removed from @stream.
3596  *
3597  * Returns: %TRUE if @crypto could be updated
3598  */
3599 gboolean
3600 gst_rtsp_stream_update_crypto (GstRTSPStream * stream,
3601     guint ssrc, GstCaps * crypto)
3602 {
3603   GstRTSPStreamPrivate *priv;
3604
3605   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3606   g_return_val_if_fail (crypto == NULL || GST_IS_CAPS (crypto), FALSE);
3607
3608   priv = stream->priv;
3609
3610   GST_DEBUG_OBJECT (stream, "update key for %08x", ssrc);
3611
3612   g_mutex_lock (&priv->lock);
3613   if (crypto)
3614     g_hash_table_insert (priv->keys, GINT_TO_POINTER (ssrc),
3615         gst_caps_ref (crypto));
3616   else
3617     g_hash_table_remove (priv->keys, GINT_TO_POINTER (ssrc));
3618   g_mutex_unlock (&priv->lock);
3619
3620   return TRUE;
3621 }
3622
3623 /**
3624  * gst_rtsp_stream_get_rtp_socket:
3625  * @stream: a #GstRTSPStream
3626  * @family: the socket family
3627  *
3628  * Get the RTP socket from @stream for a @family.
3629  *
3630  * @stream must be joined to a bin.
3631  *
3632  * Returns: (transfer full) (nullable): the RTP socket or %NULL if no
3633  * socket could be allocated for @family. Unref after usage
3634  */
3635 GSocket *
3636 gst_rtsp_stream_get_rtp_socket (GstRTSPStream * stream, GSocketFamily family)
3637 {
3638   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
3639   GSocket *socket;
3640   const gchar *name;
3641
3642   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3643   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
3644       family == G_SOCKET_FAMILY_IPV6, NULL);
3645   g_return_val_if_fail (priv->udpsink[0], NULL);
3646
3647   if (family == G_SOCKET_FAMILY_IPV6)
3648     name = "socket-v6";
3649   else
3650     name = "socket";
3651
3652   g_object_get (priv->udpsink[0], name, &socket, NULL);
3653
3654   return socket;
3655 }
3656
3657 /**
3658  * gst_rtsp_stream_get_rtcp_socket:
3659  * @stream: a #GstRTSPStream
3660  * @family: the socket family
3661  *
3662  * Get the RTCP socket from @stream for a @family.
3663  *
3664  * @stream must be joined to a bin.
3665  *
3666  * Returns: (transfer full) (nullable): the RTCP socket or %NULL if no
3667  * socket could be allocated for @family. Unref after usage
3668  */
3669 GSocket *
3670 gst_rtsp_stream_get_rtcp_socket (GstRTSPStream * stream, GSocketFamily family)
3671 {
3672   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
3673   GSocket *socket;
3674   const gchar *name;
3675
3676   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3677   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
3678       family == G_SOCKET_FAMILY_IPV6, NULL);
3679   g_return_val_if_fail (priv->udpsink[1], NULL);
3680
3681   if (family == G_SOCKET_FAMILY_IPV6)
3682     name = "socket-v6";
3683   else
3684     name = "socket";
3685
3686   g_object_get (priv->udpsink[1], name, &socket, NULL);
3687
3688   return socket;
3689 }
3690
3691 /**
3692  * gst_rtsp_stream_set_seqnum:
3693  * @stream: a #GstRTSPStream
3694  * @seqnum: a new sequence number
3695  *
3696  * Configure the sequence number in the payloader of @stream to @seqnum.
3697  */
3698 void
3699 gst_rtsp_stream_set_seqnum_offset (GstRTSPStream * stream, guint16 seqnum)
3700 {
3701   GstRTSPStreamPrivate *priv;
3702
3703   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
3704
3705   priv = stream->priv;
3706
3707   g_object_set (G_OBJECT (priv->payloader), "seqnum-offset", seqnum, NULL);
3708 }
3709
3710 /**
3711  * gst_rtsp_stream_get_seqnum:
3712  * @stream: a #GstRTSPStream
3713  *
3714  * Get the configured sequence number in the payloader of @stream.
3715  *
3716  * Returns: the sequence number of the payloader.
3717  */
3718 guint16
3719 gst_rtsp_stream_get_current_seqnum (GstRTSPStream * stream)
3720 {
3721   GstRTSPStreamPrivate *priv;
3722   guint seqnum;
3723
3724   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
3725
3726   priv = stream->priv;
3727
3728   g_object_get (G_OBJECT (priv->payloader), "seqnum", &seqnum, NULL);
3729
3730   return seqnum;
3731 }
3732
3733 /**
3734  * gst_rtsp_stream_transport_filter:
3735  * @stream: a #GstRTSPStream
3736  * @func: (scope call) (allow-none): a callback
3737  * @user_data: (closure): user data passed to @func
3738  *
3739  * Call @func for each transport managed by @stream. The result value of @func
3740  * determines what happens to the transport. @func will be called with @stream
3741  * locked so no further actions on @stream can be performed from @func.
3742  *
3743  * If @func returns #GST_RTSP_FILTER_REMOVE, the transport will be removed from
3744  * @stream.
3745  *
3746  * If @func returns #GST_RTSP_FILTER_KEEP, the transport will remain in @stream.
3747  *
3748  * If @func returns #GST_RTSP_FILTER_REF, the transport will remain in @stream but
3749  * will also be added with an additional ref to the result #GList of this
3750  * function..
3751  *
3752  * When @func is %NULL, #GST_RTSP_FILTER_REF will be assumed for each transport.
3753  *
3754  * Returns: (element-type GstRTSPStreamTransport) (transfer full): a #GList with all
3755  * transports for which @func returned #GST_RTSP_FILTER_REF. After usage, each
3756  * element in the #GList should be unreffed before the list is freed.
3757  */
3758 GList *
3759 gst_rtsp_stream_transport_filter (GstRTSPStream * stream,
3760     GstRTSPStreamTransportFilterFunc func, gpointer user_data)
3761 {
3762   GstRTSPStreamPrivate *priv;
3763   GList *result, *walk, *next;
3764   GHashTable *visited = NULL;
3765   guint cookie;
3766
3767   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3768
3769   priv = stream->priv;
3770
3771   result = NULL;
3772   if (func)
3773     visited = g_hash_table_new_full (NULL, NULL, g_object_unref, NULL);
3774
3775   g_mutex_lock (&priv->lock);
3776 restart:
3777   cookie = priv->transports_cookie;
3778   for (walk = priv->transports; walk; walk = next) {
3779     GstRTSPStreamTransport *trans = walk->data;
3780     GstRTSPFilterResult res;
3781     gboolean changed;
3782
3783     next = g_list_next (walk);
3784
3785     if (func) {
3786       /* only visit each transport once */
3787       if (g_hash_table_contains (visited, trans))
3788         continue;
3789
3790       g_hash_table_add (visited, g_object_ref (trans));
3791       g_mutex_unlock (&priv->lock);
3792
3793       res = func (stream, trans, user_data);
3794
3795       g_mutex_lock (&priv->lock);
3796     } else
3797       res = GST_RTSP_FILTER_REF;
3798
3799     changed = (cookie != priv->transports_cookie);
3800
3801     switch (res) {
3802       case GST_RTSP_FILTER_REMOVE:
3803         update_transport (stream, trans, FALSE);
3804         break;
3805       case GST_RTSP_FILTER_REF:
3806         result = g_list_prepend (result, g_object_ref (trans));
3807         break;
3808       case GST_RTSP_FILTER_KEEP:
3809       default:
3810         break;
3811     }
3812     if (changed)
3813       goto restart;
3814   }
3815   g_mutex_unlock (&priv->lock);
3816
3817   if (func)
3818     g_hash_table_unref (visited);
3819
3820   return result;
3821 }
3822
3823 static GstPadProbeReturn
3824 pad_blocking (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
3825 {
3826   GstRTSPStreamPrivate *priv;
3827   GstRTSPStream *stream;
3828
3829   stream = user_data;
3830   priv = stream->priv;
3831
3832   GST_DEBUG_OBJECT (pad, "now blocking");
3833
3834   g_mutex_lock (&priv->lock);
3835   priv->blocking = TRUE;
3836   g_mutex_unlock (&priv->lock);
3837
3838   gst_element_post_message (priv->payloader,
3839       gst_message_new_element (GST_OBJECT_CAST (priv->payloader),
3840           gst_structure_new_empty ("GstRTSPStreamBlocking")));
3841
3842   return GST_PAD_PROBE_OK;
3843 }
3844
3845 /**
3846  * gst_rtsp_stream_set_blocked:
3847  * @stream: a #GstRTSPStream
3848  * @blocked: boolean indicating we should block or unblock
3849  *
3850  * Blocks or unblocks the dataflow on @stream.
3851  *
3852  * Returns: %TRUE on success
3853  */
3854 gboolean
3855 gst_rtsp_stream_set_blocked (GstRTSPStream * stream, gboolean blocked)
3856 {
3857   GstRTSPStreamPrivate *priv;
3858
3859   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3860
3861   priv = stream->priv;
3862
3863   g_mutex_lock (&priv->lock);
3864   if (blocked) {
3865     priv->blocking = FALSE;
3866     if (priv->blocked_id == 0) {
3867       priv->blocked_id = gst_pad_add_probe (priv->srcpad,
3868           GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER |
3869           GST_PAD_PROBE_TYPE_BUFFER_LIST, pad_blocking,
3870           g_object_ref (stream), g_object_unref);
3871     }
3872   } else {
3873     if (priv->blocked_id != 0) {
3874       gst_pad_remove_probe (priv->srcpad, priv->blocked_id);
3875       priv->blocked_id = 0;
3876       priv->blocking = FALSE;
3877     }
3878   }
3879   g_mutex_unlock (&priv->lock);
3880
3881   return TRUE;
3882 }
3883
3884 /**
3885  * gst_rtsp_stream_is_blocking:
3886  * @stream: a #GstRTSPStream
3887  *
3888  * Check if @stream is blocking on a #GstBuffer.
3889  *
3890  * Returns: %TRUE if @stream is blocking
3891  */
3892 gboolean
3893 gst_rtsp_stream_is_blocking (GstRTSPStream * stream)
3894 {
3895   GstRTSPStreamPrivate *priv;
3896   gboolean result;
3897
3898   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3899
3900   priv = stream->priv;
3901
3902   g_mutex_lock (&priv->lock);
3903   result = priv->blocking;
3904   g_mutex_unlock (&priv->lock);
3905
3906   return result;
3907 }
3908
3909 /**
3910  * gst_rtsp_stream_query_position:
3911  * @stream: a #GstRTSPStream
3912  *
3913  * Query the position of the stream in %GST_FORMAT_TIME. This only considers
3914  * the RTP parts of the pipeline and not the RTCP parts.
3915  *
3916  * Returns: %TRUE if the position could be queried
3917  */
3918 gboolean
3919 gst_rtsp_stream_query_position (GstRTSPStream * stream, gint64 * position)
3920 {
3921   GstRTSPStreamPrivate *priv;
3922   GstElement *sink;
3923   gboolean ret;
3924
3925   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3926
3927   priv = stream->priv;
3928
3929   g_mutex_lock (&priv->lock);
3930   /* depending on the transport type, it should query corresponding sink */
3931   if ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
3932       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST))
3933     sink = priv->udpsink[0];
3934   else
3935     sink = priv->appsink[0];
3936
3937   if (sink)
3938     gst_object_ref (sink);
3939   g_mutex_unlock (&priv->lock);
3940
3941   if (!sink)
3942     return FALSE;
3943
3944   ret = gst_element_query_position (sink, GST_FORMAT_TIME, position);
3945   gst_object_unref (sink);
3946
3947   return ret;
3948 }
3949
3950 /**
3951  * gst_rtsp_stream_query_stop:
3952  * @stream: a #GstRTSPStream
3953  *
3954  * Query the stop of the stream in %GST_FORMAT_TIME. This only considers
3955  * the RTP parts of the pipeline and not the RTCP parts.
3956  *
3957  * Returns: %TRUE if the stop could be queried
3958  */
3959 gboolean
3960 gst_rtsp_stream_query_stop (GstRTSPStream * stream, gint64 * stop)
3961 {
3962   GstRTSPStreamPrivate *priv;
3963   GstElement *sink;
3964   GstQuery *query;
3965   gboolean ret;
3966
3967   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3968
3969   priv = stream->priv;
3970
3971   g_mutex_lock (&priv->lock);
3972   /* depending on the transport type, it should query corresponding sink */
3973   if ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
3974       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST))
3975     sink = priv->udpsink[0];
3976   else
3977     sink = priv->appsink[0];
3978
3979   if (sink)
3980     gst_object_ref (sink);
3981   g_mutex_unlock (&priv->lock);
3982
3983   if (!sink)
3984     return FALSE;
3985
3986   query = gst_query_new_segment (GST_FORMAT_TIME);
3987   if ((ret = gst_element_query (sink, query))) {
3988     GstFormat format;
3989
3990     gst_query_parse_segment (query, NULL, &format, NULL, stop);
3991     if (format != GST_FORMAT_TIME)
3992       *stop = -1;
3993   }
3994   gst_query_unref (query);
3995   gst_object_unref (sink);
3996
3997   return ret;
3998
3999 }