rtsp-stream: Fix crash on cleanup with shared media and multiple udpsrc
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-stream.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  * Copyright (C) 2015 Centricular Ltd
4  *     Author: Sebastian Dröge <sebastian@centricular.com>
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19  * Boston, MA 02110-1301, USA.
20  */
21 /**
22  * SECTION:rtsp-stream
23  * @short_description: A media stream
24  * @see_also: #GstRTSPMedia
25  *
26  * The #GstRTSPStream object manages the data transport for one stream. It
27  * is created from a payloader element and a source pad that produce the RTP
28  * packets for the stream.
29  *
30  * With gst_rtsp_stream_join_bin() the streaming elements are added to the bin
31  * and rtpbin. gst_rtsp_stream_leave_bin() removes the elements again.
32  *
33  * The #GstRTSPStream will use the configured addresspool, as set with
34  * gst_rtsp_stream_set_address_pool(), to allocate multicast addresses for the
35  * stream. With gst_rtsp_stream_get_multicast_address() you can get the
36  * configured address.
37  *
38  * With gst_rtsp_stream_get_server_port () you can get the port that the server
39  * will use to receive RTCP. This is the part that the clients will use to send
40  * RTCP to.
41  *
42  * With gst_rtsp_stream_add_transport() destinations can be added where the
43  * stream should be sent to. Use gst_rtsp_stream_remove_transport() to remove
44  * the destination again.
45  *
46  * Last reviewed on 2013-07-16 (1.0.0)
47  */
48
49 #include <stdlib.h>
50 #include <stdio.h>
51 #include <string.h>
52
53 #include <gio/gio.h>
54
55 #include <gst/app/gstappsrc.h>
56 #include <gst/app/gstappsink.h>
57
58 #include <gst/rtp/gstrtpbuffer.h>
59
60 #include "rtsp-stream.h"
61
62 #define GST_RTSP_STREAM_GET_PRIVATE(obj)  \
63      (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTSP_STREAM, GstRTSPStreamPrivate))
64
65 /* Container for udpsrc elements created for a specific RTSPTransport. */
66 typedef struct
67 {
68   GstElement *udpsrc[2];
69 } GstRTSPStreamUDPSrcs;
70
71 static void
72 destroy_udp_srcs_func (gpointer data)
73 {
74   g_slice_free (GstRTSPStreamUDPSrcs, (GstRTSPStreamUDPSrcs *) data);
75 }
76
77 struct _GstRTSPStreamPrivate
78 {
79   GMutex lock;
80   guint idx;
81   /* Only one pad is ever set */
82   GstPad *srcpad, *sinkpad;
83   GstElement *payloader;
84   guint buffer_size;
85   gboolean is_joined;
86   GstBin *joined_bin;
87
88   /* TRUE if this stream is running on
89    * the client side of an RTSP link (for RECORD) */
90   gboolean client_side;
91   gchar *control;
92
93   GstRTSPProfile profiles;
94   GstRTSPLowerTrans protocols;
95
96   /* pads on the rtpbin */
97   GstPad *send_rtp_sink;
98   GstPad *recv_rtp_src;
99   GstPad *recv_sink[2];
100   GstPad *send_src[2];
101
102   /* the RTPSession object */
103   GObject *session;
104
105   /* SRTP encoder/decoder */
106   GstElement *srtpenc;
107   GstElement *srtpdec;
108   GHashTable *keys;
109
110   /* Unicast UDP sources associated with RTSPTransports */
111   GHashTable *udpsrcs;
112
113   /* Only allow one set of IPV4 and IPV6 multicast udpsrcs */
114   GstElement *udpsrc_mcast_v4[2];
115   GstElement *udpsrc_mcast_v6[2];
116
117   GstElement *udpqueue[2];
118   GstElement *udpsink[2];
119
120   /* for TCP transport */
121   GstElement *appsrc[2];
122   GstClockTime appsrc_base_time[2];
123   GstElement *appqueue[2];
124   GstElement *appsink[2];
125
126   GstElement *tee[2];
127   GstElement *funnel[2];
128
129   /* retransmission */
130   GstElement *rtxsend;
131   guint rtx_pt;
132   GstClockTime rtx_time;
133
134   /* server ports for sending/receiving over ipv4 */
135   GstRTSPRange server_port_v4;
136   GstRTSPAddress *server_addr_v4;
137
138   /* server ports for sending/receiving over ipv6 */
139   GstRTSPRange server_port_v6;
140   GstRTSPAddress *server_addr_v6;
141
142   /* multicast addresses */
143   GstRTSPAddressPool *pool;
144   GstRTSPAddress *addr_v4;
145   GstRTSPAddress *addr_v6;
146   gboolean have_ipv4_mcast;
147   gboolean have_ipv6_mcast;
148
149   gchar *multicast_iface;
150
151   /* the caps of the stream */
152   gulong caps_sig;
153   GstCaps *caps;
154
155   /* transports we stream to */
156   guint n_active;
157   GList *transports;
158   guint transports_cookie;
159   GList *tr_cache_rtp;
160   GList *tr_cache_rtcp;
161   guint tr_cache_cookie_rtp;
162   guint tr_cache_cookie_rtcp;
163
164
165   gint dscp_qos;
166
167   /* stream blocking */
168   gulong blocked_id;
169   gboolean blocking;
170
171   /* pt->caps map for RECORD streams */
172   GHashTable *ptmap;
173
174   GstRTSPPublishClockMode publish_clock_mode;
175 };
176
177 #define DEFAULT_CONTROL         NULL
178 #define DEFAULT_PROFILES        GST_RTSP_PROFILE_AVP
179 #define DEFAULT_PROTOCOLS       GST_RTSP_LOWER_TRANS_UDP | GST_RTSP_LOWER_TRANS_UDP_MCAST | \
180                                         GST_RTSP_LOWER_TRANS_TCP
181
182 enum
183 {
184   PROP_0,
185   PROP_CONTROL,
186   PROP_PROFILES,
187   PROP_PROTOCOLS,
188   PROP_LAST
189 };
190
191 enum
192 {
193   SIGNAL_NEW_RTP_ENCODER,
194   SIGNAL_NEW_RTCP_ENCODER,
195   SIGNAL_LAST
196 };
197
198 GST_DEBUG_CATEGORY_STATIC (rtsp_stream_debug);
199 #define GST_CAT_DEFAULT rtsp_stream_debug
200
201 static GQuark ssrc_stream_map_key;
202
203 static void gst_rtsp_stream_get_property (GObject * object, guint propid,
204     GValue * value, GParamSpec * pspec);
205 static void gst_rtsp_stream_set_property (GObject * object, guint propid,
206     const GValue * value, GParamSpec * pspec);
207
208 static void gst_rtsp_stream_finalize (GObject * obj);
209
210 static guint gst_rtsp_stream_signals[SIGNAL_LAST] = { 0 };
211
212 G_DEFINE_TYPE (GstRTSPStream, gst_rtsp_stream, G_TYPE_OBJECT);
213
214 static void
215 gst_rtsp_stream_class_init (GstRTSPStreamClass * klass)
216 {
217   GObjectClass *gobject_class;
218
219   g_type_class_add_private (klass, sizeof (GstRTSPStreamPrivate));
220
221   gobject_class = G_OBJECT_CLASS (klass);
222
223   gobject_class->get_property = gst_rtsp_stream_get_property;
224   gobject_class->set_property = gst_rtsp_stream_set_property;
225   gobject_class->finalize = gst_rtsp_stream_finalize;
226
227   g_object_class_install_property (gobject_class, PROP_CONTROL,
228       g_param_spec_string ("control", "Control",
229           "The control string for this stream", DEFAULT_CONTROL,
230           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
231
232   g_object_class_install_property (gobject_class, PROP_PROFILES,
233       g_param_spec_flags ("profiles", "Profiles",
234           "Allowed transfer profiles", GST_TYPE_RTSP_PROFILE,
235           DEFAULT_PROFILES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
236
237   g_object_class_install_property (gobject_class, PROP_PROTOCOLS,
238       g_param_spec_flags ("protocols", "Protocols",
239           "Allowed lower transport protocols", GST_TYPE_RTSP_LOWER_TRANS,
240           DEFAULT_PROTOCOLS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
241
242   gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER] =
243       g_signal_new ("new-rtp-encoder", G_TYPE_FROM_CLASS (klass),
244       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
245       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
246
247   gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER] =
248       g_signal_new ("new-rtcp-encoder", G_TYPE_FROM_CLASS (klass),
249       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
250       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
251
252   GST_DEBUG_CATEGORY_INIT (rtsp_stream_debug, "rtspstream", 0, "GstRTSPStream");
253
254   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
255 }
256
257 static void
258 gst_rtsp_stream_init (GstRTSPStream * stream)
259 {
260   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
261
262   GST_DEBUG ("new stream %p", stream);
263
264   stream->priv = priv;
265
266   priv->dscp_qos = -1;
267   priv->control = g_strdup (DEFAULT_CONTROL);
268   priv->profiles = DEFAULT_PROFILES;
269   priv->protocols = DEFAULT_PROTOCOLS;
270   priv->publish_clock_mode = GST_RTSP_PUBLISH_CLOCK_MODE_CLOCK;
271
272   g_mutex_init (&priv->lock);
273
274   priv->keys = g_hash_table_new_full (g_direct_hash, g_direct_equal,
275       NULL, (GDestroyNotify) gst_caps_unref);
276   priv->ptmap = g_hash_table_new_full (NULL, NULL, NULL,
277       (GDestroyNotify) gst_caps_unref);
278   priv->udpsrcs = g_hash_table_new_full (g_direct_hash, g_direct_equal,
279       NULL, (GDestroyNotify) destroy_udp_srcs_func);
280 }
281
282 static void
283 gst_rtsp_stream_finalize (GObject * obj)
284 {
285   GstRTSPStream *stream;
286   GstRTSPStreamPrivate *priv;
287
288   stream = GST_RTSP_STREAM (obj);
289   priv = stream->priv;
290
291   GST_DEBUG ("finalize stream %p", stream);
292
293   /* we really need to be unjoined now */
294   g_return_if_fail (!priv->is_joined);
295
296   if (priv->addr_v4)
297     gst_rtsp_address_free (priv->addr_v4);
298   if (priv->addr_v6)
299     gst_rtsp_address_free (priv->addr_v6);
300   if (priv->server_addr_v4)
301     gst_rtsp_address_free (priv->server_addr_v4);
302   if (priv->server_addr_v6)
303     gst_rtsp_address_free (priv->server_addr_v6);
304   if (priv->pool)
305     g_object_unref (priv->pool);
306   if (priv->rtxsend)
307     g_object_unref (priv->rtxsend);
308
309   g_free (priv->multicast_iface);
310
311   gst_object_unref (priv->payloader);
312   if (priv->srcpad)
313     gst_object_unref (priv->srcpad);
314   if (priv->sinkpad)
315     gst_object_unref (priv->sinkpad);
316   g_free (priv->control);
317   g_mutex_clear (&priv->lock);
318
319   g_hash_table_unref (priv->keys);
320   g_hash_table_destroy (priv->ptmap);
321
322   /* We expect all udpsrcs to be cleaned up by this point. */
323   if (g_hash_table_size (priv->udpsrcs) > 0)
324     g_critical ("Unreffing udpsrcs hash table that contains elements.");
325   g_hash_table_unref (priv->udpsrcs);
326
327   G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
328 }
329
330 static void
331 gst_rtsp_stream_get_property (GObject * object, guint propid,
332     GValue * value, GParamSpec * pspec)
333 {
334   GstRTSPStream *stream = GST_RTSP_STREAM (object);
335
336   switch (propid) {
337     case PROP_CONTROL:
338       g_value_take_string (value, gst_rtsp_stream_get_control (stream));
339       break;
340     case PROP_PROFILES:
341       g_value_set_flags (value, gst_rtsp_stream_get_profiles (stream));
342       break;
343     case PROP_PROTOCOLS:
344       g_value_set_flags (value, gst_rtsp_stream_get_protocols (stream));
345       break;
346     default:
347       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
348   }
349 }
350
351 static void
352 gst_rtsp_stream_set_property (GObject * object, guint propid,
353     const GValue * value, GParamSpec * pspec)
354 {
355   GstRTSPStream *stream = GST_RTSP_STREAM (object);
356
357   switch (propid) {
358     case PROP_CONTROL:
359       gst_rtsp_stream_set_control (stream, g_value_get_string (value));
360       break;
361     case PROP_PROFILES:
362       gst_rtsp_stream_set_profiles (stream, g_value_get_flags (value));
363       break;
364     case PROP_PROTOCOLS:
365       gst_rtsp_stream_set_protocols (stream, g_value_get_flags (value));
366       break;
367     default:
368       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
369   }
370 }
371
372 /**
373  * gst_rtsp_stream_new:
374  * @idx: an index
375  * @pad: a #GstPad
376  * @payloader: a #GstElement
377  *
378  * Create a new media stream with index @idx that handles RTP data on
379  * @pad and has a payloader element @payloader if @pad is a source pad
380  * or a depayloader element @payloader if @pad is a sink pad.
381  *
382  * Returns: (transfer full): a new #GstRTSPStream
383  */
384 GstRTSPStream *
385 gst_rtsp_stream_new (guint idx, GstElement * payloader, GstPad * pad)
386 {
387   GstRTSPStreamPrivate *priv;
388   GstRTSPStream *stream;
389
390   g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL);
391   g_return_val_if_fail (GST_IS_PAD (pad), NULL);
392
393   stream = g_object_new (GST_TYPE_RTSP_STREAM, NULL);
394   priv = stream->priv;
395   priv->idx = idx;
396   priv->payloader = gst_object_ref (payloader);
397   if (GST_PAD_IS_SRC (pad))
398     priv->srcpad = gst_object_ref (pad);
399   else
400     priv->sinkpad = gst_object_ref (pad);
401
402   return stream;
403 }
404
405 /**
406  * gst_rtsp_stream_get_index:
407  * @stream: a #GstRTSPStream
408  *
409  * Get the stream index.
410  *
411  * Return: the stream index.
412  */
413 guint
414 gst_rtsp_stream_get_index (GstRTSPStream * stream)
415 {
416   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
417
418   return stream->priv->idx;
419 }
420
421 /**
422  * gst_rtsp_stream_get_pt:
423  * @stream: a #GstRTSPStream
424  *
425  * Get the stream payload type.
426  *
427  * Return: the stream payload type.
428  */
429 guint
430 gst_rtsp_stream_get_pt (GstRTSPStream * stream)
431 {
432   GstRTSPStreamPrivate *priv;
433   guint pt;
434
435   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
436
437   priv = stream->priv;
438
439   g_object_get (G_OBJECT (priv->payloader), "pt", &pt, NULL);
440
441   return pt;
442 }
443
444 /**
445  * gst_rtsp_stream_get_srcpad:
446  * @stream: a #GstRTSPStream
447  *
448  * Get the srcpad associated with @stream.
449  *
450  * Returns: (transfer full): the srcpad. Unref after usage.
451  */
452 GstPad *
453 gst_rtsp_stream_get_srcpad (GstRTSPStream * stream)
454 {
455   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
456
457   if (!stream->priv->srcpad)
458     return NULL;
459
460   return gst_object_ref (stream->priv->srcpad);
461 }
462
463 /**
464  * gst_rtsp_stream_get_sinkpad:
465  * @stream: a #GstRTSPStream
466  *
467  * Get the sinkpad associated with @stream.
468  *
469  * Returns: (transfer full): the sinkpad. Unref after usage.
470  */
471 GstPad *
472 gst_rtsp_stream_get_sinkpad (GstRTSPStream * stream)
473 {
474   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
475
476   if (!stream->priv->sinkpad)
477     return NULL;
478
479   return gst_object_ref (stream->priv->sinkpad);
480 }
481
482 /**
483  * gst_rtsp_stream_get_control:
484  * @stream: a #GstRTSPStream
485  *
486  * Get the control string to identify this stream.
487  *
488  * Returns: (transfer full): the control string. g_free() after usage.
489  */
490 gchar *
491 gst_rtsp_stream_get_control (GstRTSPStream * stream)
492 {
493   GstRTSPStreamPrivate *priv;
494   gchar *result;
495
496   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
497
498   priv = stream->priv;
499
500   g_mutex_lock (&priv->lock);
501   if ((result = g_strdup (priv->control)) == NULL)
502     result = g_strdup_printf ("stream=%u", priv->idx);
503   g_mutex_unlock (&priv->lock);
504
505   return result;
506 }
507
508 /**
509  * gst_rtsp_stream_set_control:
510  * @stream: a #GstRTSPStream
511  * @control: a control string
512  *
513  * Set the control string in @stream.
514  */
515 void
516 gst_rtsp_stream_set_control (GstRTSPStream * stream, const gchar * control)
517 {
518   GstRTSPStreamPrivate *priv;
519
520   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
521
522   priv = stream->priv;
523
524   g_mutex_lock (&priv->lock);
525   g_free (priv->control);
526   priv->control = g_strdup (control);
527   g_mutex_unlock (&priv->lock);
528 }
529
530 /**
531  * gst_rtsp_stream_has_control:
532  * @stream: a #GstRTSPStream
533  * @control: a control string
534  *
535  * Check if @stream has the control string @control.
536  *
537  * Returns: %TRUE is @stream has @control as the control string
538  */
539 gboolean
540 gst_rtsp_stream_has_control (GstRTSPStream * stream, const gchar * control)
541 {
542   GstRTSPStreamPrivate *priv;
543   gboolean res;
544
545   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
546
547   priv = stream->priv;
548
549   g_mutex_lock (&priv->lock);
550   if (priv->control)
551     res = (g_strcmp0 (priv->control, control) == 0);
552   else {
553     guint streamid;
554
555     if (sscanf (control, "stream=%u", &streamid) > 0)
556       res = (streamid == priv->idx);
557     else
558       res = FALSE;
559   }
560   g_mutex_unlock (&priv->lock);
561
562   return res;
563 }
564
565 /**
566  * gst_rtsp_stream_set_mtu:
567  * @stream: a #GstRTSPStream
568  * @mtu: a new MTU
569  *
570  * Configure the mtu in the payloader of @stream to @mtu.
571  */
572 void
573 gst_rtsp_stream_set_mtu (GstRTSPStream * stream, guint mtu)
574 {
575   GstRTSPStreamPrivate *priv;
576
577   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
578
579   priv = stream->priv;
580
581   GST_LOG_OBJECT (stream, "set MTU %u", mtu);
582
583   g_object_set (G_OBJECT (priv->payloader), "mtu", mtu, NULL);
584 }
585
586 /**
587  * gst_rtsp_stream_get_mtu:
588  * @stream: a #GstRTSPStream
589  *
590  * Get the configured MTU in the payloader of @stream.
591  *
592  * Returns: the MTU of the payloader.
593  */
594 guint
595 gst_rtsp_stream_get_mtu (GstRTSPStream * stream)
596 {
597   GstRTSPStreamPrivate *priv;
598   guint mtu;
599
600   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
601
602   priv = stream->priv;
603
604   g_object_get (G_OBJECT (priv->payloader), "mtu", &mtu, NULL);
605
606   return mtu;
607 }
608
609 /* Update the dscp qos property on the udp sinks */
610 static void
611 update_dscp_qos (GstRTSPStream * stream)
612 {
613   GstRTSPStreamPrivate *priv;
614
615   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
616
617   priv = stream->priv;
618
619   if (priv->udpsink[0]) {
620     g_object_set (G_OBJECT (priv->udpsink[0]), "qos-dscp", priv->dscp_qos,
621         NULL);
622   }
623
624   if (priv->udpsink[1]) {
625     g_object_set (G_OBJECT (priv->udpsink[1]), "qos-dscp", priv->dscp_qos,
626         NULL);
627   }
628 }
629
630 /**
631  * gst_rtsp_stream_set_dscp_qos:
632  * @stream: a #GstRTSPStream
633  * @dscp_qos: a new dscp qos value (0-63, or -1 to disable)
634  *
635  * Configure the dscp qos of the outgoing sockets to @dscp_qos.
636  */
637 void
638 gst_rtsp_stream_set_dscp_qos (GstRTSPStream * stream, gint dscp_qos)
639 {
640   GstRTSPStreamPrivate *priv;
641
642   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
643
644   priv = stream->priv;
645
646   GST_LOG_OBJECT (stream, "set DSCP QoS %d", dscp_qos);
647
648   if (dscp_qos < -1 || dscp_qos > 63) {
649     GST_WARNING_OBJECT (stream, "trying to set illegal dscp qos %d", dscp_qos);
650     return;
651   }
652
653   priv->dscp_qos = dscp_qos;
654
655   update_dscp_qos (stream);
656 }
657
658 /**
659  * gst_rtsp_stream_get_dscp_qos:
660  * @stream: a #GstRTSPStream
661  *
662  * Get the configured DSCP QoS in of the outgoing sockets.
663  *
664  * Returns: the DSCP QoS value of the outgoing sockets, or -1 if disbled.
665  */
666 gint
667 gst_rtsp_stream_get_dscp_qos (GstRTSPStream * stream)
668 {
669   GstRTSPStreamPrivate *priv;
670
671   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
672
673   priv = stream->priv;
674
675   return priv->dscp_qos;
676 }
677
678 /**
679  * gst_rtsp_stream_is_transport_supported:
680  * @stream: a #GstRTSPStream
681  * @transport: (transfer none): a #GstRTSPTransport
682  *
683  * Check if @transport can be handled by stream
684  *
685  * Returns: %TRUE if @transport can be handled by @stream.
686  */
687 gboolean
688 gst_rtsp_stream_is_transport_supported (GstRTSPStream * stream,
689     GstRTSPTransport * transport)
690 {
691   GstRTSPStreamPrivate *priv;
692
693   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
694
695   priv = stream->priv;
696
697   g_mutex_lock (&priv->lock);
698   if (transport->trans != GST_RTSP_TRANS_RTP)
699     goto unsupported_transmode;
700
701   if (!(transport->profile & priv->profiles))
702     goto unsupported_profile;
703
704   if (!(transport->lower_transport & priv->protocols))
705     goto unsupported_ltrans;
706
707   g_mutex_unlock (&priv->lock);
708
709   return TRUE;
710
711   /* ERRORS */
712 unsupported_transmode:
713   {
714     GST_DEBUG ("unsupported transport mode %d", transport->trans);
715     g_mutex_unlock (&priv->lock);
716     return FALSE;
717   }
718 unsupported_profile:
719   {
720     GST_DEBUG ("unsupported profile %d", transport->profile);
721     g_mutex_unlock (&priv->lock);
722     return FALSE;
723   }
724 unsupported_ltrans:
725   {
726     GST_DEBUG ("unsupported lower transport %d", transport->lower_transport);
727     g_mutex_unlock (&priv->lock);
728     return FALSE;
729   }
730 }
731
732 /**
733  * gst_rtsp_stream_set_profiles:
734  * @stream: a #GstRTSPStream
735  * @profiles: the new profiles
736  *
737  * Configure the allowed profiles for @stream.
738  */
739 void
740 gst_rtsp_stream_set_profiles (GstRTSPStream * stream, GstRTSPProfile profiles)
741 {
742   GstRTSPStreamPrivate *priv;
743
744   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
745
746   priv = stream->priv;
747
748   g_mutex_lock (&priv->lock);
749   priv->profiles = profiles;
750   g_mutex_unlock (&priv->lock);
751 }
752
753 /**
754  * gst_rtsp_stream_get_profiles:
755  * @stream: a #GstRTSPStream
756  *
757  * Get the allowed profiles of @stream.
758  *
759  * Returns: a #GstRTSPProfile
760  */
761 GstRTSPProfile
762 gst_rtsp_stream_get_profiles (GstRTSPStream * stream)
763 {
764   GstRTSPStreamPrivate *priv;
765   GstRTSPProfile res;
766
767   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_RTSP_PROFILE_UNKNOWN);
768
769   priv = stream->priv;
770
771   g_mutex_lock (&priv->lock);
772   res = priv->profiles;
773   g_mutex_unlock (&priv->lock);
774
775   return res;
776 }
777
778 /**
779  * gst_rtsp_stream_set_protocols:
780  * @stream: a #GstRTSPStream
781  * @protocols: the new flags
782  *
783  * Configure the allowed lower transport for @stream.
784  */
785 void
786 gst_rtsp_stream_set_protocols (GstRTSPStream * stream,
787     GstRTSPLowerTrans protocols)
788 {
789   GstRTSPStreamPrivate *priv;
790
791   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
792
793   priv = stream->priv;
794
795   g_mutex_lock (&priv->lock);
796   priv->protocols = protocols;
797   g_mutex_unlock (&priv->lock);
798 }
799
800 /**
801  * gst_rtsp_stream_get_protocols:
802  * @stream: a #GstRTSPStream
803  *
804  * Get the allowed protocols of @stream.
805  *
806  * Returns: a #GstRTSPLowerTrans
807  */
808 GstRTSPLowerTrans
809 gst_rtsp_stream_get_protocols (GstRTSPStream * stream)
810 {
811   GstRTSPStreamPrivate *priv;
812   GstRTSPLowerTrans res;
813
814   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream),
815       GST_RTSP_LOWER_TRANS_UNKNOWN);
816
817   priv = stream->priv;
818
819   g_mutex_lock (&priv->lock);
820   res = priv->protocols;
821   g_mutex_unlock (&priv->lock);
822
823   return res;
824 }
825
826 /**
827  * gst_rtsp_stream_set_address_pool:
828  * @stream: a #GstRTSPStream
829  * @pool: (transfer none): a #GstRTSPAddressPool
830  *
831  * configure @pool to be used as the address pool of @stream.
832  */
833 void
834 gst_rtsp_stream_set_address_pool (GstRTSPStream * stream,
835     GstRTSPAddressPool * pool)
836 {
837   GstRTSPStreamPrivate *priv;
838   GstRTSPAddressPool *old;
839
840   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
841
842   priv = stream->priv;
843
844   GST_LOG_OBJECT (stream, "set address pool %p", pool);
845
846   g_mutex_lock (&priv->lock);
847   if ((old = priv->pool) != pool)
848     priv->pool = pool ? g_object_ref (pool) : NULL;
849   else
850     old = NULL;
851   g_mutex_unlock (&priv->lock);
852
853   if (old)
854     g_object_unref (old);
855 }
856
857 /**
858  * gst_rtsp_stream_get_address_pool:
859  * @stream: a #GstRTSPStream
860  *
861  * Get the #GstRTSPAddressPool used as the address pool of @stream.
862  *
863  * Returns: (transfer full): the #GstRTSPAddressPool of @stream. g_object_unref() after
864  * usage.
865  */
866 GstRTSPAddressPool *
867 gst_rtsp_stream_get_address_pool (GstRTSPStream * stream)
868 {
869   GstRTSPStreamPrivate *priv;
870   GstRTSPAddressPool *result;
871
872   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
873
874   priv = stream->priv;
875
876   g_mutex_lock (&priv->lock);
877   if ((result = priv->pool))
878     g_object_ref (result);
879   g_mutex_unlock (&priv->lock);
880
881   return result;
882 }
883
884 /**
885  * gst_rtsp_stream_set_multicast_iface:
886  * @stream: a #GstRTSPStream
887  * @multicast_iface: (transfer none): a multicast interface
888  *
889  * configure @multicast_iface to be used for @stream.
890  */
891 void
892 gst_rtsp_stream_set_multicast_iface (GstRTSPStream * stream,
893     const gchar * multicast_iface)
894 {
895   GstRTSPStreamPrivate *priv;
896   gchar *old;
897
898   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
899
900   priv = stream->priv;
901
902   GST_LOG_OBJECT (stream, "set multicast iface %s",
903       GST_STR_NULL (multicast_iface));
904
905   g_mutex_lock (&priv->lock);
906   if ((old = priv->multicast_iface) != multicast_iface)
907     priv->multicast_iface = multicast_iface ? g_strdup (multicast_iface) : NULL;
908   else
909     old = NULL;
910   g_mutex_unlock (&priv->lock);
911
912   if (old)
913     g_free (old);
914 }
915
916 /**
917  * gst_rtsp_stream_get_multicast_iface:
918  * @stream: a #GstRTSPStream
919  *
920  * Get the multicast interface used for @stream.
921  *
922  * Returns: (transfer full): the multicast interface for @stream. g_free() after
923  * usage.
924  */
925 gchar *
926 gst_rtsp_stream_get_multicast_iface (GstRTSPStream * stream)
927 {
928   GstRTSPStreamPrivate *priv;
929   gchar *result;
930
931   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
932
933   priv = stream->priv;
934
935   g_mutex_lock (&priv->lock);
936   if ((result = priv->multicast_iface))
937     result = g_strdup (result);
938   g_mutex_unlock (&priv->lock);
939
940   return result;
941 }
942
943 /**
944  * gst_rtsp_stream_get_multicast_address:
945  * @stream: a #GstRTSPStream
946  * @family: the #GSocketFamily
947  *
948  * Get the multicast address of @stream for @family.
949  *
950  * Returns: (transfer full) (nullable): the #GstRTSPAddress of @stream
951  * or %NULL when no address could be allocated. gst_rtsp_address_free()
952  * after usage.
953  */
954 GstRTSPAddress *
955 gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream,
956     GSocketFamily family)
957 {
958   GstRTSPStreamPrivate *priv;
959   GstRTSPAddress *result;
960   GstRTSPAddress **addrp;
961   GstRTSPAddressFlags flags;
962
963   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
964
965   priv = stream->priv;
966
967   if (family == G_SOCKET_FAMILY_IPV6) {
968     flags = GST_RTSP_ADDRESS_FLAG_IPV6;
969     addrp = &priv->addr_v6;
970   } else {
971     flags = GST_RTSP_ADDRESS_FLAG_IPV4;
972     addrp = &priv->addr_v4;
973   }
974
975   g_mutex_lock (&priv->lock);
976   if (*addrp == NULL) {
977     if (priv->pool == NULL)
978       goto no_pool;
979
980     flags |= GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_MULTICAST;
981
982     *addrp = gst_rtsp_address_pool_acquire_address (priv->pool, flags, 2);
983     if (*addrp == NULL)
984       goto no_address;
985   }
986   result = gst_rtsp_address_copy (*addrp);
987   g_mutex_unlock (&priv->lock);
988
989   return result;
990
991   /* ERRORS */
992 no_pool:
993   {
994     GST_ERROR_OBJECT (stream, "no address pool specified");
995     g_mutex_unlock (&priv->lock);
996     return NULL;
997   }
998 no_address:
999   {
1000     GST_ERROR_OBJECT (stream, "failed to acquire address from pool");
1001     g_mutex_unlock (&priv->lock);
1002     return NULL;
1003   }
1004 }
1005
1006 /**
1007  * gst_rtsp_stream_reserve_address:
1008  * @stream: a #GstRTSPStream
1009  * @address: an address
1010  * @port: a port
1011  * @n_ports: n_ports
1012  * @ttl: a TTL
1013  *
1014  * Reserve @address and @port as the address and port of @stream.
1015  *
1016  * Returns: (nullable): the #GstRTSPAddress of @stream or %NULL when
1017  * the address could be reserved. gst_rtsp_address_free() after usage.
1018  */
1019 GstRTSPAddress *
1020 gst_rtsp_stream_reserve_address (GstRTSPStream * stream,
1021     const gchar * address, guint port, guint n_ports, guint ttl)
1022 {
1023   GstRTSPStreamPrivate *priv;
1024   GstRTSPAddress *result;
1025   GInetAddress *addr;
1026   GSocketFamily family;
1027   GstRTSPAddress **addrp;
1028
1029   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1030   g_return_val_if_fail (address != NULL, NULL);
1031   g_return_val_if_fail (port > 0, NULL);
1032   g_return_val_if_fail (n_ports > 0, NULL);
1033   g_return_val_if_fail (ttl > 0, NULL);
1034
1035   priv = stream->priv;
1036
1037   addr = g_inet_address_new_from_string (address);
1038   if (!addr) {
1039     GST_ERROR ("failed to get inet addr from %s", address);
1040     family = G_SOCKET_FAMILY_IPV4;
1041   } else {
1042     family = g_inet_address_get_family (addr);
1043     g_object_unref (addr);
1044   }
1045
1046   if (family == G_SOCKET_FAMILY_IPV6)
1047     addrp = &priv->addr_v6;
1048   else
1049     addrp = &priv->addr_v4;
1050
1051   g_mutex_lock (&priv->lock);
1052   if (*addrp == NULL) {
1053     GstRTSPAddressPoolResult res;
1054
1055     if (priv->pool == NULL)
1056       goto no_pool;
1057
1058     res = gst_rtsp_address_pool_reserve_address (priv->pool, address,
1059         port, n_ports, ttl, addrp);
1060     if (res != GST_RTSP_ADDRESS_POOL_OK)
1061       goto no_address;
1062   } else {
1063     if (strcmp ((*addrp)->address, address) ||
1064         (*addrp)->port != port || (*addrp)->n_ports != n_ports ||
1065         (*addrp)->ttl != ttl)
1066       goto different_address;
1067   }
1068   result = gst_rtsp_address_copy (*addrp);
1069   g_mutex_unlock (&priv->lock);
1070
1071   return result;
1072
1073   /* ERRORS */
1074 no_pool:
1075   {
1076     GST_ERROR_OBJECT (stream, "no address pool specified");
1077     g_mutex_unlock (&priv->lock);
1078     return NULL;
1079   }
1080 no_address:
1081   {
1082     GST_ERROR_OBJECT (stream, "failed to acquire address %s from pool",
1083         address);
1084     g_mutex_unlock (&priv->lock);
1085     return NULL;
1086   }
1087 different_address:
1088   {
1089     GST_ERROR_OBJECT (stream, "address %s is not the same that was already"
1090         " reserved", address);
1091     g_mutex_unlock (&priv->lock);
1092     return NULL;
1093   }
1094 }
1095
1096 /* must be called with lock */
1097 static void
1098 set_sockets_for_udpsinks (GstRTSPStream * stream, GSocket * rtp_socket,
1099     GSocket * rtcp_socket, GSocketFamily family)
1100 {
1101   GstRTSPStreamPrivate *priv = stream->priv;
1102   const gchar *multisink_socket;
1103
1104   if (family == G_SOCKET_FAMILY_IPV6)
1105     multisink_socket = "socket-v6";
1106   else
1107     multisink_socket = "socket";
1108
1109   g_object_set (G_OBJECT (priv->udpsink[0]), multisink_socket, rtp_socket,
1110       NULL);
1111   g_object_set (G_OBJECT (priv->udpsink[1]), multisink_socket, rtcp_socket,
1112       NULL);
1113 }
1114
1115 /* must be called with lock */
1116 static gboolean
1117 create_and_configure_udpsinks (GstRTSPStream * stream)
1118 {
1119   GstRTSPStreamPrivate *priv = stream->priv;
1120   GstElement *udpsink0, *udpsink1;
1121
1122   udpsink0 = NULL;
1123   udpsink1 = NULL;
1124
1125   if (priv->udpsink[0])
1126     udpsink0 = priv->udpsink[0];
1127   else
1128     udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
1129
1130   if (!udpsink0)
1131     goto no_udp_protocol;
1132
1133   if (priv->udpsink[1])
1134     udpsink1 = priv->udpsink[1];
1135   else
1136     udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
1137
1138   if (!udpsink1)
1139     goto no_udp_protocol;
1140
1141   /* configure sinks */
1142
1143   g_object_set (G_OBJECT (udpsink0), "close-socket", FALSE, NULL);
1144   g_object_set (G_OBJECT (udpsink1), "close-socket", FALSE, NULL);
1145
1146   g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
1147   g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
1148
1149   g_object_set (G_OBJECT (udpsink0), "buffer-size", priv->buffer_size, NULL);
1150
1151   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
1152   /* Needs to be async for RECORD streams, otherwise we will never go to
1153    * PLAYING because the sinks will wait for data while the udpsrc can't
1154    * provide data with timestamps in PAUSED. */
1155   if (priv->sinkpad)
1156     g_object_set (G_OBJECT (udpsink0), "async", FALSE, NULL);
1157   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
1158
1159   g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL);
1160   g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL);
1161
1162   g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
1163   g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
1164
1165   /* update the dscp qos field in the sinks */
1166   update_dscp_qos (stream);
1167
1168   priv->udpsink[0] = udpsink0;
1169   priv->udpsink[1] = udpsink1;
1170
1171   return TRUE;
1172
1173   /* ERRORS */
1174 no_udp_protocol:
1175   {
1176     return FALSE;
1177   }
1178 }
1179
1180 /* must be called with lock */
1181 static void
1182 play_udpsources_one_family (GstRTSPStream * stream, GstElement * udpsrc_out[2],
1183     GSocketFamily family)
1184 {
1185   GstRTSPStreamPrivate *priv;
1186   GstPad *pad, *selpad;
1187   guint i;
1188   GstBin *bin;
1189
1190   priv = stream->priv;
1191   bin = GST_BIN (gst_object_get_parent (GST_OBJECT (priv->funnel[1])));
1192
1193   for (i = 0; i < 2; i++) {
1194     if (priv->sinkpad || i == 1) {
1195       if (priv->srcpad) {
1196         /* we set and keep these to playing so that they don't cause NO_PREROLL return
1197          * values. This is only relevant for PLAY pipelines */
1198         gst_element_set_state (udpsrc_out[i], GST_STATE_PLAYING);
1199         gst_element_set_locked_state (udpsrc_out[i], TRUE);
1200       }
1201       /* add udpsrc */
1202       gst_bin_add (bin, udpsrc_out[i]);
1203
1204       /* and link to the funnel */
1205       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1206       pad = gst_element_get_static_pad (udpsrc_out[i], "src");
1207       gst_pad_link (pad, selpad);
1208       gst_object_unref (pad);
1209       gst_object_unref (selpad);
1210
1211       /* otherwise sync state with parent in case it's running already
1212        * at this point */
1213       if (!priv->srcpad) {
1214         gst_element_sync_state_with_parent (udpsrc_out[i]);
1215       }
1216     }
1217   }
1218
1219   gst_object_unref (bin);
1220 }
1221
1222 /* must be called with lock */
1223 static gboolean
1224 create_and_configure_udpsources_one_family (GstElement * udpsrc_out[2],
1225     GSocket * rtp_socket, GSocket * rtcp_socket, GSocketFamily family,
1226     const gchar * address, gint rtpport, gint rtcpport,
1227     const gchar * multicast_iface, GstRTSPLowerTrans transport)
1228 {
1229   GstStateChangeReturn ret;
1230
1231   udpsrc_out[0] = gst_element_factory_make ("udpsrc", NULL);
1232   udpsrc_out[1] = gst_element_factory_make ("udpsrc", NULL);
1233
1234   if (udpsrc_out[0] == NULL || udpsrc_out[1] == NULL)
1235     goto error;
1236
1237   if (transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
1238     g_object_set (G_OBJECT (udpsrc_out[0]), "address", address, NULL);
1239     g_object_set (G_OBJECT (udpsrc_out[1]), "address", address, NULL);
1240     g_object_set (G_OBJECT (udpsrc_out[0]), "port", rtpport, NULL);
1241     g_object_set (G_OBJECT (udpsrc_out[1]), "port", rtcpport, NULL);
1242     g_object_set (G_OBJECT (udpsrc_out[0]), "multicast-iface", multicast_iface,
1243         NULL);
1244     g_object_set (G_OBJECT (udpsrc_out[1]), "multicast-iface", multicast_iface,
1245         NULL);
1246     g_object_set (G_OBJECT (udpsrc_out[0]), "loop", FALSE, NULL);
1247     g_object_set (G_OBJECT (udpsrc_out[1]), "loop", FALSE, NULL);
1248   }
1249
1250   g_object_set (G_OBJECT (udpsrc_out[0]), "socket", rtp_socket, NULL);
1251   g_object_set (G_OBJECT (udpsrc_out[1]), "socket", rtcp_socket, NULL);
1252
1253   ret = gst_element_set_state (udpsrc_out[0], GST_STATE_READY);
1254   if (ret == GST_STATE_CHANGE_FAILURE)
1255     goto error;
1256   ret = gst_element_set_state (udpsrc_out[1], GST_STATE_READY);
1257   if (ret == GST_STATE_CHANGE_FAILURE)
1258     goto error;
1259
1260   return TRUE;
1261   return TRUE;
1262
1263   /* ERRORS */
1264 error:
1265   {
1266     if (udpsrc_out[0])
1267       gst_object_unref (udpsrc_out[0]);
1268     if (udpsrc_out[1])
1269       gst_object_unref (udpsrc_out[1]);
1270     return FALSE;
1271   }
1272 }
1273
1274 static gboolean
1275 alloc_ports_one_family (GstRTSPStream * stream, GSocketFamily family,
1276     GstElement * udpsrc_out[2], GstRTSPRange * server_port_out,
1277     GstRTSPTransport * ct, GstRTSPAddress ** server_addr_out,
1278     gboolean use_client_settings)
1279 {
1280   GstRTSPStreamPrivate *priv = stream->priv;
1281   GSocket *rtp_socket = NULL;
1282   GSocket *rtcp_socket;
1283   gint tmp_rtp, tmp_rtcp;
1284   guint count;
1285   gint rtpport, rtcpport;
1286   GList *rejected_addresses = NULL;
1287   GstRTSPAddress *addr = NULL;
1288   GInetAddress *inetaddr = NULL;
1289   gchar *addr_str;
1290   GSocketAddress *rtp_sockaddr = NULL;
1291   GSocketAddress *rtcp_sockaddr = NULL;
1292   GstRTSPAddressPool *pool;
1293   GstRTSPLowerTrans transport;
1294   const gchar *multicast_iface = priv->multicast_iface;
1295
1296   pool = priv->pool;
1297   count = 0;
1298   transport = ct->lower_transport;
1299
1300   /* Start with random port */
1301   tmp_rtp = 0;
1302
1303   rtcp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1304       G_SOCKET_PROTOCOL_UDP, NULL);
1305   if (!rtcp_socket)
1306     goto no_udp_protocol;
1307   g_socket_set_multicast_loopback (rtcp_socket, FALSE);
1308
1309   if (*server_addr_out)
1310     gst_rtsp_address_free (*server_addr_out);
1311
1312   /* try to allocate 2 UDP ports, the RTP port should be an even
1313    * number and the RTCP port should be the next (uneven) port */
1314 again:
1315
1316   if (rtp_socket == NULL) {
1317     rtp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1318         G_SOCKET_PROTOCOL_UDP, NULL);
1319     if (!rtp_socket)
1320       goto no_udp_protocol;
1321     g_socket_set_multicast_loopback (rtp_socket, FALSE);
1322   }
1323
1324   if (pool && ((transport == GST_RTSP_LOWER_TRANS_UDP &&
1325               gst_rtsp_address_pool_has_unicast_addresses (pool))
1326           || transport == GST_RTSP_LOWER_TRANS_UDP_MCAST)) {
1327     GstRTSPAddressFlags flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT;
1328
1329     if (transport == GST_RTSP_LOWER_TRANS_UDP)
1330       flags |= GST_RTSP_ADDRESS_FLAG_UNICAST;
1331     else
1332       flags |= GST_RTSP_ADDRESS_FLAG_MULTICAST;
1333
1334     if (addr)
1335       rejected_addresses = g_list_prepend (rejected_addresses, addr);
1336
1337     if (family == G_SOCKET_FAMILY_IPV6)
1338       flags |= GST_RTSP_ADDRESS_FLAG_IPV6;
1339     else
1340       flags |= GST_RTSP_ADDRESS_FLAG_IPV4;
1341
1342     if (ct->destination && transport == GST_RTSP_LOWER_TRANS_UDP_MCAST
1343         && use_client_settings)
1344       gst_rtsp_address_pool_reserve_address (pool, ct->destination,
1345           ct->port.min, 2, ct->ttl, &addr);
1346     else
1347       addr = gst_rtsp_address_pool_acquire_address (pool, flags, 2);
1348
1349     if (addr == NULL)
1350       goto no_ports;
1351
1352     tmp_rtp = addr->port;
1353
1354     g_clear_object (&inetaddr);
1355     inetaddr = g_inet_address_new_from_string (addr->address);
1356
1357     /* If we're supposed to bind to a multicast address, instead bind
1358      * to ANY and let udpsrc later join the relevant multicast group
1359      */
1360     if (g_inet_address_get_is_multicast (inetaddr)) {
1361       g_object_unref (inetaddr);
1362       inetaddr = g_inet_address_new_any (family);
1363     }
1364   } else {
1365     if (tmp_rtp != 0) {
1366       tmp_rtp += 2;
1367       if (++count > 20)
1368         goto no_ports;
1369     }
1370
1371     if (inetaddr == NULL)
1372       inetaddr = g_inet_address_new_any (family);
1373   }
1374
1375   rtp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtp);
1376   if (!g_socket_bind (rtp_socket, rtp_sockaddr, FALSE, NULL)) {
1377     g_object_unref (rtp_sockaddr);
1378     goto again;
1379   }
1380   g_object_unref (rtp_sockaddr);
1381
1382   rtp_sockaddr = g_socket_get_local_address (rtp_socket, NULL);
1383   if (rtp_sockaddr == NULL || !G_IS_INET_SOCKET_ADDRESS (rtp_sockaddr)) {
1384     g_clear_object (&rtp_sockaddr);
1385     goto socket_error;
1386   }
1387
1388   tmp_rtp =
1389       g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (rtp_sockaddr));
1390   g_object_unref (rtp_sockaddr);
1391
1392   /* check if port is even */
1393   if ((tmp_rtp & 1) != 0) {
1394     /* port not even, close and allocate another */
1395     tmp_rtp++;
1396     g_clear_object (&rtp_socket);
1397     goto again;
1398   }
1399
1400   /* set port */
1401   tmp_rtcp = tmp_rtp + 1;
1402
1403   rtcp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtcp);
1404   if (!g_socket_bind (rtcp_socket, rtcp_sockaddr, FALSE, NULL)) {
1405     g_object_unref (rtcp_sockaddr);
1406     g_clear_object (&rtp_socket);
1407     goto again;
1408   }
1409   g_object_unref (rtcp_sockaddr);
1410
1411   if (addr == NULL)
1412     addr_str = g_inet_address_to_string (inetaddr);
1413   else
1414     addr_str = addr->address;
1415   g_clear_object (&inetaddr);
1416
1417   if (!create_and_configure_udpsources_one_family (udpsrc_out, rtp_socket,
1418           rtcp_socket, family, addr_str, tmp_rtp, tmp_rtcp, multicast_iface,
1419           transport)) {
1420     if (addr == NULL)
1421       g_free (addr_str);
1422     goto no_udp_protocol;
1423   }
1424
1425   if (addr == NULL)
1426     g_free (addr_str);
1427
1428   play_udpsources_one_family (stream, udpsrc_out, family);
1429
1430   g_object_get (G_OBJECT (udpsrc_out[0]), "port", &rtpport, NULL);
1431   g_object_get (G_OBJECT (udpsrc_out[1]), "port", &rtcpport, NULL);
1432
1433   /* this should not happen... */
1434   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
1435     goto port_error;
1436
1437   /* set RTP and RTCP sockets */
1438   set_sockets_for_udpsinks (stream, rtp_socket, rtcp_socket, family);
1439
1440   server_port_out->min = rtpport;
1441   server_port_out->max = rtcpport;
1442
1443   *server_addr_out = addr;
1444   g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free);
1445
1446   g_object_unref (rtp_socket);
1447   g_object_unref (rtcp_socket);
1448
1449   return TRUE;
1450
1451   /* ERRORS */
1452 no_udp_protocol:
1453   {
1454     goto cleanup;
1455   }
1456 no_ports:
1457   {
1458     goto cleanup;
1459   }
1460 port_error:
1461   {
1462     goto cleanup;
1463   }
1464 socket_error:
1465   {
1466     goto cleanup;
1467   }
1468 cleanup:
1469   {
1470     if (inetaddr)
1471       g_object_unref (inetaddr);
1472     g_list_free_full (rejected_addresses,
1473         (GDestroyNotify) gst_rtsp_address_free);
1474     if (addr)
1475       gst_rtsp_address_free (addr);
1476     if (rtp_socket)
1477       g_object_unref (rtp_socket);
1478     if (rtcp_socket)
1479       g_object_unref (rtcp_socket);
1480     return FALSE;
1481   }
1482 }
1483
1484 /**
1485  * gst_rtsp_stream_allocate_udp_sockets:
1486  * @stream: a #GstRTSPStream
1487  * @family: protocol family
1488  * @transport_method: transport method
1489  *
1490  * Allocates RTP and RTCP ports.
1491  *
1492  * Returns: %TRUE if the RTP and RTCP sockets have been succeccully allocated.
1493  */
1494 gboolean
1495 gst_rtsp_stream_allocate_udp_sockets (GstRTSPStream * stream,
1496     GSocketFamily family, GstRTSPTransport * ct, gboolean use_client_settings)
1497 {
1498   GstRTSPStreamPrivate *priv;
1499   gboolean result = FALSE;
1500   GstRTSPLowerTrans transport = ct->lower_transport;
1501
1502   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1503   priv = stream->priv;
1504   g_return_val_if_fail (priv->is_joined, FALSE);
1505
1506   g_mutex_lock (&priv->lock);
1507
1508   if (transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
1509     if (family == G_SOCKET_FAMILY_IPV4) {
1510       /* Multicast IPV4 */
1511       if (priv->have_ipv4_mcast) {
1512         result = TRUE;
1513         goto done;
1514       }
1515
1516       priv->have_ipv4_mcast =
1517           alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4,
1518           priv->udpsrc_mcast_v4, &priv->server_port_v4, ct, &priv->addr_v4,
1519           use_client_settings);
1520       result = priv->have_ipv4_mcast;
1521
1522     } else {
1523       /* Multicast IPV6 */
1524       if (priv->have_ipv6_mcast) {
1525         result = TRUE;
1526         goto done;
1527       }
1528
1529       priv->have_ipv6_mcast =
1530           alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6,
1531           priv->udpsrc_mcast_v6, &priv->server_port_v6, ct, &priv->addr_v6,
1532           use_client_settings);
1533       result = priv->have_ipv6_mcast;
1534     }
1535   } else {
1536     /* We allow multiple unicast transports, so we must maintain a table of the 
1537      * udpsrcs created for them. */
1538     GstRTSPStreamUDPSrcs *transport_udpsrcs =
1539         g_slice_new0 (GstRTSPStreamUDPSrcs);
1540
1541     if (family == G_SOCKET_FAMILY_IPV4) {
1542       /* Unicast IPV4 */
1543       result =
1544           alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4,
1545           transport_udpsrcs->udpsrc, &priv->server_port_v4, ct,
1546           &priv->server_addr_v4, use_client_settings);
1547     } else {
1548       /* Unicast IPV6 */
1549       result =
1550           alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6,
1551           transport_udpsrcs->udpsrc, &priv->server_port_v6, ct,
1552           &priv->server_addr_v6, use_client_settings);
1553     }
1554
1555     /* If we didn't create any unicast udpsrcs, free the transport_udpsrcs struct. 
1556      * Otherwise, add it to the hash table */
1557     if (transport_udpsrcs->udpsrc[0] == NULL
1558         && transport_udpsrcs->udpsrc[1] == NULL)
1559       g_slice_free (GstRTSPStreamUDPSrcs, transport_udpsrcs);
1560     else
1561       g_hash_table_insert (priv->udpsrcs, ct, transport_udpsrcs);
1562   }
1563
1564 done:
1565   g_mutex_unlock (&priv->lock);
1566
1567   return result;
1568 }
1569
1570 /**
1571  * gst_rtsp_stream_set_client_side:
1572  * @stream: a #GstRTSPStream
1573  * @client_side: TRUE if this #GstRTSPStream is running on the 'client' side of
1574  * an RTSP connection.
1575  *
1576  * Sets the #GstRTSPStream as a 'client side' stream - used for sending
1577  * streams to an RTSP server via RECORD. This has the practical effect
1578  * of changing which UDP port numbers are used when setting up the local
1579  * side of the stream sending to be either the 'server' or 'client' pair
1580  * of a configured UDP transport.
1581  */
1582 void
1583 gst_rtsp_stream_set_client_side (GstRTSPStream * stream, gboolean client_side)
1584 {
1585   GstRTSPStreamPrivate *priv;
1586
1587   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1588   priv = stream->priv;
1589   g_mutex_lock (&priv->lock);
1590   priv->client_side = client_side;
1591   g_mutex_unlock (&priv->lock);
1592 }
1593
1594 /**
1595  * gst_rtsp_stream_is_client_side:
1596  * @stream: a #GstRTSPStream
1597  *
1598  * See gst_rtsp_stream_set_client_side()
1599  *
1600  * Returns: TRUE if this #GstRTSPStream is client-side.
1601  */
1602 gboolean
1603 gst_rtsp_stream_is_client_side (GstRTSPStream * stream)
1604 {
1605   GstRTSPStreamPrivate *priv;
1606   gboolean ret;
1607
1608   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1609
1610   priv = stream->priv;
1611   g_mutex_lock (&priv->lock);
1612   ret = priv->client_side;
1613   g_mutex_unlock (&priv->lock);
1614
1615   return ret;
1616 }
1617
1618 /**
1619  * gst_rtsp_stream_get_server_port:
1620  * @stream: a #GstRTSPStream
1621  * @server_port: (out): result server port
1622  * @family: the port family to get
1623  *
1624  * Fill @server_port with the port pair used by the server. This function can
1625  * only be called when @stream has been joined.
1626  */
1627 void
1628 gst_rtsp_stream_get_server_port (GstRTSPStream * stream,
1629     GstRTSPRange * server_port, GSocketFamily family)
1630 {
1631   GstRTSPStreamPrivate *priv;
1632
1633   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1634   priv = stream->priv;
1635   g_return_if_fail (priv->is_joined);
1636
1637   g_mutex_lock (&priv->lock);
1638   if (family == G_SOCKET_FAMILY_IPV4) {
1639     if (server_port)
1640       *server_port = priv->server_port_v4;
1641   } else {
1642     if (server_port)
1643       *server_port = priv->server_port_v6;
1644   }
1645   g_mutex_unlock (&priv->lock);
1646 }
1647
1648 /**
1649  * gst_rtsp_stream_get_rtpsession:
1650  * @stream: a #GstRTSPStream
1651  *
1652  * Get the RTP session of this stream.
1653  *
1654  * Returns: (transfer full): The RTP session of this stream. Unref after usage.
1655  */
1656 GObject *
1657 gst_rtsp_stream_get_rtpsession (GstRTSPStream * stream)
1658 {
1659   GstRTSPStreamPrivate *priv;
1660   GObject *session;
1661
1662   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1663
1664   priv = stream->priv;
1665
1666   g_mutex_lock (&priv->lock);
1667   if ((session = priv->session))
1668     g_object_ref (session);
1669   g_mutex_unlock (&priv->lock);
1670
1671   return session;
1672 }
1673
1674 /**
1675  * gst_rtsp_stream_get_ssrc:
1676  * @stream: a #GstRTSPStream
1677  * @ssrc: (out): result ssrc
1678  *
1679  * Get the SSRC used by the RTP session of this stream. This function can only
1680  * be called when @stream has been joined.
1681  */
1682 void
1683 gst_rtsp_stream_get_ssrc (GstRTSPStream * stream, guint * ssrc)
1684 {
1685   GstRTSPStreamPrivate *priv;
1686
1687   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1688   priv = stream->priv;
1689   g_return_if_fail (priv->is_joined);
1690
1691   g_mutex_lock (&priv->lock);
1692   if (ssrc && priv->session)
1693     g_object_get (priv->session, "internal-ssrc", ssrc, NULL);
1694   g_mutex_unlock (&priv->lock);
1695 }
1696
1697 /**
1698  * gst_rtsp_stream_set_retransmission_time:
1699  * @stream: a #GstRTSPStream
1700  * @time: a #GstClockTime
1701  *
1702  * Set the amount of time to store retransmission packets.
1703  */
1704 void
1705 gst_rtsp_stream_set_retransmission_time (GstRTSPStream * stream,
1706     GstClockTime time)
1707 {
1708   GST_DEBUG_OBJECT (stream, "set retransmission time %" G_GUINT64_FORMAT, time);
1709
1710   g_mutex_lock (&stream->priv->lock);
1711   stream->priv->rtx_time = time;
1712   if (stream->priv->rtxsend)
1713     g_object_set (stream->priv->rtxsend, "max-size-time",
1714         GST_TIME_AS_MSECONDS (time), NULL);
1715   g_mutex_unlock (&stream->priv->lock);
1716 }
1717
1718 /**
1719  * gst_rtsp_stream_get_retransmission_time:
1720  * @stream: a #GstRTSPStream
1721  *
1722  * Get the amount of time to store retransmission data.
1723  *
1724  * Returns: the amount of time to store retransmission data.
1725  */
1726 GstClockTime
1727 gst_rtsp_stream_get_retransmission_time (GstRTSPStream * stream)
1728 {
1729   GstClockTime ret;
1730
1731   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
1732
1733   g_mutex_lock (&stream->priv->lock);
1734   ret = stream->priv->rtx_time;
1735   g_mutex_unlock (&stream->priv->lock);
1736
1737   return ret;
1738 }
1739
1740 /**
1741  * gst_rtsp_stream_set_retransmission_pt:
1742  * @stream: a #GstRTSPStream
1743  * @rtx_pt: a #guint
1744  *
1745  * Set the payload type (pt) for retransmission of this stream.
1746  */
1747 void
1748 gst_rtsp_stream_set_retransmission_pt (GstRTSPStream * stream, guint rtx_pt)
1749 {
1750   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1751
1752   GST_DEBUG_OBJECT (stream, "set retransmission pt %u", rtx_pt);
1753
1754   g_mutex_lock (&stream->priv->lock);
1755   stream->priv->rtx_pt = rtx_pt;
1756   if (stream->priv->rtxsend) {
1757     guint pt = gst_rtsp_stream_get_pt (stream);
1758     gchar *pt_s = g_strdup_printf ("%d", pt);
1759     GstStructure *rtx_pt_map = gst_structure_new ("application/x-rtp-pt-map",
1760         pt_s, G_TYPE_UINT, rtx_pt, NULL);
1761     g_object_set (stream->priv->rtxsend, "payload-type-map", rtx_pt_map, NULL);
1762     g_free (pt_s);
1763     gst_structure_free (rtx_pt_map);
1764   }
1765   g_mutex_unlock (&stream->priv->lock);
1766 }
1767
1768 /**
1769  * gst_rtsp_stream_get_retransmission_pt:
1770  * @stream: a #GstRTSPStream
1771  *
1772  * Get the payload-type used for retransmission of this stream
1773  *
1774  * Returns: The retransmission PT.
1775  */
1776 guint
1777 gst_rtsp_stream_get_retransmission_pt (GstRTSPStream * stream)
1778 {
1779   guint rtx_pt;
1780
1781   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
1782
1783   g_mutex_lock (&stream->priv->lock);
1784   rtx_pt = stream->priv->rtx_pt;
1785   g_mutex_unlock (&stream->priv->lock);
1786
1787   return rtx_pt;
1788 }
1789
1790 /**
1791  * gst_rtsp_stream_set_buffer_size:
1792  * @stream: a #GstRTSPStream
1793  * @size: the buffer size
1794  *
1795  * Set the size of the UDP transmission buffer (in bytes)
1796  * Needs to be set before the stream is joined to a bin.
1797  *
1798  * Since: 1.6
1799  */
1800 void
1801 gst_rtsp_stream_set_buffer_size (GstRTSPStream * stream, guint size)
1802 {
1803   g_mutex_lock (&stream->priv->lock);
1804   stream->priv->buffer_size = size;
1805   g_mutex_unlock (&stream->priv->lock);
1806 }
1807
1808 /**
1809  * gst_rtsp_stream_get_buffer_size:
1810  * @stream: a #GstRTSPStream
1811  *
1812  * Get the size of the UDP transmission buffer (in bytes)
1813  *
1814  * Returns: the size of the UDP TX buffer
1815  *
1816  * Since: 1.6
1817  */
1818 guint
1819 gst_rtsp_stream_get_buffer_size (GstRTSPStream * stream)
1820 {
1821   guint buffer_size;
1822
1823   g_mutex_lock (&stream->priv->lock);
1824   buffer_size = stream->priv->buffer_size;
1825   g_mutex_unlock (&stream->priv->lock);
1826
1827   return buffer_size;
1828 }
1829
1830 /* executed from streaming thread */
1831 static void
1832 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream)
1833 {
1834   GstRTSPStreamPrivate *priv = stream->priv;
1835   GstCaps *newcaps, *oldcaps;
1836
1837   newcaps = gst_pad_get_current_caps (pad);
1838
1839   GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps,
1840       newcaps);
1841
1842   g_mutex_lock (&priv->lock);
1843   oldcaps = priv->caps;
1844   priv->caps = newcaps;
1845   g_mutex_unlock (&priv->lock);
1846
1847   if (oldcaps)
1848     gst_caps_unref (oldcaps);
1849 }
1850
1851 static void
1852 dump_structure (const GstStructure * s)
1853 {
1854   gchar *sstr;
1855
1856   sstr = gst_structure_to_string (s);
1857   GST_INFO ("structure: %s", sstr);
1858   g_free (sstr);
1859 }
1860
1861 static GstRTSPStreamTransport *
1862 find_transport (GstRTSPStream * stream, const gchar * rtcp_from)
1863 {
1864   GstRTSPStreamPrivate *priv = stream->priv;
1865   GList *walk;
1866   GstRTSPStreamTransport *result = NULL;
1867   const gchar *tmp;
1868   gchar *dest;
1869   guint port;
1870
1871   if (rtcp_from == NULL)
1872     return NULL;
1873
1874   tmp = g_strrstr (rtcp_from, ":");
1875   if (tmp == NULL)
1876     return NULL;
1877
1878   port = atoi (tmp + 1);
1879   dest = g_strndup (rtcp_from, tmp - rtcp_from);
1880
1881   g_mutex_lock (&priv->lock);
1882   GST_INFO ("finding %s:%d in %d transports", dest, port,
1883       g_list_length (priv->transports));
1884
1885   for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1886     GstRTSPStreamTransport *trans = walk->data;
1887     const GstRTSPTransport *tr;
1888     gint min, max;
1889
1890     tr = gst_rtsp_stream_transport_get_transport (trans);
1891
1892     if (priv->client_side) {
1893       /* In client side mode the 'destination' is the RTSP server, so send
1894        * to those ports */
1895       min = tr->server_port.min;
1896       max = tr->server_port.max;
1897     } else {
1898       min = tr->client_port.min;
1899       max = tr->client_port.max;
1900     }
1901
1902     if ((strcmp (tr->destination, dest) == 0) && (min == port || max == port)) {
1903       result = trans;
1904       break;
1905     }
1906   }
1907   if (result)
1908     g_object_ref (result);
1909   g_mutex_unlock (&priv->lock);
1910
1911   g_free (dest);
1912
1913   return result;
1914 }
1915
1916 static GstRTSPStreamTransport *
1917 check_transport (GObject * source, GstRTSPStream * stream)
1918 {
1919   GstStructure *stats;
1920   GstRTSPStreamTransport *trans;
1921
1922   /* see if we have a stream to match with the origin of the RTCP packet */
1923   trans = g_object_get_qdata (source, ssrc_stream_map_key);
1924   if (trans == NULL) {
1925     g_object_get (source, "stats", &stats, NULL);
1926     if (stats) {
1927       const gchar *rtcp_from;
1928
1929       dump_structure (stats);
1930
1931       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
1932       if ((trans = find_transport (stream, rtcp_from))) {
1933         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
1934             source);
1935         g_object_set_qdata_full (source, ssrc_stream_map_key, trans,
1936             g_object_unref);
1937       }
1938       gst_structure_free (stats);
1939     }
1940   }
1941   return trans;
1942 }
1943
1944
1945 static void
1946 on_new_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1947 {
1948   GstRTSPStreamTransport *trans;
1949
1950   GST_INFO ("%p: new source %p", stream, source);
1951
1952   trans = check_transport (source, stream);
1953
1954   if (trans)
1955     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
1956 }
1957
1958 static void
1959 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPStream * stream)
1960 {
1961   GST_INFO ("%p: new SDES %p", stream, source);
1962 }
1963
1964 static void
1965 on_ssrc_active (GObject * session, GObject * source, GstRTSPStream * stream)
1966 {
1967   GstRTSPStreamTransport *trans;
1968
1969   trans = check_transport (source, stream);
1970
1971   if (trans) {
1972     GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
1973     gst_rtsp_stream_transport_keep_alive (trans);
1974   }
1975 #ifdef DUMP_STATS
1976   {
1977     GstStructure *stats;
1978     g_object_get (source, "stats", &stats, NULL);
1979     if (stats) {
1980       dump_structure (stats);
1981       gst_structure_free (stats);
1982     }
1983   }
1984 #endif
1985 }
1986
1987 static void
1988 on_bye_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1989 {
1990   GST_INFO ("%p: source %p bye", stream, source);
1991 }
1992
1993 static void
1994 on_bye_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1995 {
1996   GstRTSPStreamTransport *trans;
1997
1998   GST_INFO ("%p: source %p bye timeout", stream, source);
1999
2000   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
2001     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
2002     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
2003   }
2004 }
2005
2006 static void
2007 on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
2008 {
2009   GstRTSPStreamTransport *trans;
2010
2011   GST_INFO ("%p: source %p timeout", stream, source);
2012
2013   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
2014     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
2015     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
2016   }
2017 }
2018
2019 static void
2020 on_new_sender_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
2021 {
2022   GST_INFO ("%p: new sender source %p", stream, source);
2023 #ifndef DUMP_STATS
2024   {
2025     GstStructure *stats;
2026     g_object_get (source, "stats", &stats, NULL);
2027     if (stats) {
2028       dump_structure (stats);
2029       gst_structure_free (stats);
2030     }
2031   }
2032 #endif
2033 }
2034
2035 static void
2036 on_sender_ssrc_active (GObject * session, GObject * source,
2037     GstRTSPStream * stream)
2038 {
2039 #ifndef DUMP_STATS
2040   {
2041     GstStructure *stats;
2042     g_object_get (source, "stats", &stats, NULL);
2043     if (stats) {
2044       dump_structure (stats);
2045       gst_structure_free (stats);
2046     }
2047   }
2048 #endif
2049 }
2050
2051 static void
2052 clear_tr_cache (GstRTSPStreamPrivate * priv, gboolean is_rtp)
2053 {
2054   if (is_rtp) {
2055     g_list_foreach (priv->tr_cache_rtp, (GFunc) g_object_unref, NULL);
2056     g_list_free (priv->tr_cache_rtp);
2057     priv->tr_cache_rtp = NULL;
2058   } else {
2059     g_list_foreach (priv->tr_cache_rtcp, (GFunc) g_object_unref, NULL);
2060     g_list_free (priv->tr_cache_rtcp);
2061     priv->tr_cache_rtcp = NULL;
2062   }
2063 }
2064
2065 static GstFlowReturn
2066 handle_new_sample (GstAppSink * sink, gpointer user_data)
2067 {
2068   GstRTSPStreamPrivate *priv;
2069   GList *walk;
2070   GstSample *sample;
2071   GstBuffer *buffer;
2072   GstRTSPStream *stream;
2073   gboolean is_rtp;
2074
2075   sample = gst_app_sink_pull_sample (sink);
2076   if (!sample)
2077     return GST_FLOW_OK;
2078
2079   stream = (GstRTSPStream *) user_data;
2080   priv = stream->priv;
2081   buffer = gst_sample_get_buffer (sample);
2082
2083   is_rtp = GST_ELEMENT_CAST (sink) == priv->appsink[0];
2084
2085   g_mutex_lock (&priv->lock);
2086   if (is_rtp) {
2087     if (priv->tr_cache_cookie_rtp != priv->transports_cookie) {
2088       clear_tr_cache (priv, is_rtp);
2089       for (walk = priv->transports; walk; walk = g_list_next (walk)) {
2090         GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2091         priv->tr_cache_rtp =
2092             g_list_prepend (priv->tr_cache_rtp, g_object_ref (tr));
2093       }
2094       priv->tr_cache_cookie_rtp = priv->transports_cookie;
2095     }
2096   } else {
2097     if (priv->tr_cache_cookie_rtcp != priv->transports_cookie) {
2098       clear_tr_cache (priv, is_rtp);
2099       for (walk = priv->transports; walk; walk = g_list_next (walk)) {
2100         GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2101         priv->tr_cache_rtcp =
2102             g_list_prepend (priv->tr_cache_rtcp, g_object_ref (tr));
2103       }
2104       priv->tr_cache_cookie_rtcp = priv->transports_cookie;
2105     }
2106   }
2107   g_mutex_unlock (&priv->lock);
2108
2109   if (is_rtp) {
2110     for (walk = priv->tr_cache_rtp; walk; walk = g_list_next (walk)) {
2111       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2112       gst_rtsp_stream_transport_send_rtp (tr, buffer);
2113     }
2114   } else {
2115     for (walk = priv->tr_cache_rtcp; walk; walk = g_list_next (walk)) {
2116       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2117       gst_rtsp_stream_transport_send_rtcp (tr, buffer);
2118     }
2119   }
2120   gst_sample_unref (sample);
2121
2122   return GST_FLOW_OK;
2123 }
2124
2125 static GstAppSinkCallbacks sink_cb = {
2126   NULL,                         /* not interested in EOS */
2127   NULL,                         /* not interested in preroll samples */
2128   handle_new_sample,
2129 };
2130
2131 static GstElement *
2132 get_rtp_encoder (GstRTSPStream * stream, guint session)
2133 {
2134   GstRTSPStreamPrivate *priv = stream->priv;
2135
2136   if (priv->srtpenc == NULL) {
2137     gchar *name;
2138
2139     name = g_strdup_printf ("srtpenc_%u", session);
2140     priv->srtpenc = gst_element_factory_make ("srtpenc", name);
2141     g_free (name);
2142
2143     g_object_set (priv->srtpenc, "random-key", TRUE, NULL);
2144   }
2145   return gst_object_ref (priv->srtpenc);
2146 }
2147
2148 static GstElement *
2149 request_rtp_encoder (GstElement * rtpbin, guint session, GstRTSPStream * stream)
2150 {
2151   GstRTSPStreamPrivate *priv = stream->priv;
2152   GstElement *oldenc, *enc;
2153   GstPad *pad;
2154   gchar *name;
2155
2156   if (priv->idx != session)
2157     return NULL;
2158
2159   GST_DEBUG_OBJECT (stream, "make RTP encoder for session %u", session);
2160
2161   oldenc = priv->srtpenc;
2162   enc = get_rtp_encoder (stream, session);
2163   name = g_strdup_printf ("rtp_sink_%d", session);
2164   pad = gst_element_get_request_pad (enc, name);
2165   g_free (name);
2166   gst_object_unref (pad);
2167
2168   if (oldenc == NULL)
2169     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER], 0,
2170         enc);
2171
2172   return enc;
2173 }
2174
2175 static GstElement *
2176 request_rtcp_encoder (GstElement * rtpbin, guint session,
2177     GstRTSPStream * stream)
2178 {
2179   GstRTSPStreamPrivate *priv = stream->priv;
2180   GstElement *oldenc, *enc;
2181   GstPad *pad;
2182   gchar *name;
2183
2184   if (priv->idx != session)
2185     return NULL;
2186
2187   GST_DEBUG_OBJECT (stream, "make RTCP encoder for session %u", session);
2188
2189   oldenc = priv->srtpenc;
2190   enc = get_rtp_encoder (stream, session);
2191   name = g_strdup_printf ("rtcp_sink_%d", session);
2192   pad = gst_element_get_request_pad (enc, name);
2193   g_free (name);
2194   gst_object_unref (pad);
2195
2196   if (oldenc == NULL)
2197     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER], 0,
2198         enc);
2199
2200   return enc;
2201 }
2202
2203 static GstCaps *
2204 request_key (GstElement * srtpdec, guint ssrc, GstRTSPStream * stream)
2205 {
2206   GstRTSPStreamPrivate *priv = stream->priv;
2207   GstCaps *caps;
2208
2209   GST_DEBUG ("request key %08x", ssrc);
2210
2211   g_mutex_lock (&priv->lock);
2212   if ((caps = g_hash_table_lookup (priv->keys, GINT_TO_POINTER (ssrc))))
2213     gst_caps_ref (caps);
2214   g_mutex_unlock (&priv->lock);
2215
2216   return caps;
2217 }
2218
2219 static GstElement *
2220 request_rtp_rtcp_decoder (GstElement * rtpbin, guint session,
2221     GstRTSPStream * stream)
2222 {
2223   GstRTSPStreamPrivate *priv = stream->priv;
2224
2225   if (priv->idx != session)
2226     return NULL;
2227
2228   if (priv->srtpdec == NULL) {
2229     gchar *name;
2230
2231     name = g_strdup_printf ("srtpdec_%u", session);
2232     priv->srtpdec = gst_element_factory_make ("srtpdec", name);
2233     g_free (name);
2234
2235     g_signal_connect (priv->srtpdec, "request-key",
2236         (GCallback) request_key, stream);
2237   }
2238   return gst_object_ref (priv->srtpdec);
2239 }
2240
2241 /**
2242  * gst_rtsp_stream_request_aux_sender:
2243  * @stream: a #GstRTSPStream
2244  * @sessid: the session id
2245  *
2246  * Creating a rtxsend bin
2247  *
2248  * Returns: (transfer full): a #GstElement.
2249  *
2250  * Since: 1.6
2251  */
2252 GstElement *
2253 gst_rtsp_stream_request_aux_sender (GstRTSPStream * stream, guint sessid)
2254 {
2255   GstElement *bin;
2256   GstPad *pad;
2257   GstStructure *pt_map;
2258   gchar *name;
2259   guint pt, rtx_pt;
2260   gchar *pt_s;
2261
2262   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2263
2264   pt = gst_rtsp_stream_get_pt (stream);
2265   pt_s = g_strdup_printf ("%u", pt);
2266   rtx_pt = stream->priv->rtx_pt;
2267
2268   GST_INFO ("creating rtxsend with pt %u to %u", pt, rtx_pt);
2269
2270   bin = gst_bin_new (NULL);
2271   stream->priv->rtxsend = gst_element_factory_make ("rtprtxsend", NULL);
2272   pt_map = gst_structure_new ("application/x-rtp-pt-map",
2273       pt_s, G_TYPE_UINT, rtx_pt, NULL);
2274   g_object_set (stream->priv->rtxsend, "payload-type-map", pt_map,
2275       "max-size-time", GST_TIME_AS_MSECONDS (stream->priv->rtx_time), NULL);
2276   g_free (pt_s);
2277   gst_structure_free (pt_map);
2278   gst_bin_add (GST_BIN (bin), gst_object_ref (stream->priv->rtxsend));
2279
2280   pad = gst_element_get_static_pad (stream->priv->rtxsend, "src");
2281   name = g_strdup_printf ("src_%u", sessid);
2282   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
2283   g_free (name);
2284   gst_object_unref (pad);
2285
2286   pad = gst_element_get_static_pad (stream->priv->rtxsend, "sink");
2287   name = g_strdup_printf ("sink_%u", sessid);
2288   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
2289   g_free (name);
2290   gst_object_unref (pad);
2291
2292   return bin;
2293 }
2294
2295 /**
2296  * gst_rtsp_stream_set_pt_map:
2297  * @stream: a #GstRTSPStream
2298  * @pt: the pt
2299  * @caps: a #GstCaps
2300  *
2301  * Configure a pt map between @pt and @caps.
2302  */
2303 void
2304 gst_rtsp_stream_set_pt_map (GstRTSPStream * stream, guint pt, GstCaps * caps)
2305 {
2306   GstRTSPStreamPrivate *priv = stream->priv;
2307
2308   g_mutex_lock (&priv->lock);
2309   g_hash_table_insert (priv->ptmap, GINT_TO_POINTER (pt), gst_caps_ref (caps));
2310   g_mutex_unlock (&priv->lock);
2311 }
2312
2313 /**
2314  * gst_rtsp_stream_set_publish_clock_mode:
2315  * @stream: a #GstRTSPStream
2316  * @mode: the clock publish mode
2317  *
2318  * Sets if and how the stream clock should be published according to RFC7273.
2319  *
2320  * Since: 1.8
2321  */
2322 void
2323 gst_rtsp_stream_set_publish_clock_mode (GstRTSPStream * stream,
2324     GstRTSPPublishClockMode mode)
2325 {
2326   GstRTSPStreamPrivate *priv;
2327
2328   priv = stream->priv;
2329   g_mutex_lock (&priv->lock);
2330   priv->publish_clock_mode = mode;
2331   g_mutex_unlock (&priv->lock);
2332 }
2333
2334 /**
2335  * gst_rtsp_stream_get_publish_clock_mode:
2336  * @factory: a #GstRTSPStream
2337  *
2338  * Gets if and how the stream clock should be published according to RFC7273.
2339  *
2340  * Returns: The GstRTSPPublishClockMode
2341  *
2342  * Since: 1.8
2343  */
2344 GstRTSPPublishClockMode
2345 gst_rtsp_stream_get_publish_clock_mode (GstRTSPStream * stream)
2346 {
2347   GstRTSPStreamPrivate *priv;
2348   GstRTSPPublishClockMode ret;
2349
2350   priv = stream->priv;
2351   g_mutex_lock (&priv->lock);
2352   ret = priv->publish_clock_mode;
2353   g_mutex_unlock (&priv->lock);
2354
2355   return ret;
2356 }
2357
2358 static GstCaps *
2359 request_pt_map (GstElement * rtpbin, guint session, guint pt,
2360     GstRTSPStream * stream)
2361 {
2362   GstRTSPStreamPrivate *priv = stream->priv;
2363   GstCaps *caps = NULL;
2364
2365   g_mutex_lock (&priv->lock);
2366
2367   if (priv->idx == session) {
2368     caps = g_hash_table_lookup (priv->ptmap, GINT_TO_POINTER (pt));
2369     if (caps) {
2370       GST_DEBUG ("Stream %p, pt %u: caps %" GST_PTR_FORMAT, stream, pt, caps);
2371       gst_caps_ref (caps);
2372     } else {
2373       GST_DEBUG ("Stream %p, pt %u: no caps", stream, pt);
2374     }
2375   }
2376
2377   g_mutex_unlock (&priv->lock);
2378
2379   return caps;
2380 }
2381
2382 static void
2383 pad_added (GstElement * rtpbin, GstPad * pad, GstRTSPStream * stream)
2384 {
2385   GstRTSPStreamPrivate *priv = stream->priv;
2386   gchar *name;
2387   GstPadLinkReturn ret;
2388   guint sessid;
2389
2390   GST_DEBUG ("Stream %p added pad %s:%s for pad %s:%s", stream,
2391       GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (priv->sinkpad));
2392
2393   name = gst_pad_get_name (pad);
2394   if (sscanf (name, "recv_rtp_src_%u", &sessid) != 1) {
2395     g_free (name);
2396     return;
2397   }
2398   g_free (name);
2399
2400   if (priv->idx != sessid)
2401     return;
2402
2403   if (gst_pad_is_linked (priv->sinkpad)) {
2404     GST_WARNING ("Stream %p: Pad %s:%s is linked already", stream,
2405         GST_DEBUG_PAD_NAME (priv->sinkpad));
2406     return;
2407   }
2408
2409   /* link the RTP pad to the session manager, it should not really fail unless
2410    * this is not really an RTP pad */
2411   ret = gst_pad_link (pad, priv->sinkpad);
2412   if (ret != GST_PAD_LINK_OK)
2413     goto link_failed;
2414   priv->recv_rtp_src = gst_object_ref (pad);
2415
2416   return;
2417
2418 /* ERRORS */
2419 link_failed:
2420   {
2421     GST_ERROR ("Stream %p: Failed to link pads %s:%s and %s:%s", stream,
2422         GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (priv->sinkpad));
2423   }
2424 }
2425
2426 static void
2427 on_npt_stop (GstElement * rtpbin, guint session, guint ssrc,
2428     GstRTSPStream * stream)
2429 {
2430   /* TODO: What to do here other than this? */
2431   GST_DEBUG ("Stream %p: Got EOS", stream);
2432   gst_pad_send_event (stream->priv->sinkpad, gst_event_new_eos ());
2433 }
2434
2435 /* must be called with lock */
2436 static gboolean
2437 create_sender_part (GstRTSPStream * stream, GstBin * bin, GstState state)
2438 {
2439   GstRTSPStreamPrivate *priv;
2440   GstPad *pad, *sinkpad = NULL;
2441   gboolean is_tcp = FALSE, is_udp = FALSE;
2442   gint i;
2443
2444   priv = stream->priv;
2445
2446   is_tcp = priv->protocols & GST_RTSP_LOWER_TRANS_TCP;
2447   is_udp = ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
2448       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST));
2449
2450   if (is_udp && !create_and_configure_udpsinks (stream))
2451     goto no_udp_protocol;
2452
2453   for (i = 0; i < 2; i++) {
2454     GstPad *teepad, *queuepad;
2455     /* For the sender we create this bit of pipeline for both
2456      * RTP and RTCP. Sync and preroll are enabled on udpsink so
2457      * we need to add a queue before appsink and udpsink to make
2458      * the pipeline not block. For the TCP case, we want to pump
2459      * client as fast as possible anyway. This pipeline is used
2460      * when both TCP and UDP are present.
2461      *
2462      * .--------.      .-----.    .---------.    .---------.
2463      * | rtpbin |      | tee |    |  queue  |    | udpsink |
2464      * |       send->sink   src->sink      src->sink       |
2465      * '--------'      |     |    '---------'    '---------'
2466      *                 |     |    .---------.    .---------.
2467      *                 |     |    |  queue  |    | appsink |
2468      *                 |    src->sink      src->sink       |
2469      *                 '-----'    '---------'    '---------'
2470      *
2471      * When only UDP or only TCP is allowed, we skip the tee and queue
2472      * and link the udpsink (for UDP) or appsink (for TCP) directly to
2473      * the session.
2474      */
2475     /* Only link the RTP send src if we're going to send RTP, link
2476      * the RTCP send src always */
2477     if (priv->srcpad || i == 1) {
2478       if (is_udp) {
2479         /* add udpsink */
2480         gst_bin_add (bin, priv->udpsink[i]);
2481         sinkpad = gst_element_get_static_pad (priv->udpsink[i], "sink");
2482       }
2483
2484       if (is_tcp) {
2485         /* make appsink */
2486         priv->appsink[i] = gst_element_factory_make ("appsink", NULL);
2487         g_object_set (priv->appsink[i], "emit-signals", FALSE, NULL);
2488         gst_bin_add (bin, priv->appsink[i]);
2489         gst_app_sink_set_callbacks (GST_APP_SINK_CAST (priv->appsink[i]),
2490             &sink_cb, stream, NULL);
2491       }
2492
2493       if (is_udp && is_tcp) {
2494         g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
2495
2496         /* make tee for RTP/RTCP */
2497         priv->tee[i] = gst_element_factory_make ("tee", NULL);
2498         gst_bin_add (bin, priv->tee[i]);
2499
2500         /* and link to rtpbin send pad */
2501         pad = gst_element_get_static_pad (priv->tee[i], "sink");
2502         gst_pad_link (priv->send_src[i], pad);
2503         gst_object_unref (pad);
2504
2505         priv->udpqueue[i] = gst_element_factory_make ("queue", NULL);
2506         g_object_set (priv->udpqueue[i], "max-size-buffers",
2507             1, "max-size-bytes", 0, "max-size-time", G_GINT64_CONSTANT (0),
2508             NULL);
2509         gst_bin_add (bin, priv->udpqueue[i]);
2510         /* link tee to udpqueue */
2511         teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
2512         pad = gst_element_get_static_pad (priv->udpqueue[i], "sink");
2513         gst_pad_link (teepad, pad);
2514         gst_object_unref (pad);
2515         gst_object_unref (teepad);
2516
2517         /* link udpqueue to udpsink */
2518         queuepad = gst_element_get_static_pad (priv->udpqueue[i], "src");
2519         gst_pad_link (queuepad, sinkpad);
2520         gst_object_unref (queuepad);
2521         gst_object_unref (sinkpad);
2522
2523         /* make appqueue */
2524         priv->appqueue[i] = gst_element_factory_make ("queue", NULL);
2525         g_object_set (priv->appqueue[i], "max-size-buffers",
2526             1, "max-size-bytes", 0, "max-size-time", G_GINT64_CONSTANT (0),
2527             NULL);
2528         gst_bin_add (bin, priv->appqueue[i]);
2529         /* and link tee to appqueue */
2530         teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
2531         pad = gst_element_get_static_pad (priv->appqueue[i], "sink");
2532         gst_pad_link (teepad, pad);
2533         gst_object_unref (pad);
2534         gst_object_unref (teepad);
2535
2536         /* and link appqueue to appsink */
2537         queuepad = gst_element_get_static_pad (priv->appqueue[i], "src");
2538         pad = gst_element_get_static_pad (priv->appsink[i], "sink");
2539         gst_pad_link (queuepad, pad);
2540         gst_object_unref (pad);
2541         gst_object_unref (queuepad);
2542       } else if (is_tcp) {
2543         /* only appsink needed, link it to the session */
2544         pad = gst_element_get_static_pad (priv->appsink[i], "sink");
2545         gst_pad_link (priv->send_src[i], pad);
2546         gst_object_unref (pad);
2547
2548         /* when its only TCP, we need to set sync and preroll to FALSE
2549          * for the sink to avoid deadlock. And this is only needed for
2550          * sink used for RTCP data, not the RTP data. */
2551         if (i == 1)
2552           g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
2553       } else {
2554         /* else only udpsink needed, link it to the session */
2555         gst_pad_link (priv->send_src[i], sinkpad);
2556         gst_object_unref (sinkpad);
2557       }
2558     }
2559
2560     /* check if we need to set to a special state */
2561     if (state != GST_STATE_NULL) {
2562       if (priv->udpsink[i] && (priv->srcpad || i == 1))
2563         gst_element_set_state (priv->udpsink[i], state);
2564       if (priv->appsink[i] && (priv->srcpad || i == 1))
2565         gst_element_set_state (priv->appsink[i], state);
2566       if (priv->appqueue[i] && (priv->srcpad || i == 1))
2567         gst_element_set_state (priv->appqueue[i], state);
2568       if (priv->udpqueue[i] && (priv->srcpad || i == 1))
2569         gst_element_set_state (priv->udpqueue[i], state);
2570       if (priv->tee[i] && (priv->srcpad || i == 1))
2571         gst_element_set_state (priv->tee[i], state);
2572     }
2573   }
2574
2575   return TRUE;
2576
2577   /* ERRORS */
2578 no_udp_protocol:
2579   {
2580     return FALSE;
2581   }
2582 }
2583
2584 /* must be called with lock */
2585 static void
2586 create_receiver_part (GstRTSPStream * stream, GstBin * bin, GstState state)
2587 {
2588   GstRTSPStreamPrivate *priv;
2589   GstPad *pad, *selpad;
2590   gboolean is_tcp;
2591   gint i;
2592
2593   priv = stream->priv;
2594
2595   is_tcp = priv->protocols & GST_RTSP_LOWER_TRANS_TCP;
2596
2597   for (i = 0; i < 2; i++) {
2598     /* Only connect recv RTP sink if we expect to receive RTP. Connect recv
2599      * RTCP sink always */
2600     if (priv->sinkpad || i == 1) {
2601       /* For the receiver we create this bit of pipeline for both
2602        * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc
2603        * and it is all funneled into the rtpbin receive pad.
2604        *
2605        * .--------.     .--------.    .--------.
2606        * | udpsrc |     | funnel |    | rtpbin |
2607        * |       src->sink      src->sink      |
2608        * '--------'     |        |    '--------'
2609        * .--------.     |        |
2610        * | appsrc |     |        |
2611        * |       src->sink       |
2612        * '--------'     '--------'
2613        */
2614       /* make funnel for the RTP/RTCP receivers */
2615       priv->funnel[i] = gst_element_factory_make ("funnel", NULL);
2616       gst_bin_add (bin, priv->funnel[i]);
2617
2618       pad = gst_element_get_static_pad (priv->funnel[i], "src");
2619       gst_pad_link (pad, priv->recv_sink[i]);
2620       gst_object_unref (pad);
2621
2622       if (is_tcp) {
2623         /* make and add appsrc */
2624         priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
2625         priv->appsrc_base_time[i] = -1;
2626         if (priv->srcpad) {
2627           gst_element_set_state (priv->appsrc[i], GST_STATE_PLAYING);
2628           gst_element_set_locked_state (priv->appsrc[i], TRUE);
2629         }
2630         g_object_set (priv->appsrc[i], "format", GST_FORMAT_TIME, "is-live",
2631             TRUE, NULL);
2632         gst_bin_add (bin, priv->appsrc[i]);
2633         /* and link to the funnel */
2634         selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2635         pad = gst_element_get_static_pad (priv->appsrc[i], "src");
2636         gst_pad_link (pad, selpad);
2637         gst_object_unref (pad);
2638         gst_object_unref (selpad);
2639       }
2640     }
2641
2642     /* check if we need to set to a special state */
2643     if (state != GST_STATE_NULL) {
2644       if (priv->funnel[i] && (priv->sinkpad || i == 1))
2645         gst_element_set_state (priv->funnel[i], state);
2646     }
2647   }
2648 }
2649
2650 /**
2651  * gst_rtsp_stream_join_bin:
2652  * @stream: a #GstRTSPStream
2653  * @bin: (transfer none): a #GstBin to join
2654  * @rtpbin: (transfer none): a rtpbin element in @bin
2655  * @state: the target state of the new elements
2656  *
2657  * Join the #GstBin @bin that contains the element @rtpbin.
2658  *
2659  * @stream will link to @rtpbin, which must be inside @bin. The elements
2660  * added to @bin will be set to the state given in @state.
2661  *
2662  * Returns: %TRUE on success.
2663  */
2664 gboolean
2665 gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
2666     GstElement * rtpbin, GstState state)
2667 {
2668   GstRTSPStreamPrivate *priv;
2669   guint idx;
2670   gchar *name;
2671   GstPadLinkReturn ret;
2672
2673   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2674   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
2675   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
2676
2677   priv = stream->priv;
2678
2679   g_mutex_lock (&priv->lock);
2680   if (priv->is_joined)
2681     goto was_joined;
2682
2683   /* create a session with the same index as the stream */
2684   idx = priv->idx;
2685
2686   GST_INFO ("stream %p joining bin as session %u", stream, idx);
2687
2688   if (priv->profiles & GST_RTSP_PROFILE_SAVP
2689       || priv->profiles & GST_RTSP_PROFILE_SAVPF) {
2690     /* For SRTP */
2691     g_signal_connect (rtpbin, "request-rtp-encoder",
2692         (GCallback) request_rtp_encoder, stream);
2693     g_signal_connect (rtpbin, "request-rtcp-encoder",
2694         (GCallback) request_rtcp_encoder, stream);
2695     g_signal_connect (rtpbin, "request-rtp-decoder",
2696         (GCallback) request_rtp_rtcp_decoder, stream);
2697     g_signal_connect (rtpbin, "request-rtcp-decoder",
2698         (GCallback) request_rtp_rtcp_decoder, stream);
2699   }
2700
2701   if (priv->sinkpad) {
2702     g_signal_connect (rtpbin, "request-pt-map",
2703         (GCallback) request_pt_map, stream);
2704   }
2705
2706   /* get pads from the RTP session element for sending and receiving
2707    * RTP/RTCP*/
2708   if (priv->srcpad) {
2709     /* get a pad for sending RTP */
2710     name = g_strdup_printf ("send_rtp_sink_%u", idx);
2711     priv->send_rtp_sink = gst_element_get_request_pad (rtpbin, name);
2712     g_free (name);
2713
2714     /* link the RTP pad to the session manager, it should not really fail unless
2715      * this is not really an RTP pad */
2716     ret = gst_pad_link (priv->srcpad, priv->send_rtp_sink);
2717     if (ret != GST_PAD_LINK_OK)
2718       goto link_failed;
2719
2720     name = g_strdup_printf ("send_rtp_src_%u", idx);
2721     priv->send_src[0] = gst_element_get_static_pad (rtpbin, name);
2722     g_free (name);
2723   } else {
2724     /* Need to connect our sinkpad from here */
2725     g_signal_connect (rtpbin, "pad-added", (GCallback) pad_added, stream);
2726     /* EOS */
2727     g_signal_connect (rtpbin, "on-npt-stop", (GCallback) on_npt_stop, stream);
2728
2729     name = g_strdup_printf ("recv_rtp_sink_%u", idx);
2730     priv->recv_sink[0] = gst_element_get_request_pad (rtpbin, name);
2731     g_free (name);
2732   }
2733
2734   name = g_strdup_printf ("send_rtcp_src_%u", idx);
2735   priv->send_src[1] = gst_element_get_request_pad (rtpbin, name);
2736   g_free (name);
2737   name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
2738   priv->recv_sink[1] = gst_element_get_request_pad (rtpbin, name);
2739   g_free (name);
2740
2741   /* get the session */
2742   g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &priv->session);
2743
2744   g_signal_connect (priv->session, "on-new-ssrc", (GCallback) on_new_ssrc,
2745       stream);
2746   g_signal_connect (priv->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
2747       stream);
2748   g_signal_connect (priv->session, "on-ssrc-active",
2749       (GCallback) on_ssrc_active, stream);
2750   g_signal_connect (priv->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
2751       stream);
2752   g_signal_connect (priv->session, "on-bye-timeout",
2753       (GCallback) on_bye_timeout, stream);
2754   g_signal_connect (priv->session, "on-timeout", (GCallback) on_timeout,
2755       stream);
2756
2757   /* signal for sender ssrc */
2758   g_signal_connect (priv->session, "on-new-sender-ssrc",
2759       (GCallback) on_new_sender_ssrc, stream);
2760   g_signal_connect (priv->session, "on-sender-ssrc-active",
2761       (GCallback) on_sender_ssrc_active, stream);
2762
2763   if (!create_sender_part (stream, bin, state))
2764     goto no_udp_protocol;
2765
2766   create_receiver_part (stream, bin, state);
2767
2768   if (priv->srcpad) {
2769     /* be notified of caps changes */
2770     priv->caps_sig = g_signal_connect (priv->send_src[0], "notify::caps",
2771         (GCallback) caps_notify, stream);
2772   }
2773
2774   priv->joined_bin = bin;
2775   priv->is_joined = TRUE;
2776   g_mutex_unlock (&priv->lock);
2777
2778   return TRUE;
2779
2780   /* ERRORS */
2781 was_joined:
2782   {
2783     g_mutex_unlock (&priv->lock);
2784     return TRUE;
2785   }
2786 link_failed:
2787   {
2788     GST_WARNING ("failed to link stream %u", idx);
2789     gst_object_unref (priv->send_rtp_sink);
2790     priv->send_rtp_sink = NULL;
2791     g_mutex_unlock (&priv->lock);
2792     return FALSE;
2793   }
2794 no_udp_protocol:
2795   {
2796     GST_WARNING ("failed to allocate ports %u", idx);
2797     gst_object_unref (priv->send_rtp_sink);
2798     priv->send_rtp_sink = NULL;
2799     gst_object_unref (priv->send_src[0]);
2800     priv->send_src[0] = NULL;
2801     gst_object_unref (priv->send_src[1]);
2802     priv->send_src[1] = NULL;
2803     gst_object_unref (priv->recv_sink[0]);
2804     priv->recv_sink[0] = NULL;
2805     gst_object_unref (priv->recv_sink[1]);
2806     priv->recv_sink[1] = NULL;
2807     if (priv->udpsink[0])
2808       gst_element_set_state (priv->udpsink[0], GST_STATE_NULL);
2809     if (priv->udpsink[1])
2810       gst_element_set_state (priv->udpsink[1], GST_STATE_NULL);
2811
2812     g_mutex_unlock (&priv->lock);
2813     return FALSE;
2814   }
2815 }
2816
2817 /* Must be called with priv->lock. */
2818 static void
2819 remove_all_unicast_udpsrcs (GstRTSPStream * stream, GstBin * bin)
2820 {
2821   GstRTSPStreamPrivate *priv;
2822   GHashTableIter iter;
2823   gpointer iter_key, iter_value;
2824
2825   priv = stream->priv;
2826
2827   /* Remove all of the unicast udpsrcs */
2828   g_hash_table_iter_init (&iter, priv->udpsrcs);
2829   while (g_hash_table_iter_next (&iter, &iter_key, &iter_value)) {
2830     GstRTSPStreamUDPSrcs *transport_udpsrcs =
2831         (GstRTSPStreamUDPSrcs *) iter_value;
2832
2833     for (int i = 0; i < 2; i++) {
2834       if (transport_udpsrcs->udpsrc[i]) {
2835         if (priv->sinkpad || i == 1) {
2836           /* Set udpsrc to NULL now before removing */
2837           gst_element_set_locked_state (transport_udpsrcs->udpsrc[i], FALSE);
2838           gst_element_set_state (transport_udpsrcs->udpsrc[i], GST_STATE_NULL);
2839
2840           /* removing them should also nicely release the request
2841            * pads when they finalize */
2842           gst_bin_remove (bin, transport_udpsrcs->udpsrc[i]);
2843         } else {
2844           /* we need to set the state to NULL before unref */
2845           gst_element_set_state (transport_udpsrcs->udpsrc[i], GST_STATE_NULL);
2846           gst_object_unref (transport_udpsrcs->udpsrc[i]);
2847         }
2848       }
2849     }
2850   }
2851
2852   g_hash_table_remove_all (priv->udpsrcs);
2853 }
2854
2855 /**
2856  * gst_rtsp_stream_leave_bin:
2857  * @stream: a #GstRTSPStream
2858  * @bin: (transfer none): a #GstBin
2859  * @rtpbin: (transfer none): a rtpbin #GstElement
2860  *
2861  * Remove the elements of @stream from @bin.
2862  *
2863  * Return: %TRUE on success.
2864  */
2865 gboolean
2866 gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
2867     GstElement * rtpbin)
2868 {
2869   GstRTSPStreamPrivate *priv;
2870   gint i;
2871   gboolean is_tcp, is_udp;
2872
2873   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2874   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
2875   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
2876
2877   priv = stream->priv;
2878
2879   g_mutex_lock (&priv->lock);
2880   if (!priv->is_joined)
2881     goto was_not_joined;
2882
2883   priv->joined_bin = NULL;
2884
2885   /* all transports must be removed by now */
2886   if (priv->transports != NULL)
2887     goto transports_not_removed;
2888
2889   clear_tr_cache (priv, TRUE);
2890   clear_tr_cache (priv, FALSE);
2891
2892   GST_INFO ("stream %p leaving bin", stream);
2893
2894   if (priv->srcpad) {
2895     gst_pad_unlink (priv->srcpad, priv->send_rtp_sink);
2896
2897     g_signal_handler_disconnect (priv->send_src[0], priv->caps_sig);
2898     gst_element_release_request_pad (rtpbin, priv->send_rtp_sink);
2899     gst_object_unref (priv->send_rtp_sink);
2900     priv->send_rtp_sink = NULL;
2901   } else if (priv->recv_rtp_src) {
2902     gst_pad_unlink (priv->recv_rtp_src, priv->sinkpad);
2903     gst_object_unref (priv->recv_rtp_src);
2904     priv->recv_rtp_src = NULL;
2905   }
2906
2907   is_tcp = priv->protocols & GST_RTSP_LOWER_TRANS_TCP;
2908
2909   is_udp = ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
2910       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST));
2911
2912   remove_all_unicast_udpsrcs (stream, bin);
2913
2914   for (i = 0; i < 2; i++) {
2915     if (priv->udpsink[i])
2916       gst_element_set_state (priv->udpsink[i], GST_STATE_NULL);
2917     if (priv->appsink[i])
2918       gst_element_set_state (priv->appsink[i], GST_STATE_NULL);
2919     if (priv->appqueue[i])
2920       gst_element_set_state (priv->appqueue[i], GST_STATE_NULL);
2921     if (priv->udpqueue[i])
2922       gst_element_set_state (priv->udpqueue[i], GST_STATE_NULL);
2923     if (priv->tee[i])
2924       gst_element_set_state (priv->tee[i], GST_STATE_NULL);
2925     if (priv->funnel[i])
2926       gst_element_set_state (priv->funnel[i], GST_STATE_NULL);
2927     if (priv->appsrc[i])
2928       gst_element_set_state (priv->appsrc[i], GST_STATE_NULL);
2929
2930     if (priv->udpsrc_mcast_v4[i]) {
2931       if (priv->sinkpad || i == 1) {
2932         /* and set udpsrc to NULL now before removing */
2933         gst_element_set_locked_state (priv->udpsrc_mcast_v4[i], FALSE);
2934         gst_element_set_state (priv->udpsrc_mcast_v4[i], GST_STATE_NULL);
2935         /* removing them should also nicely release the request
2936          * pads when they finalize */
2937         gst_bin_remove (bin, priv->udpsrc_mcast_v4[i]);
2938       } else {
2939         gst_element_set_state (priv->udpsrc_mcast_v4[i], GST_STATE_NULL);
2940         gst_object_unref (priv->udpsrc_mcast_v4[i]);
2941       }
2942     }
2943
2944     if (priv->udpsrc_mcast_v6[i]) {
2945       if (priv->sinkpad || i == 1) {
2946         gst_element_set_locked_state (priv->udpsrc_mcast_v6[i], FALSE);
2947         gst_element_set_state (priv->udpsrc_mcast_v6[i], GST_STATE_NULL);
2948         gst_bin_remove (bin, priv->udpsrc_mcast_v6[i]);
2949       } else {
2950         gst_element_set_state (priv->udpsrc_mcast_v6[i], GST_STATE_NULL);
2951         gst_object_unref (priv->udpsrc_mcast_v6[i]);
2952       }
2953     }
2954
2955     if (priv->udpsink[i] && is_udp && (priv->srcpad || i == 1))
2956       gst_bin_remove (bin, priv->udpsink[i]);
2957     if (priv->appsrc[i]) {
2958       if (priv->sinkpad || i == 1) {
2959         gst_element_set_locked_state (priv->appsrc[i], FALSE);
2960         gst_element_set_state (priv->appsrc[i], GST_STATE_NULL);
2961         gst_bin_remove (bin, priv->appsrc[i]);
2962       } else {
2963         gst_element_set_state (priv->appsrc[i], GST_STATE_NULL);
2964         gst_object_unref (priv->appsrc[i]);
2965       }
2966     }
2967     if (priv->appsink[i] && is_tcp && (priv->srcpad || i == 1))
2968       gst_bin_remove (bin, priv->appsink[i]);
2969     if (priv->appqueue[i] && is_tcp && is_udp && (priv->srcpad || i == 1))
2970       gst_bin_remove (bin, priv->appqueue[i]);
2971     if (priv->udpqueue[i] && is_tcp && is_udp && (priv->srcpad || i == 1))
2972       gst_bin_remove (bin, priv->udpqueue[i]);
2973     if (priv->tee[i] && is_tcp && is_udp && (priv->srcpad || i == 1))
2974       gst_bin_remove (bin, priv->tee[i]);
2975     if (priv->funnel[i] && (priv->sinkpad || i == 1))
2976       gst_bin_remove (bin, priv->funnel[i]);
2977
2978     if (priv->sinkpad || i == 1) {
2979       gst_element_release_request_pad (rtpbin, priv->recv_sink[i]);
2980       gst_object_unref (priv->recv_sink[i]);
2981       priv->recv_sink[i] = NULL;
2982     }
2983
2984     priv->udpsrc_mcast_v4[i] = NULL;
2985     priv->udpsrc_mcast_v6[i] = NULL;
2986     priv->udpsink[i] = NULL;
2987     priv->appsrc[i] = NULL;
2988     priv->appsink[i] = NULL;
2989     priv->appqueue[i] = NULL;
2990     priv->udpqueue[i] = NULL;
2991     priv->tee[i] = NULL;
2992     priv->funnel[i] = NULL;
2993   }
2994
2995   if (priv->srcpad) {
2996     gst_object_unref (priv->send_src[0]);
2997     priv->send_src[0] = NULL;
2998   }
2999
3000   gst_element_release_request_pad (rtpbin, priv->send_src[1]);
3001   gst_object_unref (priv->send_src[1]);
3002   priv->send_src[1] = NULL;
3003
3004   g_object_unref (priv->session);
3005   priv->session = NULL;
3006   if (priv->caps)
3007     gst_caps_unref (priv->caps);
3008   priv->caps = NULL;
3009
3010   if (priv->srtpenc)
3011     gst_object_unref (priv->srtpenc);
3012   if (priv->srtpdec)
3013     gst_object_unref (priv->srtpdec);
3014
3015   priv->is_joined = FALSE;
3016   g_mutex_unlock (&priv->lock);
3017
3018   return TRUE;
3019
3020 was_not_joined:
3021   {
3022     g_mutex_unlock (&priv->lock);
3023     return TRUE;
3024   }
3025 transports_not_removed:
3026   {
3027     GST_ERROR_OBJECT (stream, "can't leave bin (transports not removed)");
3028     g_mutex_unlock (&priv->lock);
3029     return FALSE;
3030   }
3031 }
3032
3033 /**
3034  * gst_rtsp_stream_get_joined_bin:
3035  * @stream: a #GstRTSPStream
3036  *
3037  * Get the previous joined bin with gst_rtsp_stream_join_bin() or NULL.
3038  *
3039  * Return: (transfer full): the joined bin or NULL.
3040  */
3041 GstBin *
3042 gst_rtsp_stream_get_joined_bin (GstRTSPStream * stream)
3043 {
3044   GstRTSPStreamPrivate *priv;
3045   GstBin *bin = NULL;
3046
3047   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3048
3049   priv = stream->priv;
3050
3051   g_mutex_lock (&priv->lock);
3052   bin = priv->joined_bin ? gst_object_ref (priv->joined_bin) : NULL;
3053   g_mutex_unlock (&priv->lock);
3054
3055   return bin;
3056 }
3057
3058 /**
3059  * gst_rtsp_stream_get_rtpinfo:
3060  * @stream: a #GstRTSPStream
3061  * @rtptime: (allow-none): result RTP timestamp
3062  * @seq: (allow-none): result RTP seqnum
3063  * @clock_rate: (allow-none): the clock rate
3064  * @running_time: (allow-none): result running-time
3065  *
3066  * Retrieve the current rtptime, seq and running-time. This is used to
3067  * construct a RTPInfo reply header.
3068  *
3069  * Returns: %TRUE when rtptime, seq and running-time could be determined.
3070  */
3071 gboolean
3072 gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream,
3073     guint * rtptime, guint * seq, guint * clock_rate,
3074     GstClockTime * running_time)
3075 {
3076   GstRTSPStreamPrivate *priv;
3077   GstStructure *stats;
3078   GObjectClass *payobjclass;
3079
3080   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3081
3082   priv = stream->priv;
3083
3084   payobjclass = G_OBJECT_GET_CLASS (priv->payloader);
3085
3086   g_mutex_lock (&priv->lock);
3087
3088   /* First try to extract the information from the last buffer on the sinks.
3089    * This will have a more accurate sequence number and timestamp, as between
3090    * the payloader and the sink there can be some queues
3091    */
3092   if (priv->udpsink[0] || priv->appsink[0]) {
3093     GstSample *last_sample;
3094
3095     if (priv->udpsink[0])
3096       g_object_get (priv->udpsink[0], "last-sample", &last_sample, NULL);
3097     else
3098       g_object_get (priv->appsink[0], "last-sample", &last_sample, NULL);
3099
3100     if (last_sample) {
3101       GstCaps *caps;
3102       GstBuffer *buffer;
3103       GstSegment *segment;
3104       GstRTPBuffer rtp_buffer = GST_RTP_BUFFER_INIT;
3105
3106       caps = gst_sample_get_caps (last_sample);
3107       buffer = gst_sample_get_buffer (last_sample);
3108       segment = gst_sample_get_segment (last_sample);
3109
3110       if (gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp_buffer)) {
3111         if (seq) {
3112           *seq = gst_rtp_buffer_get_seq (&rtp_buffer);
3113         }
3114
3115         if (rtptime) {
3116           *rtptime = gst_rtp_buffer_get_timestamp (&rtp_buffer);
3117         }
3118
3119         gst_rtp_buffer_unmap (&rtp_buffer);
3120
3121         if (running_time) {
3122           *running_time =
3123               gst_segment_to_running_time (segment, GST_FORMAT_TIME,
3124               GST_BUFFER_TIMESTAMP (buffer));
3125         }
3126
3127         if (clock_rate) {
3128           GstStructure *s = gst_caps_get_structure (caps, 0);
3129
3130           gst_structure_get_int (s, "clock-rate", (gint *) clock_rate);
3131
3132           if (*clock_rate == 0 && running_time)
3133             *running_time = GST_CLOCK_TIME_NONE;
3134         }
3135         gst_sample_unref (last_sample);
3136
3137         goto done;
3138       } else {
3139         gst_sample_unref (last_sample);
3140       }
3141     }
3142   }
3143
3144   if (g_object_class_find_property (payobjclass, "stats")) {
3145     g_object_get (priv->payloader, "stats", &stats, NULL);
3146     if (stats == NULL)
3147       goto no_stats;
3148
3149     if (seq)
3150       gst_structure_get_uint (stats, "seqnum", seq);
3151
3152     if (rtptime)
3153       gst_structure_get_uint (stats, "timestamp", rtptime);
3154
3155     if (running_time)
3156       gst_structure_get_clock_time (stats, "running-time", running_time);
3157
3158     if (clock_rate) {
3159       gst_structure_get_uint (stats, "clock-rate", clock_rate);
3160       if (*clock_rate == 0 && running_time)
3161         *running_time = GST_CLOCK_TIME_NONE;
3162     }
3163     gst_structure_free (stats);
3164   } else {
3165     if (!g_object_class_find_property (payobjclass, "seqnum") ||
3166         !g_object_class_find_property (payobjclass, "timestamp"))
3167       goto no_stats;
3168
3169     if (seq)
3170       g_object_get (priv->payloader, "seqnum", seq, NULL);
3171
3172     if (rtptime)
3173       g_object_get (priv->payloader, "timestamp", rtptime, NULL);
3174
3175     if (running_time)
3176       *running_time = GST_CLOCK_TIME_NONE;
3177   }
3178
3179 done:
3180   g_mutex_unlock (&priv->lock);
3181
3182   return TRUE;
3183
3184   /* ERRORS */
3185 no_stats:
3186   {
3187     GST_WARNING ("Could not get payloader stats");
3188     g_mutex_unlock (&priv->lock);
3189     return FALSE;
3190   }
3191 }
3192
3193 /**
3194  * gst_rtsp_stream_get_caps:
3195  * @stream: a #GstRTSPStream
3196  *
3197  * Retrieve the current caps of @stream.
3198  *
3199  * Returns: (transfer full): the #GstCaps of @stream. use gst_caps_unref()
3200  * after usage.
3201  */
3202 GstCaps *
3203 gst_rtsp_stream_get_caps (GstRTSPStream * stream)
3204 {
3205   GstRTSPStreamPrivate *priv;
3206   GstCaps *result;
3207
3208   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3209
3210   priv = stream->priv;
3211
3212   g_mutex_lock (&priv->lock);
3213   if ((result = priv->caps))
3214     gst_caps_ref (result);
3215   g_mutex_unlock (&priv->lock);
3216
3217   return result;
3218 }
3219
3220 /**
3221  * gst_rtsp_stream_recv_rtp:
3222  * @stream: a #GstRTSPStream
3223  * @buffer: (transfer full): a #GstBuffer
3224  *
3225  * Handle an RTP buffer for the stream. This method is usually called when a
3226  * message has been received from a client using the TCP transport.
3227  *
3228  * This function takes ownership of @buffer.
3229  *
3230  * Returns: a GstFlowReturn.
3231  */
3232 GstFlowReturn
3233 gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer)
3234 {
3235   GstRTSPStreamPrivate *priv;
3236   GstFlowReturn ret;
3237   GstElement *element;
3238
3239   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
3240   priv = stream->priv;
3241   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
3242   g_return_val_if_fail (priv->is_joined, FALSE);
3243
3244   g_mutex_lock (&priv->lock);
3245   if (priv->appsrc[0])
3246     element = gst_object_ref (priv->appsrc[0]);
3247   else
3248     element = NULL;
3249   g_mutex_unlock (&priv->lock);
3250
3251   if (element) {
3252     if (priv->appsrc_base_time[0] == -1) {
3253       /* Take current running_time. This timestamp will be put on
3254        * the first buffer of each stream because we are a live source and so we
3255        * timestamp with the running_time. When we are dealing with TCP, we also
3256        * only timestamp the first buffer (using the DISCONT flag) because a server
3257        * typically bursts data, for which we don't want to compensate by speeding
3258        * up the media. The other timestamps will be interpollated from this one
3259        * using the RTP timestamps. */
3260       GST_OBJECT_LOCK (element);
3261       if (GST_ELEMENT_CLOCK (element)) {
3262         GstClockTime now;
3263         GstClockTime base_time;
3264
3265         now = gst_clock_get_time (GST_ELEMENT_CLOCK (element));
3266         base_time = GST_ELEMENT_CAST (element)->base_time;
3267
3268         priv->appsrc_base_time[0] = now - base_time;
3269         GST_BUFFER_TIMESTAMP (buffer) = priv->appsrc_base_time[0];
3270         GST_DEBUG ("stream %p: first buffer at time %" GST_TIME_FORMAT
3271             ", base %" GST_TIME_FORMAT, stream, GST_TIME_ARGS (now),
3272             GST_TIME_ARGS (base_time));
3273       }
3274       GST_OBJECT_UNLOCK (element);
3275     }
3276
3277     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
3278     gst_object_unref (element);
3279   } else {
3280     ret = GST_FLOW_OK;
3281   }
3282   return ret;
3283 }
3284
3285 /**
3286  * gst_rtsp_stream_recv_rtcp:
3287  * @stream: a #GstRTSPStream
3288  * @buffer: (transfer full): a #GstBuffer
3289  *
3290  * Handle an RTCP buffer for the stream. This method is usually called when a
3291  * message has been received from a client using the TCP transport.
3292  *
3293  * This function takes ownership of @buffer.
3294  *
3295  * Returns: a GstFlowReturn.
3296  */
3297 GstFlowReturn
3298 gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
3299 {
3300   GstRTSPStreamPrivate *priv;
3301   GstFlowReturn ret;
3302   GstElement *element;
3303
3304   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
3305   priv = stream->priv;
3306   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
3307
3308   if (!priv->is_joined) {
3309     gst_buffer_unref (buffer);
3310     return GST_FLOW_NOT_LINKED;
3311   }
3312   g_mutex_lock (&priv->lock);
3313   if (priv->appsrc[1])
3314     element = gst_object_ref (priv->appsrc[1]);
3315   else
3316     element = NULL;
3317   g_mutex_unlock (&priv->lock);
3318
3319   if (element) {
3320     if (priv->appsrc_base_time[1] == -1) {
3321       /* Take current running_time. This timestamp will be put on
3322        * the first buffer of each stream because we are a live source and so we
3323        * timestamp with the running_time. When we are dealing with TCP, we also
3324        * only timestamp the first buffer (using the DISCONT flag) because a server
3325        * typically bursts data, for which we don't want to compensate by speeding
3326        * up the media. The other timestamps will be interpollated from this one
3327        * using the RTP timestamps. */
3328       GST_OBJECT_LOCK (element);
3329       if (GST_ELEMENT_CLOCK (element)) {
3330         GstClockTime now;
3331         GstClockTime base_time;
3332
3333         now = gst_clock_get_time (GST_ELEMENT_CLOCK (element));
3334         base_time = GST_ELEMENT_CAST (element)->base_time;
3335
3336         priv->appsrc_base_time[1] = now - base_time;
3337         GST_BUFFER_TIMESTAMP (buffer) = priv->appsrc_base_time[1];
3338         GST_DEBUG ("stream %p: first buffer at time %" GST_TIME_FORMAT
3339             ", base %" GST_TIME_FORMAT, stream, GST_TIME_ARGS (now),
3340             GST_TIME_ARGS (base_time));
3341       }
3342       GST_OBJECT_UNLOCK (element);
3343     }
3344
3345     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
3346     gst_object_unref (element);
3347   } else {
3348     ret = GST_FLOW_OK;
3349     gst_buffer_unref (buffer);
3350   }
3351   return ret;
3352 }
3353
3354 /* Properly dispose udpsrcs that were created for a given transport. */
3355 /* Must be called with priv->lock. */
3356 static void
3357 remove_transport_udpsrcs (GstRTSPStreamPrivate * priv,
3358     const GstRTSPTransport * tr)
3359 {
3360   /* Remove the udpsrcs associated with this transport. */
3361   GstRTSPStreamUDPSrcs *transport_udpsrcs =
3362       g_hash_table_lookup (priv->udpsrcs, tr);
3363   if (transport_udpsrcs != NULL) {
3364     for (int i = 0; i < 2; i++) {
3365       if (transport_udpsrcs->udpsrc[i]) {
3366         if (priv->sinkpad || i == 1) {
3367           GstBin *bin;
3368           GstPad *udpsrc_srcpad, *funnel_sinkpad;
3369
3370           /* We know these udpsrcs are all linked to funnels. Explicitely 
3371            * get the funnel src pads so we can properly release them. */
3372           udpsrc_srcpad =
3373               gst_element_get_static_pad (transport_udpsrcs->udpsrc[i], "src");
3374           funnel_sinkpad = gst_pad_get_peer (udpsrc_srcpad);
3375
3376           if (funnel_sinkpad != NULL) {
3377             /* Unlink pads and release funnel's request pad. */
3378             gst_pad_unlink (udpsrc_srcpad, funnel_sinkpad);
3379             gst_element_release_request_pad (priv->funnel[i], funnel_sinkpad);
3380             gst_object_unref (funnel_sinkpad);
3381           }
3382           gst_object_unref (udpsrc_srcpad);
3383
3384           /* Set udpsrc to NULL now before removing */
3385           gst_element_set_locked_state (transport_udpsrcs->udpsrc[i], FALSE);
3386           gst_element_set_state (transport_udpsrcs->udpsrc[i], GST_STATE_NULL);
3387
3388           /* This udpsrc is expected to be owned by a bin. Get the bin and 
3389            * remove our element. */
3390           bin = GST_BIN (gst_element_get_parent (transport_udpsrcs->udpsrc[i]));
3391           if (bin != NULL) {
3392             gst_bin_remove (bin, transport_udpsrcs->udpsrc[i]);
3393             gst_object_unref (bin);
3394           } else {
3395             GST_ERROR ("Expected this udpsrc element to be part of a bin.");
3396             gst_object_unref (transport_udpsrcs->udpsrc[i]);
3397           }
3398
3399         } else {
3400           /* we need to set the state to NULL before unref */
3401           gst_element_set_state (transport_udpsrcs->udpsrc[i], GST_STATE_NULL);
3402           gst_object_unref (transport_udpsrcs->udpsrc[i]);
3403         }
3404       }
3405     }
3406
3407     /* The udpsrcs are now properly cleaned up. Remove them from the table */
3408     g_hash_table_remove (priv->udpsrcs, tr);
3409
3410   } else {
3411     /* This can happen if we're dealing with a multicast transport. */
3412     GST_INFO ("Could not find udpsrcs associated with this transport.");
3413   }
3414 }
3415
3416 /* must be called with lock */
3417 static gboolean
3418 update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
3419     gboolean add)
3420 {
3421   GstRTSPStreamPrivate *priv = stream->priv;
3422   const GstRTSPTransport *tr;
3423
3424   tr = gst_rtsp_stream_transport_get_transport (trans);
3425
3426   switch (tr->lower_transport) {
3427     case GST_RTSP_LOWER_TRANS_UDP_MCAST:
3428     case GST_RTSP_LOWER_TRANS_UDP:
3429     {
3430       gchar *dest;
3431       gint min, max;
3432       guint ttl = 0;
3433
3434       dest = tr->destination;
3435       if (tr->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
3436         min = tr->port.min;
3437         max = tr->port.max;
3438         ttl = tr->ttl;
3439       } else if (priv->client_side) {
3440         /* In client side mode the 'destination' is the RTSP server, so send
3441          * to those ports */
3442         min = tr->server_port.min;
3443         max = tr->server_port.max;
3444       } else {
3445         min = tr->client_port.min;
3446         max = tr->client_port.max;
3447       }
3448
3449       if (add) {
3450         if (ttl > 0) {
3451           GST_INFO ("setting ttl-mc %d", ttl);
3452           g_object_set (G_OBJECT (priv->udpsink[0]), "ttl-mc", ttl, NULL);
3453           g_object_set (G_OBJECT (priv->udpsink[1]), "ttl-mc", ttl, NULL);
3454         }
3455         GST_INFO ("adding %s:%d-%d", dest, min, max);
3456         g_signal_emit_by_name (priv->udpsink[0], "add", dest, min, NULL);
3457         g_signal_emit_by_name (priv->udpsink[1], "add", dest, max, NULL);
3458         priv->transports = g_list_prepend (priv->transports, trans);
3459       } else {
3460         GST_INFO ("removing %s:%d-%d", dest, min, max);
3461         g_signal_emit_by_name (priv->udpsink[0], "remove", dest, min, NULL);
3462         g_signal_emit_by_name (priv->udpsink[1], "remove", dest, max, NULL);
3463         priv->transports = g_list_remove (priv->transports, trans);
3464
3465         remove_transport_udpsrcs (priv, tr);
3466       }
3467       priv->transports_cookie++;
3468       break;
3469     }
3470     case GST_RTSP_LOWER_TRANS_TCP:
3471       if (add) {
3472         GST_INFO ("adding TCP %s", tr->destination);
3473         priv->transports = g_list_prepend (priv->transports, trans);
3474       } else {
3475         GST_INFO ("removing TCP %s", tr->destination);
3476         priv->transports = g_list_remove (priv->transports, trans);
3477       }
3478       priv->transports_cookie++;
3479       break;
3480     default:
3481       goto unknown_transport;
3482   }
3483   return TRUE;
3484
3485   /* ERRORS */
3486 unknown_transport:
3487   {
3488     GST_INFO ("Unknown transport %d", tr->lower_transport);
3489     return FALSE;
3490   }
3491 }
3492
3493
3494 /**
3495  * gst_rtsp_stream_add_transport:
3496  * @stream: a #GstRTSPStream
3497  * @trans: (transfer none): a #GstRTSPStreamTransport
3498  *
3499  * Add the transport in @trans to @stream. The media of @stream will
3500  * then also be send to the values configured in @trans.
3501  *
3502  * @stream must be joined to a bin.
3503  *
3504  * @trans must contain a valid #GstRTSPTransport.
3505  *
3506  * Returns: %TRUE if @trans was added
3507  */
3508 gboolean
3509 gst_rtsp_stream_add_transport (GstRTSPStream * stream,
3510     GstRTSPStreamTransport * trans)
3511 {
3512   GstRTSPStreamPrivate *priv;
3513   gboolean res;
3514
3515   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3516   priv = stream->priv;
3517   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
3518   g_return_val_if_fail (priv->is_joined, FALSE);
3519
3520   g_mutex_lock (&priv->lock);
3521   res = update_transport (stream, trans, TRUE);
3522   g_mutex_unlock (&priv->lock);
3523
3524   return res;
3525 }
3526
3527 /**
3528  * gst_rtsp_stream_remove_transport:
3529  * @stream: a #GstRTSPStream
3530  * @trans: (transfer none): a #GstRTSPStreamTransport
3531  *
3532  * Remove the transport in @trans from @stream. The media of @stream will
3533  * not be sent to the values configured in @trans.
3534  *
3535  * @stream must be joined to a bin.
3536  *
3537  * @trans must contain a valid #GstRTSPTransport.
3538  *
3539  * Returns: %TRUE if @trans was removed
3540  */
3541 gboolean
3542 gst_rtsp_stream_remove_transport (GstRTSPStream * stream,
3543     GstRTSPStreamTransport * trans)
3544 {
3545   GstRTSPStreamPrivate *priv;
3546   gboolean res;
3547
3548   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3549   priv = stream->priv;
3550   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
3551   g_return_val_if_fail (priv->is_joined, FALSE);
3552
3553   g_mutex_lock (&priv->lock);
3554   res = update_transport (stream, trans, FALSE);
3555   g_mutex_unlock (&priv->lock);
3556
3557   return res;
3558 }
3559
3560 /**
3561  * gst_rtsp_stream_update_crypto:
3562  * @stream: a #GstRTSPStream
3563  * @ssrc: the SSRC
3564  * @crypto: (transfer none) (allow-none): a #GstCaps with crypto info
3565  *
3566  * Update the new crypto information for @ssrc in @stream. If information
3567  * for @ssrc did not exist, it will be added. If information
3568  * for @ssrc existed, it will be replaced. If @crypto is %NULL, it will
3569  * be removed from @stream.
3570  *
3571  * Returns: %TRUE if @crypto could be updated
3572  */
3573 gboolean
3574 gst_rtsp_stream_update_crypto (GstRTSPStream * stream,
3575     guint ssrc, GstCaps * crypto)
3576 {
3577   GstRTSPStreamPrivate *priv;
3578
3579   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3580   g_return_val_if_fail (crypto == NULL || GST_IS_CAPS (crypto), FALSE);
3581
3582   priv = stream->priv;
3583
3584   GST_DEBUG_OBJECT (stream, "update key for %08x", ssrc);
3585
3586   g_mutex_lock (&priv->lock);
3587   if (crypto)
3588     g_hash_table_insert (priv->keys, GINT_TO_POINTER (ssrc),
3589         gst_caps_ref (crypto));
3590   else
3591     g_hash_table_remove (priv->keys, GINT_TO_POINTER (ssrc));
3592   g_mutex_unlock (&priv->lock);
3593
3594   return TRUE;
3595 }
3596
3597 /**
3598  * gst_rtsp_stream_get_rtp_socket:
3599  * @stream: a #GstRTSPStream
3600  * @family: the socket family
3601  *
3602  * Get the RTP socket from @stream for a @family.
3603  *
3604  * @stream must be joined to a bin.
3605  *
3606  * Returns: (transfer full) (nullable): the RTP socket or %NULL if no
3607  * socket could be allocated for @family. Unref after usage
3608  */
3609 GSocket *
3610 gst_rtsp_stream_get_rtp_socket (GstRTSPStream * stream, GSocketFamily family)
3611 {
3612   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
3613   GSocket *socket;
3614   const gchar *name;
3615
3616   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3617   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
3618       family == G_SOCKET_FAMILY_IPV6, NULL);
3619   g_return_val_if_fail (priv->udpsink[0], NULL);
3620
3621   if (family == G_SOCKET_FAMILY_IPV6)
3622     name = "socket-v6";
3623   else
3624     name = "socket";
3625
3626   g_object_get (priv->udpsink[0], name, &socket, NULL);
3627
3628   return socket;
3629 }
3630
3631 /**
3632  * gst_rtsp_stream_get_rtcp_socket:
3633  * @stream: a #GstRTSPStream
3634  * @family: the socket family
3635  *
3636  * Get the RTCP socket from @stream for a @family.
3637  *
3638  * @stream must be joined to a bin.
3639  *
3640  * Returns: (transfer full) (nullable): the RTCP socket or %NULL if no
3641  * socket could be allocated for @family. Unref after usage
3642  */
3643 GSocket *
3644 gst_rtsp_stream_get_rtcp_socket (GstRTSPStream * stream, GSocketFamily family)
3645 {
3646   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
3647   GSocket *socket;
3648   const gchar *name;
3649
3650   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3651   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
3652       family == G_SOCKET_FAMILY_IPV6, NULL);
3653   g_return_val_if_fail (priv->udpsink[1], NULL);
3654
3655   if (family == G_SOCKET_FAMILY_IPV6)
3656     name = "socket-v6";
3657   else
3658     name = "socket";
3659
3660   g_object_get (priv->udpsink[1], name, &socket, NULL);
3661
3662   return socket;
3663 }
3664
3665 /**
3666  * gst_rtsp_stream_set_seqnum:
3667  * @stream: a #GstRTSPStream
3668  * @seqnum: a new sequence number
3669  *
3670  * Configure the sequence number in the payloader of @stream to @seqnum.
3671  */
3672 void
3673 gst_rtsp_stream_set_seqnum_offset (GstRTSPStream * stream, guint16 seqnum)
3674 {
3675   GstRTSPStreamPrivate *priv;
3676
3677   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
3678
3679   priv = stream->priv;
3680
3681   g_object_set (G_OBJECT (priv->payloader), "seqnum-offset", seqnum, NULL);
3682 }
3683
3684 /**
3685  * gst_rtsp_stream_get_seqnum:
3686  * @stream: a #GstRTSPStream
3687  *
3688  * Get the configured sequence number in the payloader of @stream.
3689  *
3690  * Returns: the sequence number of the payloader.
3691  */
3692 guint16
3693 gst_rtsp_stream_get_current_seqnum (GstRTSPStream * stream)
3694 {
3695   GstRTSPStreamPrivate *priv;
3696   guint seqnum;
3697
3698   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
3699
3700   priv = stream->priv;
3701
3702   g_object_get (G_OBJECT (priv->payloader), "seqnum", &seqnum, NULL);
3703
3704   return seqnum;
3705 }
3706
3707 /**
3708  * gst_rtsp_stream_transport_filter:
3709  * @stream: a #GstRTSPStream
3710  * @func: (scope call) (allow-none): a callback
3711  * @user_data: (closure): user data passed to @func
3712  *
3713  * Call @func for each transport managed by @stream. The result value of @func
3714  * determines what happens to the transport. @func will be called with @stream
3715  * locked so no further actions on @stream can be performed from @func.
3716  *
3717  * If @func returns #GST_RTSP_FILTER_REMOVE, the transport will be removed from
3718  * @stream.
3719  *
3720  * If @func returns #GST_RTSP_FILTER_KEEP, the transport will remain in @stream.
3721  *
3722  * If @func returns #GST_RTSP_FILTER_REF, the transport will remain in @stream but
3723  * will also be added with an additional ref to the result #GList of this
3724  * function..
3725  *
3726  * When @func is %NULL, #GST_RTSP_FILTER_REF will be assumed for each transport.
3727  *
3728  * Returns: (element-type GstRTSPStreamTransport) (transfer full): a #GList with all
3729  * transports for which @func returned #GST_RTSP_FILTER_REF. After usage, each
3730  * element in the #GList should be unreffed before the list is freed.
3731  */
3732 GList *
3733 gst_rtsp_stream_transport_filter (GstRTSPStream * stream,
3734     GstRTSPStreamTransportFilterFunc func, gpointer user_data)
3735 {
3736   GstRTSPStreamPrivate *priv;
3737   GList *result, *walk, *next;
3738   GHashTable *visited = NULL;
3739   guint cookie;
3740
3741   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3742
3743   priv = stream->priv;
3744
3745   result = NULL;
3746   if (func)
3747     visited = g_hash_table_new_full (NULL, NULL, g_object_unref, NULL);
3748
3749   g_mutex_lock (&priv->lock);
3750 restart:
3751   cookie = priv->transports_cookie;
3752   for (walk = priv->transports; walk; walk = next) {
3753     GstRTSPStreamTransport *trans = walk->data;
3754     GstRTSPFilterResult res;
3755     gboolean changed;
3756
3757     next = g_list_next (walk);
3758
3759     if (func) {
3760       /* only visit each transport once */
3761       if (g_hash_table_contains (visited, trans))
3762         continue;
3763
3764       g_hash_table_add (visited, g_object_ref (trans));
3765       g_mutex_unlock (&priv->lock);
3766
3767       res = func (stream, trans, user_data);
3768
3769       g_mutex_lock (&priv->lock);
3770     } else
3771       res = GST_RTSP_FILTER_REF;
3772
3773     changed = (cookie != priv->transports_cookie);
3774
3775     switch (res) {
3776       case GST_RTSP_FILTER_REMOVE:
3777         update_transport (stream, trans, FALSE);
3778         break;
3779       case GST_RTSP_FILTER_REF:
3780         result = g_list_prepend (result, g_object_ref (trans));
3781         break;
3782       case GST_RTSP_FILTER_KEEP:
3783       default:
3784         break;
3785     }
3786     if (changed)
3787       goto restart;
3788   }
3789   g_mutex_unlock (&priv->lock);
3790
3791   if (func)
3792     g_hash_table_unref (visited);
3793
3794   return result;
3795 }
3796
3797 static GstPadProbeReturn
3798 pad_blocking (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
3799 {
3800   GstRTSPStreamPrivate *priv;
3801   GstRTSPStream *stream;
3802
3803   stream = user_data;
3804   priv = stream->priv;
3805
3806   GST_DEBUG_OBJECT (pad, "now blocking");
3807
3808   g_mutex_lock (&priv->lock);
3809   priv->blocking = TRUE;
3810   g_mutex_unlock (&priv->lock);
3811
3812   gst_element_post_message (priv->payloader,
3813       gst_message_new_element (GST_OBJECT_CAST (priv->payloader),
3814           gst_structure_new_empty ("GstRTSPStreamBlocking")));
3815
3816   return GST_PAD_PROBE_OK;
3817 }
3818
3819 /**
3820  * gst_rtsp_stream_set_blocked:
3821  * @stream: a #GstRTSPStream
3822  * @blocked: boolean indicating we should block or unblock
3823  *
3824  * Blocks or unblocks the dataflow on @stream.
3825  *
3826  * Returns: %TRUE on success
3827  */
3828 gboolean
3829 gst_rtsp_stream_set_blocked (GstRTSPStream * stream, gboolean blocked)
3830 {
3831   GstRTSPStreamPrivate *priv;
3832
3833   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3834
3835   priv = stream->priv;
3836
3837   g_mutex_lock (&priv->lock);
3838   if (blocked) {
3839     priv->blocking = FALSE;
3840     if (priv->blocked_id == 0) {
3841       priv->blocked_id = gst_pad_add_probe (priv->srcpad,
3842           GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER |
3843           GST_PAD_PROBE_TYPE_BUFFER_LIST, pad_blocking,
3844           g_object_ref (stream), g_object_unref);
3845     }
3846   } else {
3847     if (priv->blocked_id != 0) {
3848       gst_pad_remove_probe (priv->srcpad, priv->blocked_id);
3849       priv->blocked_id = 0;
3850       priv->blocking = FALSE;
3851     }
3852   }
3853   g_mutex_unlock (&priv->lock);
3854
3855   return TRUE;
3856 }
3857
3858 /**
3859  * gst_rtsp_stream_is_blocking:
3860  * @stream: a #GstRTSPStream
3861  *
3862  * Check if @stream is blocking on a #GstBuffer.
3863  *
3864  * Returns: %TRUE if @stream is blocking
3865  */
3866 gboolean
3867 gst_rtsp_stream_is_blocking (GstRTSPStream * stream)
3868 {
3869   GstRTSPStreamPrivate *priv;
3870   gboolean result;
3871
3872   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3873
3874   priv = stream->priv;
3875
3876   g_mutex_lock (&priv->lock);
3877   result = priv->blocking;
3878   g_mutex_unlock (&priv->lock);
3879
3880   return result;
3881 }
3882
3883 /**
3884  * gst_rtsp_stream_query_position:
3885  * @stream: a #GstRTSPStream
3886  *
3887  * Query the position of the stream in %GST_FORMAT_TIME. This only considers
3888  * the RTP parts of the pipeline and not the RTCP parts.
3889  *
3890  * Returns: %TRUE if the position could be queried
3891  */
3892 gboolean
3893 gst_rtsp_stream_query_position (GstRTSPStream * stream, gint64 * position)
3894 {
3895   GstRTSPStreamPrivate *priv;
3896   GstElement *sink;
3897   gboolean ret;
3898
3899   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3900
3901   priv = stream->priv;
3902
3903   g_mutex_lock (&priv->lock);
3904   /* depending on the transport type, it should query corresponding sink */
3905   if ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
3906       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST))
3907     sink = priv->udpsink[0];
3908   else
3909     sink = priv->appsink[0];
3910
3911   if (sink)
3912     gst_object_ref (sink);
3913   g_mutex_unlock (&priv->lock);
3914
3915   if (!sink)
3916     return FALSE;
3917
3918   ret = gst_element_query_position (sink, GST_FORMAT_TIME, position);
3919   gst_object_unref (sink);
3920
3921   return ret;
3922 }
3923
3924 /**
3925  * gst_rtsp_stream_query_stop:
3926  * @stream: a #GstRTSPStream
3927  *
3928  * Query the stop of the stream in %GST_FORMAT_TIME. This only considers
3929  * the RTP parts of the pipeline and not the RTCP parts.
3930  *
3931  * Returns: %TRUE if the stop could be queried
3932  */
3933 gboolean
3934 gst_rtsp_stream_query_stop (GstRTSPStream * stream, gint64 * stop)
3935 {
3936   GstRTSPStreamPrivate *priv;
3937   GstElement *sink;
3938   GstQuery *query;
3939   gboolean ret;
3940
3941   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3942
3943   priv = stream->priv;
3944
3945   g_mutex_lock (&priv->lock);
3946   /* depending on the transport type, it should query corresponding sink */
3947   if ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
3948       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST))
3949     sink = priv->udpsink[0];
3950   else
3951     sink = priv->appsink[0];
3952
3953   if (sink)
3954     gst_object_ref (sink);
3955   g_mutex_unlock (&priv->lock);
3956
3957   if (!sink)
3958     return FALSE;
3959
3960   query = gst_query_new_segment (GST_FORMAT_TIME);
3961   if ((ret = gst_element_query (sink, query))) {
3962     GstFormat format;
3963
3964     gst_query_parse_segment (query, NULL, &format, NULL, stop);
3965     if (format != GST_FORMAT_TIME)
3966       *stop = -1;
3967   }
3968   gst_query_unref (query);
3969   gst_object_unref (sink);
3970
3971   return ret;
3972
3973 }