Revert "rtsp-stream: Fix crash on cleanup with shared media and multiple udpsrc"
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-stream.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  * Copyright (C) 2015 Centricular Ltd
4  *     Author: Sebastian Dröge <sebastian@centricular.com>
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19  * Boston, MA 02110-1301, USA.
20  */
21 /**
22  * SECTION:rtsp-stream
23  * @short_description: A media stream
24  * @see_also: #GstRTSPMedia
25  *
26  * The #GstRTSPStream object manages the data transport for one stream. It
27  * is created from a payloader element and a source pad that produce the RTP
28  * packets for the stream.
29  *
30  * With gst_rtsp_stream_join_bin() the streaming elements are added to the bin
31  * and rtpbin. gst_rtsp_stream_leave_bin() removes the elements again.
32  *
33  * The #GstRTSPStream will use the configured addresspool, as set with
34  * gst_rtsp_stream_set_address_pool(), to allocate multicast addresses for the
35  * stream. With gst_rtsp_stream_get_multicast_address() you can get the
36  * configured address.
37  *
38  * With gst_rtsp_stream_get_server_port () you can get the port that the server
39  * will use to receive RTCP. This is the part that the clients will use to send
40  * RTCP to.
41  *
42  * With gst_rtsp_stream_add_transport() destinations can be added where the
43  * stream should be sent to. Use gst_rtsp_stream_remove_transport() to remove
44  * the destination again.
45  *
46  * Last reviewed on 2013-07-16 (1.0.0)
47  */
48
49 #include <stdlib.h>
50 #include <stdio.h>
51 #include <string.h>
52
53 #include <gio/gio.h>
54
55 #include <gst/app/gstappsrc.h>
56 #include <gst/app/gstappsink.h>
57
58 #include <gst/rtp/gstrtpbuffer.h>
59
60 #include "rtsp-stream.h"
61
62 #define GST_RTSP_STREAM_GET_PRIVATE(obj)  \
63      (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTSP_STREAM, GstRTSPStreamPrivate))
64
65 struct _GstRTSPStreamPrivate
66 {
67   GMutex lock;
68   guint idx;
69   /* Only one pad is ever set */
70   GstPad *srcpad, *sinkpad;
71   GstElement *payloader;
72   guint buffer_size;
73   gboolean is_joined;
74   GstBin *joined_bin;
75
76   /* TRUE if this stream is running on
77    * the client side of an RTSP link (for RECORD) */
78   gboolean client_side;
79   gchar *control;
80
81   GstRTSPProfile profiles;
82   GstRTSPLowerTrans protocols;
83
84   /* pads on the rtpbin */
85   GstPad *send_rtp_sink;
86   GstPad *recv_rtp_src;
87   GstPad *recv_sink[2];
88   GstPad *send_src[2];
89
90   /* the RTPSession object */
91   GObject *session;
92
93   /* SRTP encoder/decoder */
94   GstElement *srtpenc;
95   GstElement *srtpdec;
96   GHashTable *keys;
97
98   /* sinks used for sending and receiving RTP and RTCP over ipv4, they share
99    * sockets */
100   GstElement *udpsrc_v4[2];
101   /* UDP sources for UDP multicast transports */
102   GstElement *udpsrc_mcast_v4[2];
103
104   /* sinks used for sending and receiving RTP and RTCP over ipv6, they share
105    * sockets */
106   GstElement *udpsrc_v6[2];
107   /* UDP sources for UDP multicast transports */
108   GstElement *udpsrc_mcast_v6[2];
109
110   GstElement *udpqueue[2];
111   GstElement *udpsink[2];
112
113   /* for TCP transport */
114   GstElement *appsrc[2];
115   GstClockTime appsrc_base_time[2];
116   GstElement *appqueue[2];
117   GstElement *appsink[2];
118
119   GstElement *tee[2];
120   GstElement *funnel[2];
121
122   /* retransmission */
123   GstElement *rtxsend;
124   guint rtx_pt;
125   GstClockTime rtx_time;
126
127   /* server ports for sending/receiving over ipv4 */
128   GstRTSPRange server_port_v4;
129   GstRTSPAddress *server_addr_v4;
130   gboolean have_ipv4;
131
132   /* server ports for sending/receiving over ipv6 */
133   GstRTSPRange server_port_v6;
134   GstRTSPAddress *server_addr_v6;
135   gboolean have_ipv6;
136
137   /* multicast addresses */
138   GstRTSPAddressPool *pool;
139   GstRTSPAddress *addr_v4;
140   GstRTSPAddress *addr_v6;
141   gboolean have_ipv4_mcast;
142   gboolean have_ipv6_mcast;
143
144   gchar *multicast_iface;
145
146   /* the caps of the stream */
147   gulong caps_sig;
148   GstCaps *caps;
149
150   /* transports we stream to */
151   guint n_active;
152   GList *transports;
153   guint transports_cookie;
154   GList *tr_cache_rtp;
155   GList *tr_cache_rtcp;
156   guint tr_cache_cookie_rtp;
157   guint tr_cache_cookie_rtcp;
158
159
160   gint dscp_qos;
161
162   /* stream blocking */
163   gulong blocked_id;
164   gboolean blocking;
165
166   /* pt->caps map for RECORD streams */
167   GHashTable *ptmap;
168
169   GstRTSPPublishClockMode publish_clock_mode;
170 };
171
172 #define DEFAULT_CONTROL         NULL
173 #define DEFAULT_PROFILES        GST_RTSP_PROFILE_AVP
174 #define DEFAULT_PROTOCOLS       GST_RTSP_LOWER_TRANS_UDP | GST_RTSP_LOWER_TRANS_UDP_MCAST | \
175                                         GST_RTSP_LOWER_TRANS_TCP
176
177 enum
178 {
179   PROP_0,
180   PROP_CONTROL,
181   PROP_PROFILES,
182   PROP_PROTOCOLS,
183   PROP_LAST
184 };
185
186 enum
187 {
188   SIGNAL_NEW_RTP_ENCODER,
189   SIGNAL_NEW_RTCP_ENCODER,
190   SIGNAL_LAST
191 };
192
193 GST_DEBUG_CATEGORY_STATIC (rtsp_stream_debug);
194 #define GST_CAT_DEFAULT rtsp_stream_debug
195
196 static GQuark ssrc_stream_map_key;
197
198 static void gst_rtsp_stream_get_property (GObject * object, guint propid,
199     GValue * value, GParamSpec * pspec);
200 static void gst_rtsp_stream_set_property (GObject * object, guint propid,
201     const GValue * value, GParamSpec * pspec);
202
203 static void gst_rtsp_stream_finalize (GObject * obj);
204
205 static guint gst_rtsp_stream_signals[SIGNAL_LAST] = { 0 };
206
207 G_DEFINE_TYPE (GstRTSPStream, gst_rtsp_stream, G_TYPE_OBJECT);
208
209 static void
210 gst_rtsp_stream_class_init (GstRTSPStreamClass * klass)
211 {
212   GObjectClass *gobject_class;
213
214   g_type_class_add_private (klass, sizeof (GstRTSPStreamPrivate));
215
216   gobject_class = G_OBJECT_CLASS (klass);
217
218   gobject_class->get_property = gst_rtsp_stream_get_property;
219   gobject_class->set_property = gst_rtsp_stream_set_property;
220   gobject_class->finalize = gst_rtsp_stream_finalize;
221
222   g_object_class_install_property (gobject_class, PROP_CONTROL,
223       g_param_spec_string ("control", "Control",
224           "The control string for this stream", DEFAULT_CONTROL,
225           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
226
227   g_object_class_install_property (gobject_class, PROP_PROFILES,
228       g_param_spec_flags ("profiles", "Profiles",
229           "Allowed transfer profiles", GST_TYPE_RTSP_PROFILE,
230           DEFAULT_PROFILES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
231
232   g_object_class_install_property (gobject_class, PROP_PROTOCOLS,
233       g_param_spec_flags ("protocols", "Protocols",
234           "Allowed lower transport protocols", GST_TYPE_RTSP_LOWER_TRANS,
235           DEFAULT_PROTOCOLS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
236
237   gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER] =
238       g_signal_new ("new-rtp-encoder", G_TYPE_FROM_CLASS (klass),
239       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
240       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
241
242   gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER] =
243       g_signal_new ("new-rtcp-encoder", G_TYPE_FROM_CLASS (klass),
244       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
245       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
246
247   GST_DEBUG_CATEGORY_INIT (rtsp_stream_debug, "rtspstream", 0, "GstRTSPStream");
248
249   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
250 }
251
252 static void
253 gst_rtsp_stream_init (GstRTSPStream * stream)
254 {
255   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
256
257   GST_DEBUG ("new stream %p", stream);
258
259   stream->priv = priv;
260
261   priv->dscp_qos = -1;
262   priv->control = g_strdup (DEFAULT_CONTROL);
263   priv->profiles = DEFAULT_PROFILES;
264   priv->protocols = DEFAULT_PROTOCOLS;
265   priv->publish_clock_mode = GST_RTSP_PUBLISH_CLOCK_MODE_CLOCK;
266
267   g_mutex_init (&priv->lock);
268
269   priv->keys = g_hash_table_new_full (g_direct_hash, g_direct_equal,
270       NULL, (GDestroyNotify) gst_caps_unref);
271   priv->ptmap = g_hash_table_new_full (NULL, NULL, NULL,
272       (GDestroyNotify) gst_caps_unref);
273 }
274
275 static void
276 gst_rtsp_stream_finalize (GObject * obj)
277 {
278   GstRTSPStream *stream;
279   GstRTSPStreamPrivate *priv;
280
281   stream = GST_RTSP_STREAM (obj);
282   priv = stream->priv;
283
284   GST_DEBUG ("finalize stream %p", stream);
285
286   /* we really need to be unjoined now */
287   g_return_if_fail (!priv->is_joined);
288
289   if (priv->addr_v4)
290     gst_rtsp_address_free (priv->addr_v4);
291   if (priv->addr_v6)
292     gst_rtsp_address_free (priv->addr_v6);
293   if (priv->server_addr_v4)
294     gst_rtsp_address_free (priv->server_addr_v4);
295   if (priv->server_addr_v6)
296     gst_rtsp_address_free (priv->server_addr_v6);
297   if (priv->pool)
298     g_object_unref (priv->pool);
299   if (priv->rtxsend)
300     g_object_unref (priv->rtxsend);
301
302   g_free (priv->multicast_iface);
303
304   gst_object_unref (priv->payloader);
305   if (priv->srcpad)
306     gst_object_unref (priv->srcpad);
307   if (priv->sinkpad)
308     gst_object_unref (priv->sinkpad);
309   g_free (priv->control);
310   g_mutex_clear (&priv->lock);
311
312   g_hash_table_unref (priv->keys);
313   g_hash_table_destroy (priv->ptmap);
314
315   G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
316 }
317
318 static void
319 gst_rtsp_stream_get_property (GObject * object, guint propid,
320     GValue * value, GParamSpec * pspec)
321 {
322   GstRTSPStream *stream = GST_RTSP_STREAM (object);
323
324   switch (propid) {
325     case PROP_CONTROL:
326       g_value_take_string (value, gst_rtsp_stream_get_control (stream));
327       break;
328     case PROP_PROFILES:
329       g_value_set_flags (value, gst_rtsp_stream_get_profiles (stream));
330       break;
331     case PROP_PROTOCOLS:
332       g_value_set_flags (value, gst_rtsp_stream_get_protocols (stream));
333       break;
334     default:
335       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
336   }
337 }
338
339 static void
340 gst_rtsp_stream_set_property (GObject * object, guint propid,
341     const GValue * value, GParamSpec * pspec)
342 {
343   GstRTSPStream *stream = GST_RTSP_STREAM (object);
344
345   switch (propid) {
346     case PROP_CONTROL:
347       gst_rtsp_stream_set_control (stream, g_value_get_string (value));
348       break;
349     case PROP_PROFILES:
350       gst_rtsp_stream_set_profiles (stream, g_value_get_flags (value));
351       break;
352     case PROP_PROTOCOLS:
353       gst_rtsp_stream_set_protocols (stream, g_value_get_flags (value));
354       break;
355     default:
356       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
357   }
358 }
359
360 /**
361  * gst_rtsp_stream_new:
362  * @idx: an index
363  * @pad: a #GstPad
364  * @payloader: a #GstElement
365  *
366  * Create a new media stream with index @idx that handles RTP data on
367  * @pad and has a payloader element @payloader if @pad is a source pad
368  * or a depayloader element @payloader if @pad is a sink pad.
369  *
370  * Returns: (transfer full): a new #GstRTSPStream
371  */
372 GstRTSPStream *
373 gst_rtsp_stream_new (guint idx, GstElement * payloader, GstPad * pad)
374 {
375   GstRTSPStreamPrivate *priv;
376   GstRTSPStream *stream;
377
378   g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL);
379   g_return_val_if_fail (GST_IS_PAD (pad), NULL);
380
381   stream = g_object_new (GST_TYPE_RTSP_STREAM, NULL);
382   priv = stream->priv;
383   priv->idx = idx;
384   priv->payloader = gst_object_ref (payloader);
385   if (GST_PAD_IS_SRC (pad))
386     priv->srcpad = gst_object_ref (pad);
387   else
388     priv->sinkpad = gst_object_ref (pad);
389
390   return stream;
391 }
392
393 /**
394  * gst_rtsp_stream_get_index:
395  * @stream: a #GstRTSPStream
396  *
397  * Get the stream index.
398  *
399  * Return: the stream index.
400  */
401 guint
402 gst_rtsp_stream_get_index (GstRTSPStream * stream)
403 {
404   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
405
406   return stream->priv->idx;
407 }
408
409 /**
410  * gst_rtsp_stream_get_pt:
411  * @stream: a #GstRTSPStream
412  *
413  * Get the stream payload type.
414  *
415  * Return: the stream payload type.
416  */
417 guint
418 gst_rtsp_stream_get_pt (GstRTSPStream * stream)
419 {
420   GstRTSPStreamPrivate *priv;
421   guint pt;
422
423   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
424
425   priv = stream->priv;
426
427   g_object_get (G_OBJECT (priv->payloader), "pt", &pt, NULL);
428
429   return pt;
430 }
431
432 /**
433  * gst_rtsp_stream_get_srcpad:
434  * @stream: a #GstRTSPStream
435  *
436  * Get the srcpad associated with @stream.
437  *
438  * Returns: (transfer full): the srcpad. Unref after usage.
439  */
440 GstPad *
441 gst_rtsp_stream_get_srcpad (GstRTSPStream * stream)
442 {
443   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
444
445   if (!stream->priv->srcpad)
446     return NULL;
447
448   return gst_object_ref (stream->priv->srcpad);
449 }
450
451 /**
452  * gst_rtsp_stream_get_sinkpad:
453  * @stream: a #GstRTSPStream
454  *
455  * Get the sinkpad associated with @stream.
456  *
457  * Returns: (transfer full): the sinkpad. Unref after usage.
458  */
459 GstPad *
460 gst_rtsp_stream_get_sinkpad (GstRTSPStream * stream)
461 {
462   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
463
464   if (!stream->priv->sinkpad)
465     return NULL;
466
467   return gst_object_ref (stream->priv->sinkpad);
468 }
469
470 /**
471  * gst_rtsp_stream_get_control:
472  * @stream: a #GstRTSPStream
473  *
474  * Get the control string to identify this stream.
475  *
476  * Returns: (transfer full): the control string. g_free() after usage.
477  */
478 gchar *
479 gst_rtsp_stream_get_control (GstRTSPStream * stream)
480 {
481   GstRTSPStreamPrivate *priv;
482   gchar *result;
483
484   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
485
486   priv = stream->priv;
487
488   g_mutex_lock (&priv->lock);
489   if ((result = g_strdup (priv->control)) == NULL)
490     result = g_strdup_printf ("stream=%u", priv->idx);
491   g_mutex_unlock (&priv->lock);
492
493   return result;
494 }
495
496 /**
497  * gst_rtsp_stream_set_control:
498  * @stream: a #GstRTSPStream
499  * @control: a control string
500  *
501  * Set the control string in @stream.
502  */
503 void
504 gst_rtsp_stream_set_control (GstRTSPStream * stream, const gchar * control)
505 {
506   GstRTSPStreamPrivate *priv;
507
508   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
509
510   priv = stream->priv;
511
512   g_mutex_lock (&priv->lock);
513   g_free (priv->control);
514   priv->control = g_strdup (control);
515   g_mutex_unlock (&priv->lock);
516 }
517
518 /**
519  * gst_rtsp_stream_has_control:
520  * @stream: a #GstRTSPStream
521  * @control: a control string
522  *
523  * Check if @stream has the control string @control.
524  *
525  * Returns: %TRUE is @stream has @control as the control string
526  */
527 gboolean
528 gst_rtsp_stream_has_control (GstRTSPStream * stream, const gchar * control)
529 {
530   GstRTSPStreamPrivate *priv;
531   gboolean res;
532
533   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
534
535   priv = stream->priv;
536
537   g_mutex_lock (&priv->lock);
538   if (priv->control)
539     res = (g_strcmp0 (priv->control, control) == 0);
540   else {
541     guint streamid;
542
543     if (sscanf (control, "stream=%u", &streamid) > 0)
544       res = (streamid == priv->idx);
545     else
546       res = FALSE;
547   }
548   g_mutex_unlock (&priv->lock);
549
550   return res;
551 }
552
553 /**
554  * gst_rtsp_stream_set_mtu:
555  * @stream: a #GstRTSPStream
556  * @mtu: a new MTU
557  *
558  * Configure the mtu in the payloader of @stream to @mtu.
559  */
560 void
561 gst_rtsp_stream_set_mtu (GstRTSPStream * stream, guint mtu)
562 {
563   GstRTSPStreamPrivate *priv;
564
565   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
566
567   priv = stream->priv;
568
569   GST_LOG_OBJECT (stream, "set MTU %u", mtu);
570
571   g_object_set (G_OBJECT (priv->payloader), "mtu", mtu, NULL);
572 }
573
574 /**
575  * gst_rtsp_stream_get_mtu:
576  * @stream: a #GstRTSPStream
577  *
578  * Get the configured MTU in the payloader of @stream.
579  *
580  * Returns: the MTU of the payloader.
581  */
582 guint
583 gst_rtsp_stream_get_mtu (GstRTSPStream * stream)
584 {
585   GstRTSPStreamPrivate *priv;
586   guint mtu;
587
588   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
589
590   priv = stream->priv;
591
592   g_object_get (G_OBJECT (priv->payloader), "mtu", &mtu, NULL);
593
594   return mtu;
595 }
596
597 /* Update the dscp qos property on the udp sinks */
598 static void
599 update_dscp_qos (GstRTSPStream * stream)
600 {
601   GstRTSPStreamPrivate *priv;
602
603   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
604
605   priv = stream->priv;
606
607   if (priv->udpsink[0]) {
608     g_object_set (G_OBJECT (priv->udpsink[0]), "qos-dscp", priv->dscp_qos,
609         NULL);
610   }
611
612   if (priv->udpsink[1]) {
613     g_object_set (G_OBJECT (priv->udpsink[1]), "qos-dscp", priv->dscp_qos,
614         NULL);
615   }
616 }
617
618 /**
619  * gst_rtsp_stream_set_dscp_qos:
620  * @stream: a #GstRTSPStream
621  * @dscp_qos: a new dscp qos value (0-63, or -1 to disable)
622  *
623  * Configure the dscp qos of the outgoing sockets to @dscp_qos.
624  */
625 void
626 gst_rtsp_stream_set_dscp_qos (GstRTSPStream * stream, gint dscp_qos)
627 {
628   GstRTSPStreamPrivate *priv;
629
630   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
631
632   priv = stream->priv;
633
634   GST_LOG_OBJECT (stream, "set DSCP QoS %d", dscp_qos);
635
636   if (dscp_qos < -1 || dscp_qos > 63) {
637     GST_WARNING_OBJECT (stream, "trying to set illegal dscp qos %d", dscp_qos);
638     return;
639   }
640
641   priv->dscp_qos = dscp_qos;
642
643   update_dscp_qos (stream);
644 }
645
646 /**
647  * gst_rtsp_stream_get_dscp_qos:
648  * @stream: a #GstRTSPStream
649  *
650  * Get the configured DSCP QoS in of the outgoing sockets.
651  *
652  * Returns: the DSCP QoS value of the outgoing sockets, or -1 if disbled.
653  */
654 gint
655 gst_rtsp_stream_get_dscp_qos (GstRTSPStream * stream)
656 {
657   GstRTSPStreamPrivate *priv;
658
659   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
660
661   priv = stream->priv;
662
663   return priv->dscp_qos;
664 }
665
666 /**
667  * gst_rtsp_stream_is_transport_supported:
668  * @stream: a #GstRTSPStream
669  * @transport: (transfer none): a #GstRTSPTransport
670  *
671  * Check if @transport can be handled by stream
672  *
673  * Returns: %TRUE if @transport can be handled by @stream.
674  */
675 gboolean
676 gst_rtsp_stream_is_transport_supported (GstRTSPStream * stream,
677     GstRTSPTransport * transport)
678 {
679   GstRTSPStreamPrivate *priv;
680
681   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
682
683   priv = stream->priv;
684
685   g_mutex_lock (&priv->lock);
686   if (transport->trans != GST_RTSP_TRANS_RTP)
687     goto unsupported_transmode;
688
689   if (!(transport->profile & priv->profiles))
690     goto unsupported_profile;
691
692   if (!(transport->lower_transport & priv->protocols))
693     goto unsupported_ltrans;
694
695   g_mutex_unlock (&priv->lock);
696
697   return TRUE;
698
699   /* ERRORS */
700 unsupported_transmode:
701   {
702     GST_DEBUG ("unsupported transport mode %d", transport->trans);
703     g_mutex_unlock (&priv->lock);
704     return FALSE;
705   }
706 unsupported_profile:
707   {
708     GST_DEBUG ("unsupported profile %d", transport->profile);
709     g_mutex_unlock (&priv->lock);
710     return FALSE;
711   }
712 unsupported_ltrans:
713   {
714     GST_DEBUG ("unsupported lower transport %d", transport->lower_transport);
715     g_mutex_unlock (&priv->lock);
716     return FALSE;
717   }
718 }
719
720 /**
721  * gst_rtsp_stream_set_profiles:
722  * @stream: a #GstRTSPStream
723  * @profiles: the new profiles
724  *
725  * Configure the allowed profiles for @stream.
726  */
727 void
728 gst_rtsp_stream_set_profiles (GstRTSPStream * stream, GstRTSPProfile profiles)
729 {
730   GstRTSPStreamPrivate *priv;
731
732   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
733
734   priv = stream->priv;
735
736   g_mutex_lock (&priv->lock);
737   priv->profiles = profiles;
738   g_mutex_unlock (&priv->lock);
739 }
740
741 /**
742  * gst_rtsp_stream_get_profiles:
743  * @stream: a #GstRTSPStream
744  *
745  * Get the allowed profiles of @stream.
746  *
747  * Returns: a #GstRTSPProfile
748  */
749 GstRTSPProfile
750 gst_rtsp_stream_get_profiles (GstRTSPStream * stream)
751 {
752   GstRTSPStreamPrivate *priv;
753   GstRTSPProfile res;
754
755   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_RTSP_PROFILE_UNKNOWN);
756
757   priv = stream->priv;
758
759   g_mutex_lock (&priv->lock);
760   res = priv->profiles;
761   g_mutex_unlock (&priv->lock);
762
763   return res;
764 }
765
766 /**
767  * gst_rtsp_stream_set_protocols:
768  * @stream: a #GstRTSPStream
769  * @protocols: the new flags
770  *
771  * Configure the allowed lower transport for @stream.
772  */
773 void
774 gst_rtsp_stream_set_protocols (GstRTSPStream * stream,
775     GstRTSPLowerTrans protocols)
776 {
777   GstRTSPStreamPrivate *priv;
778
779   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
780
781   priv = stream->priv;
782
783   g_mutex_lock (&priv->lock);
784   priv->protocols = protocols;
785   g_mutex_unlock (&priv->lock);
786 }
787
788 /**
789  * gst_rtsp_stream_get_protocols:
790  * @stream: a #GstRTSPStream
791  *
792  * Get the allowed protocols of @stream.
793  *
794  * Returns: a #GstRTSPLowerTrans
795  */
796 GstRTSPLowerTrans
797 gst_rtsp_stream_get_protocols (GstRTSPStream * stream)
798 {
799   GstRTSPStreamPrivate *priv;
800   GstRTSPLowerTrans res;
801
802   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream),
803       GST_RTSP_LOWER_TRANS_UNKNOWN);
804
805   priv = stream->priv;
806
807   g_mutex_lock (&priv->lock);
808   res = priv->protocols;
809   g_mutex_unlock (&priv->lock);
810
811   return res;
812 }
813
814 /**
815  * gst_rtsp_stream_set_address_pool:
816  * @stream: a #GstRTSPStream
817  * @pool: (transfer none): a #GstRTSPAddressPool
818  *
819  * configure @pool to be used as the address pool of @stream.
820  */
821 void
822 gst_rtsp_stream_set_address_pool (GstRTSPStream * stream,
823     GstRTSPAddressPool * pool)
824 {
825   GstRTSPStreamPrivate *priv;
826   GstRTSPAddressPool *old;
827
828   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
829
830   priv = stream->priv;
831
832   GST_LOG_OBJECT (stream, "set address pool %p", pool);
833
834   g_mutex_lock (&priv->lock);
835   if ((old = priv->pool) != pool)
836     priv->pool = pool ? g_object_ref (pool) : NULL;
837   else
838     old = NULL;
839   g_mutex_unlock (&priv->lock);
840
841   if (old)
842     g_object_unref (old);
843 }
844
845 /**
846  * gst_rtsp_stream_get_address_pool:
847  * @stream: a #GstRTSPStream
848  *
849  * Get the #GstRTSPAddressPool used as the address pool of @stream.
850  *
851  * Returns: (transfer full): the #GstRTSPAddressPool of @stream. g_object_unref() after
852  * usage.
853  */
854 GstRTSPAddressPool *
855 gst_rtsp_stream_get_address_pool (GstRTSPStream * stream)
856 {
857   GstRTSPStreamPrivate *priv;
858   GstRTSPAddressPool *result;
859
860   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
861
862   priv = stream->priv;
863
864   g_mutex_lock (&priv->lock);
865   if ((result = priv->pool))
866     g_object_ref (result);
867   g_mutex_unlock (&priv->lock);
868
869   return result;
870 }
871
872 /**
873  * gst_rtsp_stream_set_multicast_iface:
874  * @stream: a #GstRTSPStream
875  * @multicast_iface: (transfer none): a multicast interface
876  *
877  * configure @multicast_iface to be used for @stream.
878  */
879 void
880 gst_rtsp_stream_set_multicast_iface (GstRTSPStream * stream,
881     const gchar * multicast_iface)
882 {
883   GstRTSPStreamPrivate *priv;
884   gchar *old;
885
886   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
887
888   priv = stream->priv;
889
890   GST_LOG_OBJECT (stream, "set multicast iface %s",
891       GST_STR_NULL (multicast_iface));
892
893   g_mutex_lock (&priv->lock);
894   if ((old = priv->multicast_iface) != multicast_iface)
895     priv->multicast_iface = multicast_iface ? g_strdup (multicast_iface) : NULL;
896   else
897     old = NULL;
898   g_mutex_unlock (&priv->lock);
899
900   if (old)
901     g_free (old);
902 }
903
904 /**
905  * gst_rtsp_stream_get_multicast_iface:
906  * @stream: a #GstRTSPStream
907  *
908  * Get the multicast interface used for @stream.
909  *
910  * Returns: (transfer full): the multicast interface for @stream. g_free() after
911  * usage.
912  */
913 gchar *
914 gst_rtsp_stream_get_multicast_iface (GstRTSPStream * stream)
915 {
916   GstRTSPStreamPrivate *priv;
917   gchar *result;
918
919   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
920
921   priv = stream->priv;
922
923   g_mutex_lock (&priv->lock);
924   if ((result = priv->multicast_iface))
925     result = g_strdup (result);
926   g_mutex_unlock (&priv->lock);
927
928   return result;
929 }
930
931 /**
932  * gst_rtsp_stream_get_multicast_address:
933  * @stream: a #GstRTSPStream
934  * @family: the #GSocketFamily
935  *
936  * Get the multicast address of @stream for @family.
937  *
938  * Returns: (transfer full) (nullable): the #GstRTSPAddress of @stream
939  * or %NULL when no address could be allocated. gst_rtsp_address_free()
940  * after usage.
941  */
942 GstRTSPAddress *
943 gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream,
944     GSocketFamily family)
945 {
946   GstRTSPStreamPrivate *priv;
947   GstRTSPAddress *result;
948   GstRTSPAddress **addrp;
949   GstRTSPAddressFlags flags;
950
951   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
952
953   priv = stream->priv;
954
955   if (family == G_SOCKET_FAMILY_IPV6) {
956     flags = GST_RTSP_ADDRESS_FLAG_IPV6;
957     addrp = &priv->addr_v6;
958   } else {
959     flags = GST_RTSP_ADDRESS_FLAG_IPV4;
960     addrp = &priv->addr_v4;
961   }
962
963   g_mutex_lock (&priv->lock);
964   if (*addrp == NULL) {
965     if (priv->pool == NULL)
966       goto no_pool;
967
968     flags |= GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_MULTICAST;
969
970     *addrp = gst_rtsp_address_pool_acquire_address (priv->pool, flags, 2);
971     if (*addrp == NULL)
972       goto no_address;
973   }
974   result = gst_rtsp_address_copy (*addrp);
975   g_mutex_unlock (&priv->lock);
976
977   return result;
978
979   /* ERRORS */
980 no_pool:
981   {
982     GST_ERROR_OBJECT (stream, "no address pool specified");
983     g_mutex_unlock (&priv->lock);
984     return NULL;
985   }
986 no_address:
987   {
988     GST_ERROR_OBJECT (stream, "failed to acquire address from pool");
989     g_mutex_unlock (&priv->lock);
990     return NULL;
991   }
992 }
993
994 /**
995  * gst_rtsp_stream_reserve_address:
996  * @stream: a #GstRTSPStream
997  * @address: an address
998  * @port: a port
999  * @n_ports: n_ports
1000  * @ttl: a TTL
1001  *
1002  * Reserve @address and @port as the address and port of @stream.
1003  *
1004  * Returns: (nullable): the #GstRTSPAddress of @stream or %NULL when
1005  * the address could be reserved. gst_rtsp_address_free() after usage.
1006  */
1007 GstRTSPAddress *
1008 gst_rtsp_stream_reserve_address (GstRTSPStream * stream,
1009     const gchar * address, guint port, guint n_ports, guint ttl)
1010 {
1011   GstRTSPStreamPrivate *priv;
1012   GstRTSPAddress *result;
1013   GInetAddress *addr;
1014   GSocketFamily family;
1015   GstRTSPAddress **addrp;
1016
1017   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1018   g_return_val_if_fail (address != NULL, NULL);
1019   g_return_val_if_fail (port > 0, NULL);
1020   g_return_val_if_fail (n_ports > 0, NULL);
1021   g_return_val_if_fail (ttl > 0, NULL);
1022
1023   priv = stream->priv;
1024
1025   addr = g_inet_address_new_from_string (address);
1026   if (!addr) {
1027     GST_ERROR ("failed to get inet addr from %s", address);
1028     family = G_SOCKET_FAMILY_IPV4;
1029   } else {
1030     family = g_inet_address_get_family (addr);
1031     g_object_unref (addr);
1032   }
1033
1034   if (family == G_SOCKET_FAMILY_IPV6)
1035     addrp = &priv->addr_v6;
1036   else
1037     addrp = &priv->addr_v4;
1038
1039   g_mutex_lock (&priv->lock);
1040   if (*addrp == NULL) {
1041     GstRTSPAddressPoolResult res;
1042
1043     if (priv->pool == NULL)
1044       goto no_pool;
1045
1046     res = gst_rtsp_address_pool_reserve_address (priv->pool, address,
1047         port, n_ports, ttl, addrp);
1048     if (res != GST_RTSP_ADDRESS_POOL_OK)
1049       goto no_address;
1050   } else {
1051     if (strcmp ((*addrp)->address, address) ||
1052         (*addrp)->port != port || (*addrp)->n_ports != n_ports ||
1053         (*addrp)->ttl != ttl)
1054       goto different_address;
1055   }
1056   result = gst_rtsp_address_copy (*addrp);
1057   g_mutex_unlock (&priv->lock);
1058
1059   return result;
1060
1061   /* ERRORS */
1062 no_pool:
1063   {
1064     GST_ERROR_OBJECT (stream, "no address pool specified");
1065     g_mutex_unlock (&priv->lock);
1066     return NULL;
1067   }
1068 no_address:
1069   {
1070     GST_ERROR_OBJECT (stream, "failed to acquire address %s from pool",
1071         address);
1072     g_mutex_unlock (&priv->lock);
1073     return NULL;
1074   }
1075 different_address:
1076   {
1077     GST_ERROR_OBJECT (stream, "address %s is not the same that was already"
1078         " reserved", address);
1079     g_mutex_unlock (&priv->lock);
1080     return NULL;
1081   }
1082 }
1083
1084 /* must be called with lock */
1085 static void
1086 set_sockets_for_udpsinks (GstRTSPStream * stream, GSocket * rtp_socket,
1087     GSocket * rtcp_socket, GSocketFamily family)
1088 {
1089   GstRTSPStreamPrivate *priv = stream->priv;
1090   const gchar *multisink_socket;
1091
1092   if (family == G_SOCKET_FAMILY_IPV6)
1093     multisink_socket = "socket-v6";
1094   else
1095     multisink_socket = "socket";
1096
1097   g_object_set (G_OBJECT (priv->udpsink[0]), multisink_socket, rtp_socket,
1098       NULL);
1099   g_object_set (G_OBJECT (priv->udpsink[1]), multisink_socket, rtcp_socket,
1100       NULL);
1101 }
1102
1103 /* must be called with lock */
1104 static gboolean
1105 create_and_configure_udpsinks (GstRTSPStream * stream)
1106 {
1107   GstRTSPStreamPrivate *priv = stream->priv;
1108   GstElement *udpsink0, *udpsink1;
1109
1110   udpsink0 = NULL;
1111   udpsink1 = NULL;
1112
1113   if (priv->udpsink[0])
1114     udpsink0 = priv->udpsink[0];
1115   else
1116     udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
1117
1118   if (!udpsink0)
1119     goto no_udp_protocol;
1120
1121   if (priv->udpsink[1])
1122     udpsink1 = priv->udpsink[1];
1123   else
1124     udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
1125
1126   if (!udpsink1)
1127     goto no_udp_protocol;
1128
1129   /* configure sinks */
1130
1131   g_object_set (G_OBJECT (udpsink0), "close-socket", FALSE, NULL);
1132   g_object_set (G_OBJECT (udpsink1), "close-socket", FALSE, NULL);
1133
1134   g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
1135   g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
1136
1137   g_object_set (G_OBJECT (udpsink0), "buffer-size", priv->buffer_size, NULL);
1138
1139   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
1140   /* Needs to be async for RECORD streams, otherwise we will never go to
1141    * PLAYING because the sinks will wait for data while the udpsrc can't
1142    * provide data with timestamps in PAUSED. */
1143   if (priv->sinkpad)
1144     g_object_set (G_OBJECT (udpsink0), "async", FALSE, NULL);
1145   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
1146
1147   g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL);
1148   g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL);
1149
1150   g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
1151   g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
1152
1153   /* update the dscp qos field in the sinks */
1154   update_dscp_qos (stream);
1155
1156   priv->udpsink[0] = udpsink0;
1157   priv->udpsink[1] = udpsink1;
1158
1159   return TRUE;
1160
1161   /* ERRORS */
1162 no_udp_protocol:
1163   {
1164     return FALSE;
1165   }
1166 }
1167
1168 /* must be called with lock */
1169 static void
1170 play_udpsources_one_family (GstRTSPStream * stream, GstElement * udpsrc_out[2],
1171     GSocketFamily family)
1172 {
1173   GstRTSPStreamPrivate *priv;
1174   GstPad *pad, *selpad;
1175   guint i;
1176   GstBin *bin;
1177
1178   priv = stream->priv;
1179   bin = GST_BIN (gst_object_get_parent (GST_OBJECT (priv->funnel[1])));
1180
1181   for (i = 0; i < 2; i++) {
1182     if (priv->sinkpad || i == 1) {
1183       if (priv->srcpad) {
1184         /* we set and keep these to playing so that they don't cause NO_PREROLL return
1185          * values. This is only relevant for PLAY pipelines */
1186         gst_element_set_state (udpsrc_out[i], GST_STATE_PLAYING);
1187         gst_element_set_locked_state (udpsrc_out[i], TRUE);
1188       }
1189       /* add udpsrc */
1190       gst_bin_add (bin, udpsrc_out[i]);
1191
1192       /* and link to the funnel */
1193       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1194       pad = gst_element_get_static_pad (udpsrc_out[i], "src");
1195       gst_pad_link (pad, selpad);
1196       gst_object_unref (pad);
1197       gst_object_unref (selpad);
1198
1199       /* otherwise sync state with parent in case it's running already
1200        * at this point */
1201       if (!priv->srcpad) {
1202         gst_element_sync_state_with_parent (udpsrc_out[i]);
1203       }
1204     }
1205   }
1206
1207   gst_object_unref (bin);
1208 }
1209
1210 /* must be called with lock */
1211 static gboolean
1212 create_and_configure_udpsources_one_family (GstElement * udpsrc_out[2],
1213     GSocket * rtp_socket, GSocket * rtcp_socket, GSocketFamily family,
1214     const gchar * address, gint rtpport, gint rtcpport,
1215     const gchar * multicast_iface, GstRTSPLowerTrans transport)
1216 {
1217   GstStateChangeReturn ret;
1218
1219   udpsrc_out[0] = gst_element_factory_make ("udpsrc", NULL);
1220   udpsrc_out[1] = gst_element_factory_make ("udpsrc", NULL);
1221
1222   if (udpsrc_out[0] == NULL || udpsrc_out[1] == NULL)
1223     goto error;
1224
1225   if (transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
1226     g_object_set (G_OBJECT (udpsrc_out[0]), "address", address, NULL);
1227     g_object_set (G_OBJECT (udpsrc_out[1]), "address", address, NULL);
1228     g_object_set (G_OBJECT (udpsrc_out[0]), "port", rtpport, NULL);
1229     g_object_set (G_OBJECT (udpsrc_out[1]), "port", rtcpport, NULL);
1230     g_object_set (G_OBJECT (udpsrc_out[0]), "multicast-iface", multicast_iface,
1231         NULL);
1232     g_object_set (G_OBJECT (udpsrc_out[1]), "multicast-iface", multicast_iface,
1233         NULL);
1234     g_object_set (G_OBJECT (udpsrc_out[0]), "loop", FALSE, NULL);
1235     g_object_set (G_OBJECT (udpsrc_out[1]), "loop", FALSE, NULL);
1236   }
1237
1238   g_object_set (G_OBJECT (udpsrc_out[0]), "socket", rtp_socket, NULL);
1239   g_object_set (G_OBJECT (udpsrc_out[1]), "socket", rtcp_socket, NULL);
1240
1241   ret = gst_element_set_state (udpsrc_out[0], GST_STATE_READY);
1242   if (ret == GST_STATE_CHANGE_FAILURE)
1243     goto error;
1244   ret = gst_element_set_state (udpsrc_out[1], GST_STATE_READY);
1245   if (ret == GST_STATE_CHANGE_FAILURE)
1246     goto error;
1247
1248   return TRUE;
1249   return TRUE;
1250
1251   /* ERRORS */
1252 error:
1253   {
1254     if (udpsrc_out[0])
1255       gst_object_unref (udpsrc_out[0]);
1256     if (udpsrc_out[1])
1257       gst_object_unref (udpsrc_out[1]);
1258     return FALSE;
1259   }
1260 }
1261
1262 static gboolean
1263 alloc_ports_one_family (GstRTSPStream * stream, GSocketFamily family,
1264     GstElement * udpsrc_out[2], GstRTSPRange * server_port_out,
1265     GstRTSPTransport * ct, GstRTSPAddress ** server_addr_out,
1266     gboolean use_client_settings)
1267 {
1268   GstRTSPStreamPrivate *priv = stream->priv;
1269   GSocket *rtp_socket = NULL;
1270   GSocket *rtcp_socket;
1271   gint tmp_rtp, tmp_rtcp;
1272   guint count;
1273   gint rtpport, rtcpport;
1274   GList *rejected_addresses = NULL;
1275   GstRTSPAddress *addr = NULL;
1276   GInetAddress *inetaddr = NULL;
1277   gchar *addr_str;
1278   GSocketAddress *rtp_sockaddr = NULL;
1279   GSocketAddress *rtcp_sockaddr = NULL;
1280   GstRTSPAddressPool *pool;
1281   GstRTSPLowerTrans transport;
1282   const gchar *multicast_iface = priv->multicast_iface;
1283
1284   pool = priv->pool;
1285   count = 0;
1286   transport = ct->lower_transport;
1287
1288   /* Start with random port */
1289   tmp_rtp = 0;
1290
1291   rtcp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1292       G_SOCKET_PROTOCOL_UDP, NULL);
1293   if (!rtcp_socket)
1294     goto no_udp_protocol;
1295   g_socket_set_multicast_loopback (rtcp_socket, FALSE);
1296
1297   if (*server_addr_out)
1298     gst_rtsp_address_free (*server_addr_out);
1299
1300   /* try to allocate 2 UDP ports, the RTP port should be an even
1301    * number and the RTCP port should be the next (uneven) port */
1302 again:
1303
1304   if (rtp_socket == NULL) {
1305     rtp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1306         G_SOCKET_PROTOCOL_UDP, NULL);
1307     if (!rtp_socket)
1308       goto no_udp_protocol;
1309     g_socket_set_multicast_loopback (rtp_socket, FALSE);
1310   }
1311
1312   if (pool && ((transport == GST_RTSP_LOWER_TRANS_UDP &&
1313               gst_rtsp_address_pool_has_unicast_addresses (pool))
1314           || transport == GST_RTSP_LOWER_TRANS_UDP_MCAST)) {
1315     GstRTSPAddressFlags flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT;
1316
1317     if (transport == GST_RTSP_LOWER_TRANS_UDP)
1318       flags |= GST_RTSP_ADDRESS_FLAG_UNICAST;
1319     else
1320       flags |= GST_RTSP_ADDRESS_FLAG_MULTICAST;
1321
1322     if (addr)
1323       rejected_addresses = g_list_prepend (rejected_addresses, addr);
1324
1325     if (family == G_SOCKET_FAMILY_IPV6)
1326       flags |= GST_RTSP_ADDRESS_FLAG_IPV6;
1327     else
1328       flags |= GST_RTSP_ADDRESS_FLAG_IPV4;
1329
1330     if (ct->destination && transport == GST_RTSP_LOWER_TRANS_UDP_MCAST
1331         && use_client_settings)
1332       gst_rtsp_address_pool_reserve_address (pool, ct->destination,
1333           ct->port.min, 2, ct->ttl, &addr);
1334     else
1335       addr = gst_rtsp_address_pool_acquire_address (pool, flags, 2);
1336
1337     if (addr == NULL)
1338       goto no_ports;
1339
1340     tmp_rtp = addr->port;
1341
1342     g_clear_object (&inetaddr);
1343     inetaddr = g_inet_address_new_from_string (addr->address);
1344
1345     /* If we're supposed to bind to a multicast address, instead bind
1346      * to ANY and let udpsrc later join the relevant multicast group
1347      */
1348     if (g_inet_address_get_is_multicast (inetaddr)) {
1349       g_object_unref (inetaddr);
1350       inetaddr = g_inet_address_new_any (family);
1351     }
1352   } else {
1353     if (tmp_rtp != 0) {
1354       tmp_rtp += 2;
1355       if (++count > 20)
1356         goto no_ports;
1357     }
1358
1359     if (inetaddr == NULL)
1360       inetaddr = g_inet_address_new_any (family);
1361   }
1362
1363   rtp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtp);
1364   if (!g_socket_bind (rtp_socket, rtp_sockaddr, FALSE, NULL)) {
1365     g_object_unref (rtp_sockaddr);
1366     goto again;
1367   }
1368   g_object_unref (rtp_sockaddr);
1369
1370   rtp_sockaddr = g_socket_get_local_address (rtp_socket, NULL);
1371   if (rtp_sockaddr == NULL || !G_IS_INET_SOCKET_ADDRESS (rtp_sockaddr)) {
1372     g_clear_object (&rtp_sockaddr);
1373     goto socket_error;
1374   }
1375
1376   tmp_rtp =
1377       g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (rtp_sockaddr));
1378   g_object_unref (rtp_sockaddr);
1379
1380   /* check if port is even */
1381   if ((tmp_rtp & 1) != 0) {
1382     /* port not even, close and allocate another */
1383     tmp_rtp++;
1384     g_clear_object (&rtp_socket);
1385     goto again;
1386   }
1387
1388   /* set port */
1389   tmp_rtcp = tmp_rtp + 1;
1390
1391   rtcp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtcp);
1392   if (!g_socket_bind (rtcp_socket, rtcp_sockaddr, FALSE, NULL)) {
1393     g_object_unref (rtcp_sockaddr);
1394     g_clear_object (&rtp_socket);
1395     goto again;
1396   }
1397   g_object_unref (rtcp_sockaddr);
1398
1399   if (addr == NULL)
1400     addr_str = g_inet_address_to_string (inetaddr);
1401   else
1402     addr_str = addr->address;
1403   g_clear_object (&inetaddr);
1404
1405   if (!create_and_configure_udpsources_one_family (udpsrc_out, rtp_socket,
1406           rtcp_socket, family, addr_str, tmp_rtp, tmp_rtcp, multicast_iface,
1407           transport)) {
1408     if (addr == NULL)
1409       g_free (addr_str);
1410     goto no_udp_protocol;
1411   }
1412
1413   if (addr == NULL)
1414     g_free (addr_str);
1415
1416   play_udpsources_one_family (stream, udpsrc_out, family);
1417
1418   g_object_get (G_OBJECT (udpsrc_out[0]), "port", &rtpport, NULL);
1419   g_object_get (G_OBJECT (udpsrc_out[1]), "port", &rtcpport, NULL);
1420
1421   /* this should not happen... */
1422   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
1423     goto port_error;
1424
1425   /* set RTP and RTCP sockets */
1426   set_sockets_for_udpsinks (stream, rtp_socket, rtcp_socket, family);
1427
1428   server_port_out->min = rtpport;
1429   server_port_out->max = rtcpport;
1430
1431   *server_addr_out = addr;
1432   g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free);
1433
1434   g_object_unref (rtp_socket);
1435   g_object_unref (rtcp_socket);
1436
1437   return TRUE;
1438
1439   /* ERRORS */
1440 no_udp_protocol:
1441   {
1442     goto cleanup;
1443   }
1444 no_ports:
1445   {
1446     goto cleanup;
1447   }
1448 port_error:
1449   {
1450     goto cleanup;
1451   }
1452 socket_error:
1453   {
1454     goto cleanup;
1455   }
1456 cleanup:
1457   {
1458     if (inetaddr)
1459       g_object_unref (inetaddr);
1460     g_list_free_full (rejected_addresses,
1461         (GDestroyNotify) gst_rtsp_address_free);
1462     if (addr)
1463       gst_rtsp_address_free (addr);
1464     if (rtp_socket)
1465       g_object_unref (rtp_socket);
1466     if (rtcp_socket)
1467       g_object_unref (rtcp_socket);
1468     return FALSE;
1469   }
1470 }
1471
1472 /**
1473  * gst_rtsp_stream_allocate_udp_sockets:
1474  * @stream: a #GstRTSPStream
1475  * @family: protocol family
1476  * @transport_method: transport method
1477  *
1478  * Allocates RTP and RTCP ports.
1479  *
1480  * Returns: %TRUE if the RTP and RTCP sockets have been succeccully allocated.
1481  */
1482 gboolean
1483 gst_rtsp_stream_allocate_udp_sockets (GstRTSPStream * stream,
1484     GSocketFamily family, GstRTSPTransport * ct, gboolean use_client_settings)
1485 {
1486   GstRTSPStreamPrivate *priv;
1487   gboolean result = FALSE;
1488   GstRTSPLowerTrans transport = ct->lower_transport;
1489
1490   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1491   priv = stream->priv;
1492   g_return_val_if_fail (priv->is_joined, FALSE);
1493
1494   g_mutex_lock (&priv->lock);
1495
1496   if (family == G_SOCKET_FAMILY_IPV4) {
1497     if (transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
1498       if (priv->have_ipv4_mcast)
1499         goto done;
1500       priv->have_ipv4_mcast =
1501           alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4,
1502           priv->udpsrc_mcast_v4, &priv->server_port_v4, ct, &priv->addr_v4,
1503           use_client_settings);
1504     } else {
1505       priv->have_ipv4 =
1506           alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4, priv->udpsrc_v4,
1507           &priv->server_port_v4, ct, &priv->server_addr_v4,
1508           use_client_settings);
1509     }
1510   } else {
1511     if (transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
1512       if (priv->have_ipv6_mcast)
1513         goto done;
1514       priv->have_ipv6_mcast =
1515           alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6,
1516           priv->udpsrc_mcast_v6, &priv->server_port_v6, ct, &priv->addr_v6,
1517           use_client_settings);
1518     } else {
1519       if (priv->have_ipv6)
1520         goto done;
1521       priv->have_ipv6 =
1522           alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6, priv->udpsrc_v6,
1523           &priv->server_port_v6, ct, &priv->server_addr_v6,
1524           use_client_settings);
1525     }
1526   }
1527
1528 done:
1529   result = priv->have_ipv4 || priv->have_ipv4_mcast || priv->have_ipv6 ||
1530       priv->have_ipv6_mcast;
1531
1532   g_mutex_unlock (&priv->lock);
1533
1534   return result;
1535 }
1536
1537 /**
1538  * gst_rtsp_stream_set_client_side:
1539  * @stream: a #GstRTSPStream
1540  * @client_side: TRUE if this #GstRTSPStream is running on the 'client' side of
1541  * an RTSP connection.
1542  *
1543  * Sets the #GstRTSPStream as a 'client side' stream - used for sending
1544  * streams to an RTSP server via RECORD. This has the practical effect
1545  * of changing which UDP port numbers are used when setting up the local
1546  * side of the stream sending to be either the 'server' or 'client' pair
1547  * of a configured UDP transport.
1548  */
1549 void
1550 gst_rtsp_stream_set_client_side (GstRTSPStream * stream, gboolean client_side)
1551 {
1552   GstRTSPStreamPrivate *priv;
1553
1554   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1555   priv = stream->priv;
1556   g_mutex_lock (&priv->lock);
1557   priv->client_side = client_side;
1558   g_mutex_unlock (&priv->lock);
1559 }
1560
1561 /**
1562  * gst_rtsp_stream_is_client_side:
1563  * @stream: a #GstRTSPStream
1564  *
1565  * See gst_rtsp_stream_set_client_side()
1566  *
1567  * Returns: TRUE if this #GstRTSPStream is client-side.
1568  */
1569 gboolean
1570 gst_rtsp_stream_is_client_side (GstRTSPStream * stream)
1571 {
1572   GstRTSPStreamPrivate *priv;
1573   gboolean ret;
1574
1575   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1576
1577   priv = stream->priv;
1578   g_mutex_lock (&priv->lock);
1579   ret = priv->client_side;
1580   g_mutex_unlock (&priv->lock);
1581
1582   return ret;
1583 }
1584
1585 /**
1586  * gst_rtsp_stream_get_server_port:
1587  * @stream: a #GstRTSPStream
1588  * @server_port: (out): result server port
1589  * @family: the port family to get
1590  *
1591  * Fill @server_port with the port pair used by the server. This function can
1592  * only be called when @stream has been joined.
1593  */
1594 void
1595 gst_rtsp_stream_get_server_port (GstRTSPStream * stream,
1596     GstRTSPRange * server_port, GSocketFamily family)
1597 {
1598   GstRTSPStreamPrivate *priv;
1599
1600   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1601   priv = stream->priv;
1602   g_return_if_fail (priv->is_joined);
1603
1604   g_mutex_lock (&priv->lock);
1605   if (family == G_SOCKET_FAMILY_IPV4) {
1606     if (server_port)
1607       *server_port = priv->server_port_v4;
1608   } else {
1609     if (server_port)
1610       *server_port = priv->server_port_v6;
1611   }
1612   g_mutex_unlock (&priv->lock);
1613 }
1614
1615 /**
1616  * gst_rtsp_stream_get_rtpsession:
1617  * @stream: a #GstRTSPStream
1618  *
1619  * Get the RTP session of this stream.
1620  *
1621  * Returns: (transfer full): The RTP session of this stream. Unref after usage.
1622  */
1623 GObject *
1624 gst_rtsp_stream_get_rtpsession (GstRTSPStream * stream)
1625 {
1626   GstRTSPStreamPrivate *priv;
1627   GObject *session;
1628
1629   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1630
1631   priv = stream->priv;
1632
1633   g_mutex_lock (&priv->lock);
1634   if ((session = priv->session))
1635     g_object_ref (session);
1636   g_mutex_unlock (&priv->lock);
1637
1638   return session;
1639 }
1640
1641 /**
1642  * gst_rtsp_stream_get_encoder:
1643  * @stream: a #GstRTSPStream
1644  *
1645  * Get the SRTP encoder for this stream.
1646  *
1647  * Returns: (transfer full): The SRTP encoder for this stream. Unref after usage.
1648  */
1649 GstElement *
1650 gst_rtsp_stream_get_srtp_encoder (GstRTSPStream * stream)
1651 {
1652   GstRTSPStreamPrivate *priv;
1653   GstElement *encoder;
1654
1655   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1656
1657   priv = stream->priv;
1658
1659   g_mutex_lock (&priv->lock);
1660   if ((encoder = priv->srtpenc))
1661     g_object_ref (encoder);
1662   g_mutex_unlock (&priv->lock);
1663
1664   return encoder;
1665 }
1666
1667 /**
1668  * gst_rtsp_stream_get_ssrc:
1669  * @stream: a #GstRTSPStream
1670  * @ssrc: (out): result ssrc
1671  *
1672  * Get the SSRC used by the RTP session of this stream. This function can only
1673  * be called when @stream has been joined.
1674  */
1675 void
1676 gst_rtsp_stream_get_ssrc (GstRTSPStream * stream, guint * ssrc)
1677 {
1678   GstRTSPStreamPrivate *priv;
1679
1680   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1681   priv = stream->priv;
1682   g_return_if_fail (priv->is_joined);
1683
1684   g_mutex_lock (&priv->lock);
1685   if (ssrc && priv->session)
1686     g_object_get (priv->session, "internal-ssrc", ssrc, NULL);
1687   g_mutex_unlock (&priv->lock);
1688 }
1689
1690 /**
1691  * gst_rtsp_stream_set_retransmission_time:
1692  * @stream: a #GstRTSPStream
1693  * @time: a #GstClockTime
1694  *
1695  * Set the amount of time to store retransmission packets.
1696  */
1697 void
1698 gst_rtsp_stream_set_retransmission_time (GstRTSPStream * stream,
1699     GstClockTime time)
1700 {
1701   GST_DEBUG_OBJECT (stream, "set retransmission time %" G_GUINT64_FORMAT, time);
1702
1703   g_mutex_lock (&stream->priv->lock);
1704   stream->priv->rtx_time = time;
1705   if (stream->priv->rtxsend)
1706     g_object_set (stream->priv->rtxsend, "max-size-time",
1707         GST_TIME_AS_MSECONDS (time), NULL);
1708   g_mutex_unlock (&stream->priv->lock);
1709 }
1710
1711 /**
1712  * gst_rtsp_stream_get_retransmission_time:
1713  * @stream: a #GstRTSPStream
1714  *
1715  * Get the amount of time to store retransmission data.
1716  *
1717  * Returns: the amount of time to store retransmission data.
1718  */
1719 GstClockTime
1720 gst_rtsp_stream_get_retransmission_time (GstRTSPStream * stream)
1721 {
1722   GstClockTime ret;
1723
1724   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
1725
1726   g_mutex_lock (&stream->priv->lock);
1727   ret = stream->priv->rtx_time;
1728   g_mutex_unlock (&stream->priv->lock);
1729
1730   return ret;
1731 }
1732
1733 /**
1734  * gst_rtsp_stream_set_retransmission_pt:
1735  * @stream: a #GstRTSPStream
1736  * @rtx_pt: a #guint
1737  *
1738  * Set the payload type (pt) for retransmission of this stream.
1739  */
1740 void
1741 gst_rtsp_stream_set_retransmission_pt (GstRTSPStream * stream, guint rtx_pt)
1742 {
1743   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1744
1745   GST_DEBUG_OBJECT (stream, "set retransmission pt %u", rtx_pt);
1746
1747   g_mutex_lock (&stream->priv->lock);
1748   stream->priv->rtx_pt = rtx_pt;
1749   if (stream->priv->rtxsend) {
1750     guint pt = gst_rtsp_stream_get_pt (stream);
1751     gchar *pt_s = g_strdup_printf ("%d", pt);
1752     GstStructure *rtx_pt_map = gst_structure_new ("application/x-rtp-pt-map",
1753         pt_s, G_TYPE_UINT, rtx_pt, NULL);
1754     g_object_set (stream->priv->rtxsend, "payload-type-map", rtx_pt_map, NULL);
1755     g_free (pt_s);
1756     gst_structure_free (rtx_pt_map);
1757   }
1758   g_mutex_unlock (&stream->priv->lock);
1759 }
1760
1761 /**
1762  * gst_rtsp_stream_get_retransmission_pt:
1763  * @stream: a #GstRTSPStream
1764  *
1765  * Get the payload-type used for retransmission of this stream
1766  *
1767  * Returns: The retransmission PT.
1768  */
1769 guint
1770 gst_rtsp_stream_get_retransmission_pt (GstRTSPStream * stream)
1771 {
1772   guint rtx_pt;
1773
1774   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
1775
1776   g_mutex_lock (&stream->priv->lock);
1777   rtx_pt = stream->priv->rtx_pt;
1778   g_mutex_unlock (&stream->priv->lock);
1779
1780   return rtx_pt;
1781 }
1782
1783 /**
1784  * gst_rtsp_stream_set_buffer_size:
1785  * @stream: a #GstRTSPStream
1786  * @size: the buffer size
1787  *
1788  * Set the size of the UDP transmission buffer (in bytes)
1789  * Needs to be set before the stream is joined to a bin.
1790  *
1791  * Since: 1.6
1792  */
1793 void
1794 gst_rtsp_stream_set_buffer_size (GstRTSPStream * stream, guint size)
1795 {
1796   g_mutex_lock (&stream->priv->lock);
1797   stream->priv->buffer_size = size;
1798   g_mutex_unlock (&stream->priv->lock);
1799 }
1800
1801 /**
1802  * gst_rtsp_stream_get_buffer_size:
1803  * @stream: a #GstRTSPStream
1804  *
1805  * Get the size of the UDP transmission buffer (in bytes)
1806  *
1807  * Returns: the size of the UDP TX buffer
1808  *
1809  * Since: 1.6
1810  */
1811 guint
1812 gst_rtsp_stream_get_buffer_size (GstRTSPStream * stream)
1813 {
1814   guint buffer_size;
1815
1816   g_mutex_lock (&stream->priv->lock);
1817   buffer_size = stream->priv->buffer_size;
1818   g_mutex_unlock (&stream->priv->lock);
1819
1820   return buffer_size;
1821 }
1822
1823 /* executed from streaming thread */
1824 static void
1825 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream)
1826 {
1827   GstRTSPStreamPrivate *priv = stream->priv;
1828   GstCaps *newcaps, *oldcaps;
1829
1830   newcaps = gst_pad_get_current_caps (pad);
1831
1832   GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps,
1833       newcaps);
1834
1835   g_mutex_lock (&priv->lock);
1836   oldcaps = priv->caps;
1837   priv->caps = newcaps;
1838   g_mutex_unlock (&priv->lock);
1839
1840   if (oldcaps)
1841     gst_caps_unref (oldcaps);
1842 }
1843
1844 static void
1845 dump_structure (const GstStructure * s)
1846 {
1847   gchar *sstr;
1848
1849   sstr = gst_structure_to_string (s);
1850   GST_INFO ("structure: %s", sstr);
1851   g_free (sstr);
1852 }
1853
1854 static GstRTSPStreamTransport *
1855 find_transport (GstRTSPStream * stream, const gchar * rtcp_from)
1856 {
1857   GstRTSPStreamPrivate *priv = stream->priv;
1858   GList *walk;
1859   GstRTSPStreamTransport *result = NULL;
1860   const gchar *tmp;
1861   gchar *dest;
1862   guint port;
1863
1864   if (rtcp_from == NULL)
1865     return NULL;
1866
1867   tmp = g_strrstr (rtcp_from, ":");
1868   if (tmp == NULL)
1869     return NULL;
1870
1871   port = atoi (tmp + 1);
1872   dest = g_strndup (rtcp_from, tmp - rtcp_from);
1873
1874   g_mutex_lock (&priv->lock);
1875   GST_INFO ("finding %s:%d in %d transports", dest, port,
1876       g_list_length (priv->transports));
1877
1878   for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1879     GstRTSPStreamTransport *trans = walk->data;
1880     const GstRTSPTransport *tr;
1881     gint min, max;
1882
1883     tr = gst_rtsp_stream_transport_get_transport (trans);
1884
1885     if (priv->client_side) {
1886       /* In client side mode the 'destination' is the RTSP server, so send
1887        * to those ports */
1888       min = tr->server_port.min;
1889       max = tr->server_port.max;
1890     } else {
1891       min = tr->client_port.min;
1892       max = tr->client_port.max;
1893     }
1894
1895     if ((strcmp (tr->destination, dest) == 0) && (min == port || max == port)) {
1896       result = trans;
1897       break;
1898     }
1899   }
1900   if (result)
1901     g_object_ref (result);
1902   g_mutex_unlock (&priv->lock);
1903
1904   g_free (dest);
1905
1906   return result;
1907 }
1908
1909 static GstRTSPStreamTransport *
1910 check_transport (GObject * source, GstRTSPStream * stream)
1911 {
1912   GstStructure *stats;
1913   GstRTSPStreamTransport *trans;
1914
1915   /* see if we have a stream to match with the origin of the RTCP packet */
1916   trans = g_object_get_qdata (source, ssrc_stream_map_key);
1917   if (trans == NULL) {
1918     g_object_get (source, "stats", &stats, NULL);
1919     if (stats) {
1920       const gchar *rtcp_from;
1921
1922       dump_structure (stats);
1923
1924       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
1925       if ((trans = find_transport (stream, rtcp_from))) {
1926         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
1927             source);
1928         g_object_set_qdata_full (source, ssrc_stream_map_key, trans,
1929             g_object_unref);
1930       }
1931       gst_structure_free (stats);
1932     }
1933   }
1934   return trans;
1935 }
1936
1937
1938 static void
1939 on_new_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1940 {
1941   GstRTSPStreamTransport *trans;
1942
1943   GST_INFO ("%p: new source %p", stream, source);
1944
1945   trans = check_transport (source, stream);
1946
1947   if (trans)
1948     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
1949 }
1950
1951 static void
1952 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPStream * stream)
1953 {
1954   GST_INFO ("%p: new SDES %p", stream, source);
1955 }
1956
1957 static void
1958 on_ssrc_active (GObject * session, GObject * source, GstRTSPStream * stream)
1959 {
1960   GstRTSPStreamTransport *trans;
1961
1962   trans = check_transport (source, stream);
1963
1964   if (trans) {
1965     GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
1966     gst_rtsp_stream_transport_keep_alive (trans);
1967   }
1968 #ifdef DUMP_STATS
1969   {
1970     GstStructure *stats;
1971     g_object_get (source, "stats", &stats, NULL);
1972     if (stats) {
1973       dump_structure (stats);
1974       gst_structure_free (stats);
1975     }
1976   }
1977 #endif
1978 }
1979
1980 static void
1981 on_bye_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1982 {
1983   GST_INFO ("%p: source %p bye", stream, source);
1984 }
1985
1986 static void
1987 on_bye_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1988 {
1989   GstRTSPStreamTransport *trans;
1990
1991   GST_INFO ("%p: source %p bye timeout", stream, source);
1992
1993   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1994     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1995     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1996   }
1997 }
1998
1999 static void
2000 on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
2001 {
2002   GstRTSPStreamTransport *trans;
2003
2004   GST_INFO ("%p: source %p timeout", stream, source);
2005
2006   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
2007     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
2008     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
2009   }
2010 }
2011
2012 static void
2013 on_new_sender_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
2014 {
2015   GST_INFO ("%p: new sender source %p", stream, source);
2016 #ifndef DUMP_STATS
2017   {
2018     GstStructure *stats;
2019     g_object_get (source, "stats", &stats, NULL);
2020     if (stats) {
2021       dump_structure (stats);
2022       gst_structure_free (stats);
2023     }
2024   }
2025 #endif
2026 }
2027
2028 static void
2029 on_sender_ssrc_active (GObject * session, GObject * source,
2030     GstRTSPStream * stream)
2031 {
2032 #ifndef DUMP_STATS
2033   {
2034     GstStructure *stats;
2035     g_object_get (source, "stats", &stats, NULL);
2036     if (stats) {
2037       dump_structure (stats);
2038       gst_structure_free (stats);
2039     }
2040   }
2041 #endif
2042 }
2043
2044 static void
2045 clear_tr_cache (GstRTSPStreamPrivate * priv, gboolean is_rtp)
2046 {
2047   if (is_rtp) {
2048     g_list_foreach (priv->tr_cache_rtp, (GFunc) g_object_unref, NULL);
2049     g_list_free (priv->tr_cache_rtp);
2050     priv->tr_cache_rtp = NULL;
2051   } else {
2052     g_list_foreach (priv->tr_cache_rtcp, (GFunc) g_object_unref, NULL);
2053     g_list_free (priv->tr_cache_rtcp);
2054     priv->tr_cache_rtcp = NULL;
2055   }
2056 }
2057
2058 static GstFlowReturn
2059 handle_new_sample (GstAppSink * sink, gpointer user_data)
2060 {
2061   GstRTSPStreamPrivate *priv;
2062   GList *walk;
2063   GstSample *sample;
2064   GstBuffer *buffer;
2065   GstRTSPStream *stream;
2066   gboolean is_rtp;
2067
2068   sample = gst_app_sink_pull_sample (sink);
2069   if (!sample)
2070     return GST_FLOW_OK;
2071
2072   stream = (GstRTSPStream *) user_data;
2073   priv = stream->priv;
2074   buffer = gst_sample_get_buffer (sample);
2075
2076   is_rtp = GST_ELEMENT_CAST (sink) == priv->appsink[0];
2077
2078   g_mutex_lock (&priv->lock);
2079   if (is_rtp) {
2080     if (priv->tr_cache_cookie_rtp != priv->transports_cookie) {
2081       clear_tr_cache (priv, is_rtp);
2082       for (walk = priv->transports; walk; walk = g_list_next (walk)) {
2083         GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2084         priv->tr_cache_rtp =
2085             g_list_prepend (priv->tr_cache_rtp, g_object_ref (tr));
2086       }
2087       priv->tr_cache_cookie_rtp = priv->transports_cookie;
2088     }
2089   } else {
2090     if (priv->tr_cache_cookie_rtcp != priv->transports_cookie) {
2091       clear_tr_cache (priv, is_rtp);
2092       for (walk = priv->transports; walk; walk = g_list_next (walk)) {
2093         GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2094         priv->tr_cache_rtcp =
2095             g_list_prepend (priv->tr_cache_rtcp, g_object_ref (tr));
2096       }
2097       priv->tr_cache_cookie_rtcp = priv->transports_cookie;
2098     }
2099   }
2100   g_mutex_unlock (&priv->lock);
2101
2102   if (is_rtp) {
2103     for (walk = priv->tr_cache_rtp; walk; walk = g_list_next (walk)) {
2104       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2105       gst_rtsp_stream_transport_send_rtp (tr, buffer);
2106     }
2107   } else {
2108     for (walk = priv->tr_cache_rtcp; walk; walk = g_list_next (walk)) {
2109       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2110       gst_rtsp_stream_transport_send_rtcp (tr, buffer);
2111     }
2112   }
2113   gst_sample_unref (sample);
2114
2115   return GST_FLOW_OK;
2116 }
2117
2118 static GstAppSinkCallbacks sink_cb = {
2119   NULL,                         /* not interested in EOS */
2120   NULL,                         /* not interested in preroll samples */
2121   handle_new_sample,
2122 };
2123
2124 static GstElement *
2125 get_rtp_encoder (GstRTSPStream * stream, guint session)
2126 {
2127   GstRTSPStreamPrivate *priv = stream->priv;
2128
2129   if (priv->srtpenc == NULL) {
2130     gchar *name;
2131
2132     name = g_strdup_printf ("srtpenc_%u", session);
2133     priv->srtpenc = gst_element_factory_make ("srtpenc", name);
2134     g_free (name);
2135
2136     g_object_set (priv->srtpenc, "random-key", TRUE, NULL);
2137   }
2138   return gst_object_ref (priv->srtpenc);
2139 }
2140
2141 static GstElement *
2142 request_rtp_encoder (GstElement * rtpbin, guint session, GstRTSPStream * stream)
2143 {
2144   GstRTSPStreamPrivate *priv = stream->priv;
2145   GstElement *oldenc, *enc;
2146   GstPad *pad;
2147   gchar *name;
2148
2149   if (priv->idx != session)
2150     return NULL;
2151
2152   GST_DEBUG_OBJECT (stream, "make RTP encoder for session %u", session);
2153
2154   oldenc = priv->srtpenc;
2155   enc = get_rtp_encoder (stream, session);
2156   name = g_strdup_printf ("rtp_sink_%d", session);
2157   pad = gst_element_get_request_pad (enc, name);
2158   g_free (name);
2159   gst_object_unref (pad);
2160
2161   if (oldenc == NULL)
2162     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER], 0,
2163         enc);
2164
2165   return enc;
2166 }
2167
2168 static GstElement *
2169 request_rtcp_encoder (GstElement * rtpbin, guint session,
2170     GstRTSPStream * stream)
2171 {
2172   GstRTSPStreamPrivate *priv = stream->priv;
2173   GstElement *oldenc, *enc;
2174   GstPad *pad;
2175   gchar *name;
2176
2177   if (priv->idx != session)
2178     return NULL;
2179
2180   GST_DEBUG_OBJECT (stream, "make RTCP encoder for session %u", session);
2181
2182   oldenc = priv->srtpenc;
2183   enc = get_rtp_encoder (stream, session);
2184   name = g_strdup_printf ("rtcp_sink_%d", session);
2185   pad = gst_element_get_request_pad (enc, name);
2186   g_free (name);
2187   gst_object_unref (pad);
2188
2189   if (oldenc == NULL)
2190     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER], 0,
2191         enc);
2192
2193   return enc;
2194 }
2195
2196 static GstCaps *
2197 request_key (GstElement * srtpdec, guint ssrc, GstRTSPStream * stream)
2198 {
2199   GstRTSPStreamPrivate *priv = stream->priv;
2200   GstCaps *caps;
2201
2202   GST_DEBUG ("request key %08x", ssrc);
2203
2204   g_mutex_lock (&priv->lock);
2205   if ((caps = g_hash_table_lookup (priv->keys, GINT_TO_POINTER (ssrc))))
2206     gst_caps_ref (caps);
2207   g_mutex_unlock (&priv->lock);
2208
2209   return caps;
2210 }
2211
2212 static GstElement *
2213 request_rtp_rtcp_decoder (GstElement * rtpbin, guint session,
2214     GstRTSPStream * stream)
2215 {
2216   GstRTSPStreamPrivate *priv = stream->priv;
2217
2218   if (priv->idx != session)
2219     return NULL;
2220
2221   if (priv->srtpdec == NULL) {
2222     gchar *name;
2223
2224     name = g_strdup_printf ("srtpdec_%u", session);
2225     priv->srtpdec = gst_element_factory_make ("srtpdec", name);
2226     g_free (name);
2227
2228     g_signal_connect (priv->srtpdec, "request-key",
2229         (GCallback) request_key, stream);
2230   }
2231   return gst_object_ref (priv->srtpdec);
2232 }
2233
2234 /**
2235  * gst_rtsp_stream_request_aux_sender:
2236  * @stream: a #GstRTSPStream
2237  * @sessid: the session id
2238  *
2239  * Creating a rtxsend bin
2240  *
2241  * Returns: (transfer full): a #GstElement.
2242  *
2243  * Since: 1.6
2244  */
2245 GstElement *
2246 gst_rtsp_stream_request_aux_sender (GstRTSPStream * stream, guint sessid)
2247 {
2248   GstElement *bin;
2249   GstPad *pad;
2250   GstStructure *pt_map;
2251   gchar *name;
2252   guint pt, rtx_pt;
2253   gchar *pt_s;
2254
2255   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2256
2257   pt = gst_rtsp_stream_get_pt (stream);
2258   pt_s = g_strdup_printf ("%u", pt);
2259   rtx_pt = stream->priv->rtx_pt;
2260
2261   GST_INFO ("creating rtxsend with pt %u to %u", pt, rtx_pt);
2262
2263   bin = gst_bin_new (NULL);
2264   stream->priv->rtxsend = gst_element_factory_make ("rtprtxsend", NULL);
2265   pt_map = gst_structure_new ("application/x-rtp-pt-map",
2266       pt_s, G_TYPE_UINT, rtx_pt, NULL);
2267   g_object_set (stream->priv->rtxsend, "payload-type-map", pt_map,
2268       "max-size-time", GST_TIME_AS_MSECONDS (stream->priv->rtx_time), NULL);
2269   g_free (pt_s);
2270   gst_structure_free (pt_map);
2271   gst_bin_add (GST_BIN (bin), gst_object_ref (stream->priv->rtxsend));
2272
2273   pad = gst_element_get_static_pad (stream->priv->rtxsend, "src");
2274   name = g_strdup_printf ("src_%u", sessid);
2275   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
2276   g_free (name);
2277   gst_object_unref (pad);
2278
2279   pad = gst_element_get_static_pad (stream->priv->rtxsend, "sink");
2280   name = g_strdup_printf ("sink_%u", sessid);
2281   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
2282   g_free (name);
2283   gst_object_unref (pad);
2284
2285   return bin;
2286 }
2287
2288 /**
2289  * gst_rtsp_stream_set_pt_map:
2290  * @stream: a #GstRTSPStream
2291  * @pt: the pt
2292  * @caps: a #GstCaps
2293  *
2294  * Configure a pt map between @pt and @caps.
2295  */
2296 void
2297 gst_rtsp_stream_set_pt_map (GstRTSPStream * stream, guint pt, GstCaps * caps)
2298 {
2299   GstRTSPStreamPrivate *priv = stream->priv;
2300
2301   g_mutex_lock (&priv->lock);
2302   g_hash_table_insert (priv->ptmap, GINT_TO_POINTER (pt), gst_caps_ref (caps));
2303   g_mutex_unlock (&priv->lock);
2304 }
2305
2306 /**
2307  * gst_rtsp_stream_set_publish_clock_mode:
2308  * @stream: a #GstRTSPStream
2309  * @mode: the clock publish mode
2310  *
2311  * Sets if and how the stream clock should be published according to RFC7273.
2312  *
2313  * Since: 1.8
2314  */
2315 void
2316 gst_rtsp_stream_set_publish_clock_mode (GstRTSPStream * stream,
2317     GstRTSPPublishClockMode mode)
2318 {
2319   GstRTSPStreamPrivate *priv;
2320
2321   priv = stream->priv;
2322   g_mutex_lock (&priv->lock);
2323   priv->publish_clock_mode = mode;
2324   g_mutex_unlock (&priv->lock);
2325 }
2326
2327 /**
2328  * gst_rtsp_stream_get_publish_clock_mode:
2329  * @factory: a #GstRTSPStream
2330  *
2331  * Gets if and how the stream clock should be published according to RFC7273.
2332  *
2333  * Returns: The GstRTSPPublishClockMode
2334  *
2335  * Since: 1.8
2336  */
2337 GstRTSPPublishClockMode
2338 gst_rtsp_stream_get_publish_clock_mode (GstRTSPStream * stream)
2339 {
2340   GstRTSPStreamPrivate *priv;
2341   GstRTSPPublishClockMode ret;
2342
2343   priv = stream->priv;
2344   g_mutex_lock (&priv->lock);
2345   ret = priv->publish_clock_mode;
2346   g_mutex_unlock (&priv->lock);
2347
2348   return ret;
2349 }
2350
2351 static GstCaps *
2352 request_pt_map (GstElement * rtpbin, guint session, guint pt,
2353     GstRTSPStream * stream)
2354 {
2355   GstRTSPStreamPrivate *priv = stream->priv;
2356   GstCaps *caps = NULL;
2357
2358   g_mutex_lock (&priv->lock);
2359
2360   if (priv->idx == session) {
2361     caps = g_hash_table_lookup (priv->ptmap, GINT_TO_POINTER (pt));
2362     if (caps) {
2363       GST_DEBUG ("Stream %p, pt %u: caps %" GST_PTR_FORMAT, stream, pt, caps);
2364       gst_caps_ref (caps);
2365     } else {
2366       GST_DEBUG ("Stream %p, pt %u: no caps", stream, pt);
2367     }
2368   }
2369
2370   g_mutex_unlock (&priv->lock);
2371
2372   return caps;
2373 }
2374
2375 static void
2376 pad_added (GstElement * rtpbin, GstPad * pad, GstRTSPStream * stream)
2377 {
2378   GstRTSPStreamPrivate *priv = stream->priv;
2379   gchar *name;
2380   GstPadLinkReturn ret;
2381   guint sessid;
2382
2383   GST_DEBUG ("Stream %p added pad %s:%s for pad %s:%s", stream,
2384       GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (priv->sinkpad));
2385
2386   name = gst_pad_get_name (pad);
2387   if (sscanf (name, "recv_rtp_src_%u", &sessid) != 1) {
2388     g_free (name);
2389     return;
2390   }
2391   g_free (name);
2392
2393   if (priv->idx != sessid)
2394     return;
2395
2396   if (gst_pad_is_linked (priv->sinkpad)) {
2397     GST_WARNING ("Stream %p: Pad %s:%s is linked already", stream,
2398         GST_DEBUG_PAD_NAME (priv->sinkpad));
2399     return;
2400   }
2401
2402   /* link the RTP pad to the session manager, it should not really fail unless
2403    * this is not really an RTP pad */
2404   ret = gst_pad_link (pad, priv->sinkpad);
2405   if (ret != GST_PAD_LINK_OK)
2406     goto link_failed;
2407   priv->recv_rtp_src = gst_object_ref (pad);
2408
2409   return;
2410
2411 /* ERRORS */
2412 link_failed:
2413   {
2414     GST_ERROR ("Stream %p: Failed to link pads %s:%s and %s:%s", stream,
2415         GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (priv->sinkpad));
2416   }
2417 }
2418
2419 static void
2420 on_npt_stop (GstElement * rtpbin, guint session, guint ssrc,
2421     GstRTSPStream * stream)
2422 {
2423   /* TODO: What to do here other than this? */
2424   GST_DEBUG ("Stream %p: Got EOS", stream);
2425   gst_pad_send_event (stream->priv->sinkpad, gst_event_new_eos ());
2426 }
2427
2428 /* must be called with lock */
2429 static gboolean
2430 create_sender_part (GstRTSPStream * stream, GstBin * bin, GstState state)
2431 {
2432   GstRTSPStreamPrivate *priv;
2433   GstPad *pad, *sinkpad = NULL;
2434   gboolean is_tcp = FALSE, is_udp = FALSE;
2435   gint i;
2436
2437   priv = stream->priv;
2438
2439   is_tcp = priv->protocols & GST_RTSP_LOWER_TRANS_TCP;
2440   is_udp = ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
2441       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST));
2442
2443   if (is_udp && !create_and_configure_udpsinks (stream))
2444     goto no_udp_protocol;
2445
2446   for (i = 0; i < 2; i++) {
2447     GstPad *teepad, *queuepad;
2448     /* For the sender we create this bit of pipeline for both
2449      * RTP and RTCP. Sync and preroll are enabled on udpsink so
2450      * we need to add a queue before appsink and udpsink to make
2451      * the pipeline not block. For the TCP case, we want to pump
2452      * client as fast as possible anyway. This pipeline is used
2453      * when both TCP and UDP are present.
2454      *
2455      * .--------.      .-----.    .---------.    .---------.
2456      * | rtpbin |      | tee |    |  queue  |    | udpsink |
2457      * |       send->sink   src->sink      src->sink       |
2458      * '--------'      |     |    '---------'    '---------'
2459      *                 |     |    .---------.    .---------.
2460      *                 |     |    |  queue  |    | appsink |
2461      *                 |    src->sink      src->sink       |
2462      *                 '-----'    '---------'    '---------'
2463      *
2464      * When only UDP or only TCP is allowed, we skip the tee and queue
2465      * and link the udpsink (for UDP) or appsink (for TCP) directly to
2466      * the session.
2467      */
2468     /* Only link the RTP send src if we're going to send RTP, link
2469      * the RTCP send src always */
2470     if (priv->srcpad || i == 1) {
2471       if (is_udp) {
2472         /* add udpsink */
2473         gst_bin_add (bin, priv->udpsink[i]);
2474         sinkpad = gst_element_get_static_pad (priv->udpsink[i], "sink");
2475       }
2476
2477       if (is_tcp) {
2478         /* make appsink */
2479         priv->appsink[i] = gst_element_factory_make ("appsink", NULL);
2480         g_object_set (priv->appsink[i], "emit-signals", FALSE, NULL);
2481         gst_bin_add (bin, priv->appsink[i]);
2482         gst_app_sink_set_callbacks (GST_APP_SINK_CAST (priv->appsink[i]),
2483             &sink_cb, stream, NULL);
2484       }
2485
2486       if (is_udp && is_tcp) {
2487         g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
2488
2489         /* make tee for RTP/RTCP */
2490         priv->tee[i] = gst_element_factory_make ("tee", NULL);
2491         gst_bin_add (bin, priv->tee[i]);
2492
2493         /* and link to rtpbin send pad */
2494         pad = gst_element_get_static_pad (priv->tee[i], "sink");
2495         gst_pad_link (priv->send_src[i], pad);
2496         gst_object_unref (pad);
2497
2498         priv->udpqueue[i] = gst_element_factory_make ("queue", NULL);
2499         g_object_set (priv->udpqueue[i], "max-size-buffers",
2500             1, "max-size-bytes", 0, "max-size-time", G_GINT64_CONSTANT (0),
2501             NULL);
2502         gst_bin_add (bin, priv->udpqueue[i]);
2503         /* link tee to udpqueue */
2504         teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
2505         pad = gst_element_get_static_pad (priv->udpqueue[i], "sink");
2506         gst_pad_link (teepad, pad);
2507         gst_object_unref (pad);
2508         gst_object_unref (teepad);
2509
2510         /* link udpqueue to udpsink */
2511         queuepad = gst_element_get_static_pad (priv->udpqueue[i], "src");
2512         gst_pad_link (queuepad, sinkpad);
2513         gst_object_unref (queuepad);
2514         gst_object_unref (sinkpad);
2515
2516         /* make appqueue */
2517         priv->appqueue[i] = gst_element_factory_make ("queue", NULL);
2518         g_object_set (priv->appqueue[i], "max-size-buffers",
2519             1, "max-size-bytes", 0, "max-size-time", G_GINT64_CONSTANT (0),
2520             NULL);
2521         gst_bin_add (bin, priv->appqueue[i]);
2522         /* and link tee to appqueue */
2523         teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
2524         pad = gst_element_get_static_pad (priv->appqueue[i], "sink");
2525         gst_pad_link (teepad, pad);
2526         gst_object_unref (pad);
2527         gst_object_unref (teepad);
2528
2529         /* and link appqueue to appsink */
2530         queuepad = gst_element_get_static_pad (priv->appqueue[i], "src");
2531         pad = gst_element_get_static_pad (priv->appsink[i], "sink");
2532         gst_pad_link (queuepad, pad);
2533         gst_object_unref (pad);
2534         gst_object_unref (queuepad);
2535       } else if (is_tcp) {
2536         /* only appsink needed, link it to the session */
2537         pad = gst_element_get_static_pad (priv->appsink[i], "sink");
2538         gst_pad_link (priv->send_src[i], pad);
2539         gst_object_unref (pad);
2540
2541         /* when its only TCP, we need to set sync and preroll to FALSE
2542          * for the sink to avoid deadlock. And this is only needed for
2543          * sink used for RTCP data, not the RTP data. */
2544         if (i == 1)
2545           g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
2546       } else {
2547         /* else only udpsink needed, link it to the session */
2548         gst_pad_link (priv->send_src[i], sinkpad);
2549         gst_object_unref (sinkpad);
2550       }
2551     }
2552
2553     /* check if we need to set to a special state */
2554     if (state != GST_STATE_NULL) {
2555       if (priv->udpsink[i] && (priv->srcpad || i == 1))
2556         gst_element_set_state (priv->udpsink[i], state);
2557       if (priv->appsink[i] && (priv->srcpad || i == 1))
2558         gst_element_set_state (priv->appsink[i], state);
2559       if (priv->appqueue[i] && (priv->srcpad || i == 1))
2560         gst_element_set_state (priv->appqueue[i], state);
2561       if (priv->udpqueue[i] && (priv->srcpad || i == 1))
2562         gst_element_set_state (priv->udpqueue[i], state);
2563       if (priv->tee[i] && (priv->srcpad || i == 1))
2564         gst_element_set_state (priv->tee[i], state);
2565     }
2566   }
2567
2568   return TRUE;
2569
2570   /* ERRORS */
2571 no_udp_protocol:
2572   {
2573     return FALSE;
2574   }
2575 }
2576
2577 /* must be called with lock */
2578 static void
2579 create_receiver_part (GstRTSPStream * stream, GstBin * bin, GstState state)
2580 {
2581   GstRTSPStreamPrivate *priv;
2582   GstPad *pad, *selpad;
2583   gboolean is_tcp;
2584   gint i;
2585
2586   priv = stream->priv;
2587
2588   is_tcp = priv->protocols & GST_RTSP_LOWER_TRANS_TCP;
2589
2590   for (i = 0; i < 2; i++) {
2591     /* Only connect recv RTP sink if we expect to receive RTP. Connect recv
2592      * RTCP sink always */
2593     if (priv->sinkpad || i == 1) {
2594       /* For the receiver we create this bit of pipeline for both
2595        * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc
2596        * and it is all funneled into the rtpbin receive pad.
2597        *
2598        * .--------.     .--------.    .--------.
2599        * | udpsrc |     | funnel |    | rtpbin |
2600        * |       src->sink      src->sink      |
2601        * '--------'     |        |    '--------'
2602        * .--------.     |        |
2603        * | appsrc |     |        |
2604        * |       src->sink       |
2605        * '--------'     '--------'
2606        */
2607       /* make funnel for the RTP/RTCP receivers */
2608       priv->funnel[i] = gst_element_factory_make ("funnel", NULL);
2609       gst_bin_add (bin, priv->funnel[i]);
2610
2611       pad = gst_element_get_static_pad (priv->funnel[i], "src");
2612       gst_pad_link (pad, priv->recv_sink[i]);
2613       gst_object_unref (pad);
2614
2615       if (priv->udpsrc_v4[i]) {
2616         if (priv->srcpad) {
2617           /* we set and keep these to playing so that they don't cause NO_PREROLL return
2618            * values. This is only relevant for PLAY pipelines */
2619           gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_PLAYING);
2620           gst_element_set_locked_state (priv->udpsrc_v4[i], TRUE);
2621         }
2622         /* add udpsrc */
2623         gst_bin_add (bin, priv->udpsrc_v4[i]);
2624
2625         /* and link to the funnel v4 */
2626         selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2627         pad = gst_element_get_static_pad (priv->udpsrc_v4[i], "src");
2628         gst_pad_link (pad, selpad);
2629         gst_object_unref (pad);
2630         gst_object_unref (selpad);
2631       }
2632
2633       if (priv->udpsrc_v6[i]) {
2634         if (priv->srcpad) {
2635           gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_PLAYING);
2636           gst_element_set_locked_state (priv->udpsrc_v6[i], TRUE);
2637         }
2638         gst_bin_add (bin, priv->udpsrc_v6[i]);
2639
2640         /* and link to the funnel v6 */
2641         selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2642         pad = gst_element_get_static_pad (priv->udpsrc_v6[i], "src");
2643         gst_pad_link (pad, selpad);
2644         gst_object_unref (pad);
2645         gst_object_unref (selpad);
2646       }
2647
2648       if (is_tcp) {
2649         /* make and add appsrc */
2650         priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
2651         priv->appsrc_base_time[i] = -1;
2652         if (priv->srcpad) {
2653           gst_element_set_state (priv->appsrc[i], GST_STATE_PLAYING);
2654           gst_element_set_locked_state (priv->appsrc[i], TRUE);
2655         }
2656         g_object_set (priv->appsrc[i], "format", GST_FORMAT_TIME, "is-live",
2657             TRUE, NULL);
2658         gst_bin_add (bin, priv->appsrc[i]);
2659         /* and link to the funnel */
2660         selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2661         pad = gst_element_get_static_pad (priv->appsrc[i], "src");
2662         gst_pad_link (pad, selpad);
2663         gst_object_unref (pad);
2664         gst_object_unref (selpad);
2665       }
2666     }
2667
2668     /* check if we need to set to a special state */
2669     if (state != GST_STATE_NULL) {
2670       if (priv->funnel[i] && (priv->sinkpad || i == 1))
2671         gst_element_set_state (priv->funnel[i], state);
2672     }
2673   }
2674 }
2675
2676 /**
2677  * gst_rtsp_stream_join_bin:
2678  * @stream: a #GstRTSPStream
2679  * @bin: (transfer none): a #GstBin to join
2680  * @rtpbin: (transfer none): a rtpbin element in @bin
2681  * @state: the target state of the new elements
2682  *
2683  * Join the #GstBin @bin that contains the element @rtpbin.
2684  *
2685  * @stream will link to @rtpbin, which must be inside @bin. The elements
2686  * added to @bin will be set to the state given in @state.
2687  *
2688  * Returns: %TRUE on success.
2689  */
2690 gboolean
2691 gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
2692     GstElement * rtpbin, GstState state)
2693 {
2694   GstRTSPStreamPrivate *priv;
2695   guint idx;
2696   gchar *name;
2697   GstPadLinkReturn ret;
2698
2699   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2700   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
2701   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
2702
2703   priv = stream->priv;
2704
2705   g_mutex_lock (&priv->lock);
2706   if (priv->is_joined)
2707     goto was_joined;
2708
2709   /* create a session with the same index as the stream */
2710   idx = priv->idx;
2711
2712   GST_INFO ("stream %p joining bin as session %u", stream, idx);
2713
2714   if (priv->profiles & GST_RTSP_PROFILE_SAVP
2715       || priv->profiles & GST_RTSP_PROFILE_SAVPF) {
2716     /* For SRTP */
2717     g_signal_connect (rtpbin, "request-rtp-encoder",
2718         (GCallback) request_rtp_encoder, stream);
2719     g_signal_connect (rtpbin, "request-rtcp-encoder",
2720         (GCallback) request_rtcp_encoder, stream);
2721     g_signal_connect (rtpbin, "request-rtp-decoder",
2722         (GCallback) request_rtp_rtcp_decoder, stream);
2723     g_signal_connect (rtpbin, "request-rtcp-decoder",
2724         (GCallback) request_rtp_rtcp_decoder, stream);
2725   }
2726
2727   if (priv->sinkpad) {
2728     g_signal_connect (rtpbin, "request-pt-map",
2729         (GCallback) request_pt_map, stream);
2730   }
2731
2732   /* get pads from the RTP session element for sending and receiving
2733    * RTP/RTCP*/
2734   if (priv->srcpad) {
2735     /* get a pad for sending RTP */
2736     name = g_strdup_printf ("send_rtp_sink_%u", idx);
2737     priv->send_rtp_sink = gst_element_get_request_pad (rtpbin, name);
2738     g_free (name);
2739
2740     /* link the RTP pad to the session manager, it should not really fail unless
2741      * this is not really an RTP pad */
2742     ret = gst_pad_link (priv->srcpad, priv->send_rtp_sink);
2743     if (ret != GST_PAD_LINK_OK)
2744       goto link_failed;
2745
2746     name = g_strdup_printf ("send_rtp_src_%u", idx);
2747     priv->send_src[0] = gst_element_get_static_pad (rtpbin, name);
2748     g_free (name);
2749   } else {
2750     /* Need to connect our sinkpad from here */
2751     g_signal_connect (rtpbin, "pad-added", (GCallback) pad_added, stream);
2752     /* EOS */
2753     g_signal_connect (rtpbin, "on-npt-stop", (GCallback) on_npt_stop, stream);
2754
2755     name = g_strdup_printf ("recv_rtp_sink_%u", idx);
2756     priv->recv_sink[0] = gst_element_get_request_pad (rtpbin, name);
2757     g_free (name);
2758   }
2759
2760   name = g_strdup_printf ("send_rtcp_src_%u", idx);
2761   priv->send_src[1] = gst_element_get_request_pad (rtpbin, name);
2762   g_free (name);
2763   name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
2764   priv->recv_sink[1] = gst_element_get_request_pad (rtpbin, name);
2765   g_free (name);
2766
2767   /* get the session */
2768   g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &priv->session);
2769
2770   g_signal_connect (priv->session, "on-new-ssrc", (GCallback) on_new_ssrc,
2771       stream);
2772   g_signal_connect (priv->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
2773       stream);
2774   g_signal_connect (priv->session, "on-ssrc-active",
2775       (GCallback) on_ssrc_active, stream);
2776   g_signal_connect (priv->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
2777       stream);
2778   g_signal_connect (priv->session, "on-bye-timeout",
2779       (GCallback) on_bye_timeout, stream);
2780   g_signal_connect (priv->session, "on-timeout", (GCallback) on_timeout,
2781       stream);
2782
2783   /* signal for sender ssrc */
2784   g_signal_connect (priv->session, "on-new-sender-ssrc",
2785       (GCallback) on_new_sender_ssrc, stream);
2786   g_signal_connect (priv->session, "on-sender-ssrc-active",
2787       (GCallback) on_sender_ssrc_active, stream);
2788
2789   if (!create_sender_part (stream, bin, state))
2790     goto no_udp_protocol;
2791
2792   create_receiver_part (stream, bin, state);
2793
2794   if (priv->srcpad) {
2795     /* be notified of caps changes */
2796     priv->caps_sig = g_signal_connect (priv->send_src[0], "notify::caps",
2797         (GCallback) caps_notify, stream);
2798   }
2799
2800   priv->joined_bin = bin;
2801   priv->is_joined = TRUE;
2802   g_mutex_unlock (&priv->lock);
2803
2804   return TRUE;
2805
2806   /* ERRORS */
2807 was_joined:
2808   {
2809     g_mutex_unlock (&priv->lock);
2810     return TRUE;
2811   }
2812 link_failed:
2813   {
2814     GST_WARNING ("failed to link stream %u", idx);
2815     gst_object_unref (priv->send_rtp_sink);
2816     priv->send_rtp_sink = NULL;
2817     g_mutex_unlock (&priv->lock);
2818     return FALSE;
2819   }
2820 no_udp_protocol:
2821   {
2822     GST_WARNING ("failed to allocate ports %u", idx);
2823     gst_object_unref (priv->send_rtp_sink);
2824     priv->send_rtp_sink = NULL;
2825     gst_object_unref (priv->send_src[0]);
2826     priv->send_src[0] = NULL;
2827     gst_object_unref (priv->send_src[1]);
2828     priv->send_src[1] = NULL;
2829     gst_object_unref (priv->recv_sink[0]);
2830     priv->recv_sink[0] = NULL;
2831     gst_object_unref (priv->recv_sink[1]);
2832     priv->recv_sink[1] = NULL;
2833     if (priv->udpsink[0])
2834       gst_element_set_state (priv->udpsink[0], GST_STATE_NULL);
2835     if (priv->udpsink[1])
2836       gst_element_set_state (priv->udpsink[1], GST_STATE_NULL);
2837     if (priv->udpsrc_v4[0]) {
2838       gst_element_set_state (priv->udpsrc_v4[0], GST_STATE_NULL);
2839       gst_object_unref (priv->udpsrc_v4[0]);
2840       priv->udpsrc_v4[0] = NULL;
2841     }
2842     if (priv->udpsrc_v4[1]) {
2843       gst_element_set_state (priv->udpsrc_v4[1], GST_STATE_NULL);
2844       gst_object_unref (priv->udpsrc_v4[1]);
2845       priv->udpsrc_v4[1] = NULL;
2846     }
2847     if (priv->udpsrc_mcast_v4[0]) {
2848       gst_element_set_state (priv->udpsrc_mcast_v4[0], GST_STATE_NULL);
2849       gst_object_unref (priv->udpsrc_mcast_v4[0]);
2850       priv->udpsrc_mcast_v4[0] = NULL;
2851     }
2852     if (priv->udpsrc_mcast_v4[1]) {
2853       gst_element_set_state (priv->udpsrc_mcast_v4[1], GST_STATE_NULL);
2854       gst_object_unref (priv->udpsrc_mcast_v4[1]);
2855       priv->udpsrc_mcast_v4[1] = NULL;
2856     }
2857     if (priv->udpsrc_v6[0]) {
2858       gst_element_set_state (priv->udpsrc_v6[0], GST_STATE_NULL);
2859       gst_object_unref (priv->udpsrc_v6[0]);
2860       priv->udpsrc_v6[0] = NULL;
2861     }
2862     if (priv->udpsrc_v6[1]) {
2863       gst_element_set_state (priv->udpsrc_v6[1], GST_STATE_NULL);
2864       gst_object_unref (priv->udpsrc_v6[1]);
2865       priv->udpsrc_v6[1] = NULL;
2866     }
2867     if (priv->udpsrc_mcast_v6[0]) {
2868       gst_element_set_state (priv->udpsrc_mcast_v6[0], GST_STATE_NULL);
2869       gst_object_unref (priv->udpsrc_mcast_v6[0]);
2870       priv->udpsrc_mcast_v6[0] = NULL;
2871     }
2872     if (priv->udpsrc_mcast_v6[1]) {
2873       gst_element_set_state (priv->udpsrc_mcast_v6[1], GST_STATE_NULL);
2874       gst_object_unref (priv->udpsrc_mcast_v6[1]);
2875       priv->udpsrc_mcast_v6[1] = NULL;
2876     }
2877     g_mutex_unlock (&priv->lock);
2878     return FALSE;
2879   }
2880 }
2881
2882 /**
2883  * gst_rtsp_stream_leave_bin:
2884  * @stream: a #GstRTSPStream
2885  * @bin: (transfer none): a #GstBin
2886  * @rtpbin: (transfer none): a rtpbin #GstElement
2887  *
2888  * Remove the elements of @stream from @bin.
2889  *
2890  * Return: %TRUE on success.
2891  */
2892 gboolean
2893 gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
2894     GstElement * rtpbin)
2895 {
2896   GstRTSPStreamPrivate *priv;
2897   gint i;
2898   gboolean is_tcp, is_udp;
2899
2900   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2901   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
2902   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
2903
2904   priv = stream->priv;
2905
2906   g_mutex_lock (&priv->lock);
2907   if (!priv->is_joined)
2908     goto was_not_joined;
2909
2910   priv->joined_bin = NULL;
2911
2912   /* all transports must be removed by now */
2913   if (priv->transports != NULL)
2914     goto transports_not_removed;
2915
2916   clear_tr_cache (priv, TRUE);
2917   clear_tr_cache (priv, FALSE);
2918
2919   GST_INFO ("stream %p leaving bin", stream);
2920
2921   if (priv->srcpad) {
2922     gst_pad_unlink (priv->srcpad, priv->send_rtp_sink);
2923
2924     g_signal_handler_disconnect (priv->send_src[0], priv->caps_sig);
2925     gst_element_release_request_pad (rtpbin, priv->send_rtp_sink);
2926     gst_object_unref (priv->send_rtp_sink);
2927     priv->send_rtp_sink = NULL;
2928   } else if (priv->recv_rtp_src) {
2929     gst_pad_unlink (priv->recv_rtp_src, priv->sinkpad);
2930     gst_object_unref (priv->recv_rtp_src);
2931     priv->recv_rtp_src = NULL;
2932   }
2933
2934   is_tcp = priv->protocols & GST_RTSP_LOWER_TRANS_TCP;
2935
2936   is_udp = ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
2937       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST));
2938
2939
2940   for (i = 0; i < 2; i++) {
2941     if (priv->udpsink[i])
2942       gst_element_set_state (priv->udpsink[i], GST_STATE_NULL);
2943     if (priv->appsink[i])
2944       gst_element_set_state (priv->appsink[i], GST_STATE_NULL);
2945     if (priv->appqueue[i])
2946       gst_element_set_state (priv->appqueue[i], GST_STATE_NULL);
2947     if (priv->udpqueue[i])
2948       gst_element_set_state (priv->udpqueue[i], GST_STATE_NULL);
2949     if (priv->tee[i])
2950       gst_element_set_state (priv->tee[i], GST_STATE_NULL);
2951     if (priv->funnel[i])
2952       gst_element_set_state (priv->funnel[i], GST_STATE_NULL);
2953     if (priv->appsrc[i])
2954       gst_element_set_state (priv->appsrc[i], GST_STATE_NULL);
2955
2956     if (priv->udpsrc_v4[i]) {
2957       if (priv->sinkpad || i == 1) {
2958         /* and set udpsrc to NULL now before removing */
2959         gst_element_set_locked_state (priv->udpsrc_v4[i], FALSE);
2960         gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_NULL);
2961         /* removing them should also nicely release the request
2962          * pads when they finalize */
2963         gst_bin_remove (bin, priv->udpsrc_v4[i]);
2964       } else {
2965         /* we need to set the state to NULL before unref */
2966         gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_NULL);
2967         gst_object_unref (priv->udpsrc_v4[i]);
2968       }
2969     }
2970
2971     if (priv->udpsrc_mcast_v4[i]) {
2972       if (priv->sinkpad || i == 1) {
2973         /* and set udpsrc to NULL now before removing */
2974         gst_element_set_locked_state (priv->udpsrc_mcast_v4[i], FALSE);
2975         gst_element_set_state (priv->udpsrc_mcast_v4[i], GST_STATE_NULL);
2976         /* removing them should also nicely release the request
2977          * pads when they finalize */
2978         gst_bin_remove (bin, priv->udpsrc_mcast_v4[i]);
2979       } else {
2980         gst_element_set_state (priv->udpsrc_mcast_v4[i], GST_STATE_NULL);
2981         gst_object_unref (priv->udpsrc_mcast_v4[i]);
2982       }
2983     }
2984
2985     if (priv->udpsrc_v6[i]) {
2986       if (priv->sinkpad || i == 1) {
2987         gst_element_set_locked_state (priv->udpsrc_v6[i], FALSE);
2988         gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_NULL);
2989         gst_bin_remove (bin, priv->udpsrc_v6[i]);
2990       } else {
2991         gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_NULL);
2992         gst_object_unref (priv->udpsrc_v6[i]);
2993       }
2994     }
2995     if (priv->udpsrc_mcast_v6[i]) {
2996       if (priv->sinkpad || i == 1) {
2997         gst_element_set_locked_state (priv->udpsrc_mcast_v6[i], FALSE);
2998         gst_element_set_state (priv->udpsrc_mcast_v6[i], GST_STATE_NULL);
2999         gst_bin_remove (bin, priv->udpsrc_mcast_v6[i]);
3000       } else {
3001         gst_element_set_state (priv->udpsrc_mcast_v6[i], GST_STATE_NULL);
3002         gst_object_unref (priv->udpsrc_mcast_v6[i]);
3003       }
3004     }
3005
3006     if (priv->udpsink[i] && is_udp && (priv->srcpad || i == 1))
3007       gst_bin_remove (bin, priv->udpsink[i]);
3008     if (priv->appsrc[i]) {
3009       if (priv->sinkpad || i == 1) {
3010         gst_element_set_locked_state (priv->appsrc[i], FALSE);
3011         gst_element_set_state (priv->appsrc[i], GST_STATE_NULL);
3012         gst_bin_remove (bin, priv->appsrc[i]);
3013       } else {
3014         gst_element_set_state (priv->appsrc[i], GST_STATE_NULL);
3015         gst_object_unref (priv->appsrc[i]);
3016       }
3017     }
3018     if (priv->appsink[i] && is_tcp && (priv->srcpad || i == 1))
3019       gst_bin_remove (bin, priv->appsink[i]);
3020     if (priv->appqueue[i] && is_tcp && is_udp && (priv->srcpad || i == 1))
3021       gst_bin_remove (bin, priv->appqueue[i]);
3022     if (priv->udpqueue[i] && is_tcp && is_udp && (priv->srcpad || i == 1))
3023       gst_bin_remove (bin, priv->udpqueue[i]);
3024     if (priv->tee[i] && is_tcp && is_udp && (priv->srcpad || i == 1))
3025       gst_bin_remove (bin, priv->tee[i]);
3026     if (priv->funnel[i] && (priv->sinkpad || i == 1))
3027       gst_bin_remove (bin, priv->funnel[i]);
3028
3029     if (priv->sinkpad || i == 1) {
3030       gst_element_release_request_pad (rtpbin, priv->recv_sink[i]);
3031       gst_object_unref (priv->recv_sink[i]);
3032       priv->recv_sink[i] = NULL;
3033     }
3034
3035     priv->udpsrc_v4[i] = NULL;
3036     priv->udpsrc_v6[i] = NULL;
3037     priv->udpsrc_mcast_v4[i] = NULL;
3038     priv->udpsrc_mcast_v6[i] = NULL;
3039     priv->udpsink[i] = NULL;
3040     priv->appsrc[i] = NULL;
3041     priv->appsink[i] = NULL;
3042     priv->appqueue[i] = NULL;
3043     priv->udpqueue[i] = NULL;
3044     priv->tee[i] = NULL;
3045     priv->funnel[i] = NULL;
3046   }
3047
3048   if (priv->srcpad) {
3049     gst_object_unref (priv->send_src[0]);
3050     priv->send_src[0] = NULL;
3051   }
3052
3053   gst_element_release_request_pad (rtpbin, priv->send_src[1]);
3054   gst_object_unref (priv->send_src[1]);
3055   priv->send_src[1] = NULL;
3056
3057   g_object_unref (priv->session);
3058   priv->session = NULL;
3059   if (priv->caps)
3060     gst_caps_unref (priv->caps);
3061   priv->caps = NULL;
3062
3063   if (priv->srtpenc)
3064     gst_object_unref (priv->srtpenc);
3065   if (priv->srtpdec)
3066     gst_object_unref (priv->srtpdec);
3067
3068   priv->is_joined = FALSE;
3069   g_mutex_unlock (&priv->lock);
3070
3071   return TRUE;
3072
3073 was_not_joined:
3074   {
3075     g_mutex_unlock (&priv->lock);
3076     return TRUE;
3077   }
3078 transports_not_removed:
3079   {
3080     GST_ERROR_OBJECT (stream, "can't leave bin (transports not removed)");
3081     g_mutex_unlock (&priv->lock);
3082     return FALSE;
3083   }
3084 }
3085
3086 /**
3087  * gst_rtsp_stream_get_joined_bin:
3088  * @stream: a #GstRTSPStream
3089  *
3090  * Get the previous joined bin with gst_rtsp_stream_join_bin() or NULL.
3091  *
3092  * Return: (transfer full): the joined bin or NULL.
3093  */
3094 GstBin *
3095 gst_rtsp_stream_get_joined_bin (GstRTSPStream * stream)
3096 {
3097   GstRTSPStreamPrivate *priv;
3098   GstBin *bin = NULL;
3099
3100   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3101
3102   priv = stream->priv;
3103
3104   g_mutex_lock (&priv->lock);
3105   bin = priv->joined_bin ? gst_object_ref (priv->joined_bin) : NULL;
3106   g_mutex_unlock (&priv->lock);
3107
3108   return bin;
3109 }
3110
3111 /**
3112  * gst_rtsp_stream_get_rtpinfo:
3113  * @stream: a #GstRTSPStream
3114  * @rtptime: (allow-none): result RTP timestamp
3115  * @seq: (allow-none): result RTP seqnum
3116  * @clock_rate: (allow-none): the clock rate
3117  * @running_time: (allow-none): result running-time
3118  *
3119  * Retrieve the current rtptime, seq and running-time. This is used to
3120  * construct a RTPInfo reply header.
3121  *
3122  * Returns: %TRUE when rtptime, seq and running-time could be determined.
3123  */
3124 gboolean
3125 gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream,
3126     guint * rtptime, guint * seq, guint * clock_rate,
3127     GstClockTime * running_time)
3128 {
3129   GstRTSPStreamPrivate *priv;
3130   GstStructure *stats;
3131   GObjectClass *payobjclass;
3132
3133   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3134
3135   priv = stream->priv;
3136
3137   payobjclass = G_OBJECT_GET_CLASS (priv->payloader);
3138
3139   g_mutex_lock (&priv->lock);
3140
3141   /* First try to extract the information from the last buffer on the sinks.
3142    * This will have a more accurate sequence number and timestamp, as between
3143    * the payloader and the sink there can be some queues
3144    */
3145   if (priv->udpsink[0] || priv->appsink[0]) {
3146     GstSample *last_sample;
3147
3148     if (priv->udpsink[0])
3149       g_object_get (priv->udpsink[0], "last-sample", &last_sample, NULL);
3150     else
3151       g_object_get (priv->appsink[0], "last-sample", &last_sample, NULL);
3152
3153     if (last_sample) {
3154       GstCaps *caps;
3155       GstBuffer *buffer;
3156       GstSegment *segment;
3157       GstRTPBuffer rtp_buffer = GST_RTP_BUFFER_INIT;
3158
3159       caps = gst_sample_get_caps (last_sample);
3160       buffer = gst_sample_get_buffer (last_sample);
3161       segment = gst_sample_get_segment (last_sample);
3162
3163       if (gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp_buffer)) {
3164         if (seq) {
3165           *seq = gst_rtp_buffer_get_seq (&rtp_buffer);
3166         }
3167
3168         if (rtptime) {
3169           *rtptime = gst_rtp_buffer_get_timestamp (&rtp_buffer);
3170         }
3171
3172         gst_rtp_buffer_unmap (&rtp_buffer);
3173
3174         if (running_time) {
3175           *running_time =
3176               gst_segment_to_running_time (segment, GST_FORMAT_TIME,
3177               GST_BUFFER_TIMESTAMP (buffer));
3178         }
3179
3180         if (clock_rate) {
3181           GstStructure *s = gst_caps_get_structure (caps, 0);
3182
3183           gst_structure_get_int (s, "clock-rate", (gint *) clock_rate);
3184
3185           if (*clock_rate == 0 && running_time)
3186             *running_time = GST_CLOCK_TIME_NONE;
3187         }
3188         gst_sample_unref (last_sample);
3189
3190         goto done;
3191       } else {
3192         gst_sample_unref (last_sample);
3193       }
3194     }
3195   }
3196
3197   if (g_object_class_find_property (payobjclass, "stats")) {
3198     g_object_get (priv->payloader, "stats", &stats, NULL);
3199     if (stats == NULL)
3200       goto no_stats;
3201
3202     if (seq)
3203       gst_structure_get_uint (stats, "seqnum", seq);
3204
3205     if (rtptime)
3206       gst_structure_get_uint (stats, "timestamp", rtptime);
3207
3208     if (running_time)
3209       gst_structure_get_clock_time (stats, "running-time", running_time);
3210
3211     if (clock_rate) {
3212       gst_structure_get_uint (stats, "clock-rate", clock_rate);
3213       if (*clock_rate == 0 && running_time)
3214         *running_time = GST_CLOCK_TIME_NONE;
3215     }
3216     gst_structure_free (stats);
3217   } else {
3218     if (!g_object_class_find_property (payobjclass, "seqnum") ||
3219         !g_object_class_find_property (payobjclass, "timestamp"))
3220       goto no_stats;
3221
3222     if (seq)
3223       g_object_get (priv->payloader, "seqnum", seq, NULL);
3224
3225     if (rtptime)
3226       g_object_get (priv->payloader, "timestamp", rtptime, NULL);
3227
3228     if (running_time)
3229       *running_time = GST_CLOCK_TIME_NONE;
3230   }
3231
3232 done:
3233   g_mutex_unlock (&priv->lock);
3234
3235   return TRUE;
3236
3237   /* ERRORS */
3238 no_stats:
3239   {
3240     GST_WARNING ("Could not get payloader stats");
3241     g_mutex_unlock (&priv->lock);
3242     return FALSE;
3243   }
3244 }
3245
3246 /**
3247  * gst_rtsp_stream_get_caps:
3248  * @stream: a #GstRTSPStream
3249  *
3250  * Retrieve the current caps of @stream.
3251  *
3252  * Returns: (transfer full): the #GstCaps of @stream. use gst_caps_unref()
3253  * after usage.
3254  */
3255 GstCaps *
3256 gst_rtsp_stream_get_caps (GstRTSPStream * stream)
3257 {
3258   GstRTSPStreamPrivate *priv;
3259   GstCaps *result;
3260
3261   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3262
3263   priv = stream->priv;
3264
3265   g_mutex_lock (&priv->lock);
3266   if ((result = priv->caps))
3267     gst_caps_ref (result);
3268   g_mutex_unlock (&priv->lock);
3269
3270   return result;
3271 }
3272
3273 /**
3274  * gst_rtsp_stream_recv_rtp:
3275  * @stream: a #GstRTSPStream
3276  * @buffer: (transfer full): a #GstBuffer
3277  *
3278  * Handle an RTP buffer for the stream. This method is usually called when a
3279  * message has been received from a client using the TCP transport.
3280  *
3281  * This function takes ownership of @buffer.
3282  *
3283  * Returns: a GstFlowReturn.
3284  */
3285 GstFlowReturn
3286 gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer)
3287 {
3288   GstRTSPStreamPrivate *priv;
3289   GstFlowReturn ret;
3290   GstElement *element;
3291
3292   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
3293   priv = stream->priv;
3294   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
3295   g_return_val_if_fail (priv->is_joined, FALSE);
3296
3297   g_mutex_lock (&priv->lock);
3298   if (priv->appsrc[0])
3299     element = gst_object_ref (priv->appsrc[0]);
3300   else
3301     element = NULL;
3302   g_mutex_unlock (&priv->lock);
3303
3304   if (element) {
3305     if (priv->appsrc_base_time[0] == -1) {
3306       /* Take current running_time. This timestamp will be put on
3307        * the first buffer of each stream because we are a live source and so we
3308        * timestamp with the running_time. When we are dealing with TCP, we also
3309        * only timestamp the first buffer (using the DISCONT flag) because a server
3310        * typically bursts data, for which we don't want to compensate by speeding
3311        * up the media. The other timestamps will be interpollated from this one
3312        * using the RTP timestamps. */
3313       GST_OBJECT_LOCK (element);
3314       if (GST_ELEMENT_CLOCK (element)) {
3315         GstClockTime now;
3316         GstClockTime base_time;
3317
3318         now = gst_clock_get_time (GST_ELEMENT_CLOCK (element));
3319         base_time = GST_ELEMENT_CAST (element)->base_time;
3320
3321         priv->appsrc_base_time[0] = now - base_time;
3322         GST_BUFFER_TIMESTAMP (buffer) = priv->appsrc_base_time[0];
3323         GST_DEBUG ("stream %p: first buffer at time %" GST_TIME_FORMAT
3324             ", base %" GST_TIME_FORMAT, stream, GST_TIME_ARGS (now),
3325             GST_TIME_ARGS (base_time));
3326       }
3327       GST_OBJECT_UNLOCK (element);
3328     }
3329
3330     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
3331     gst_object_unref (element);
3332   } else {
3333     ret = GST_FLOW_OK;
3334   }
3335   return ret;
3336 }
3337
3338 /**
3339  * gst_rtsp_stream_recv_rtcp:
3340  * @stream: a #GstRTSPStream
3341  * @buffer: (transfer full): a #GstBuffer
3342  *
3343  * Handle an RTCP buffer for the stream. This method is usually called when a
3344  * message has been received from a client using the TCP transport.
3345  *
3346  * This function takes ownership of @buffer.
3347  *
3348  * Returns: a GstFlowReturn.
3349  */
3350 GstFlowReturn
3351 gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
3352 {
3353   GstRTSPStreamPrivate *priv;
3354   GstFlowReturn ret;
3355   GstElement *element;
3356
3357   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
3358   priv = stream->priv;
3359   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
3360
3361   if (!priv->is_joined) {
3362     gst_buffer_unref (buffer);
3363     return GST_FLOW_NOT_LINKED;
3364   }
3365   g_mutex_lock (&priv->lock);
3366   if (priv->appsrc[1])
3367     element = gst_object_ref (priv->appsrc[1]);
3368   else
3369     element = NULL;
3370   g_mutex_unlock (&priv->lock);
3371
3372   if (element) {
3373     if (priv->appsrc_base_time[1] == -1) {
3374       /* Take current running_time. This timestamp will be put on
3375        * the first buffer of each stream because we are a live source and so we
3376        * timestamp with the running_time. When we are dealing with TCP, we also
3377        * only timestamp the first buffer (using the DISCONT flag) because a server
3378        * typically bursts data, for which we don't want to compensate by speeding
3379        * up the media. The other timestamps will be interpollated from this one
3380        * using the RTP timestamps. */
3381       GST_OBJECT_LOCK (element);
3382       if (GST_ELEMENT_CLOCK (element)) {
3383         GstClockTime now;
3384         GstClockTime base_time;
3385
3386         now = gst_clock_get_time (GST_ELEMENT_CLOCK (element));
3387         base_time = GST_ELEMENT_CAST (element)->base_time;
3388
3389         priv->appsrc_base_time[1] = now - base_time;
3390         GST_BUFFER_TIMESTAMP (buffer) = priv->appsrc_base_time[1];
3391         GST_DEBUG ("stream %p: first buffer at time %" GST_TIME_FORMAT
3392             ", base %" GST_TIME_FORMAT, stream, GST_TIME_ARGS (now),
3393             GST_TIME_ARGS (base_time));
3394       }
3395       GST_OBJECT_UNLOCK (element);
3396     }
3397
3398     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
3399     gst_object_unref (element);
3400   } else {
3401     ret = GST_FLOW_OK;
3402     gst_buffer_unref (buffer);
3403   }
3404   return ret;
3405 }
3406
3407 /* must be called with lock */
3408 static gboolean
3409 update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
3410     gboolean add)
3411 {
3412   GstRTSPStreamPrivate *priv = stream->priv;
3413   const GstRTSPTransport *tr;
3414
3415   tr = gst_rtsp_stream_transport_get_transport (trans);
3416
3417   switch (tr->lower_transport) {
3418     case GST_RTSP_LOWER_TRANS_UDP_MCAST:
3419     case GST_RTSP_LOWER_TRANS_UDP:
3420     {
3421       gchar *dest;
3422       gint min, max;
3423       guint ttl = 0;
3424
3425       dest = tr->destination;
3426       if (tr->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
3427         min = tr->port.min;
3428         max = tr->port.max;
3429         ttl = tr->ttl;
3430       } else if (priv->client_side) {
3431         /* In client side mode the 'destination' is the RTSP server, so send
3432          * to those ports */
3433         min = tr->server_port.min;
3434         max = tr->server_port.max;
3435       } else {
3436         min = tr->client_port.min;
3437         max = tr->client_port.max;
3438       }
3439
3440       if (add) {
3441         if (ttl > 0) {
3442           GST_INFO ("setting ttl-mc %d", ttl);
3443           g_object_set (G_OBJECT (priv->udpsink[0]), "ttl-mc", ttl, NULL);
3444           g_object_set (G_OBJECT (priv->udpsink[1]), "ttl-mc", ttl, NULL);
3445         }
3446         GST_INFO ("adding %s:%d-%d", dest, min, max);
3447         g_signal_emit_by_name (priv->udpsink[0], "add", dest, min, NULL);
3448         g_signal_emit_by_name (priv->udpsink[1], "add", dest, max, NULL);
3449         priv->transports = g_list_prepend (priv->transports, trans);
3450       } else {
3451         GST_INFO ("removing %s:%d-%d", dest, min, max);
3452         g_signal_emit_by_name (priv->udpsink[0], "remove", dest, min, NULL);
3453         g_signal_emit_by_name (priv->udpsink[1], "remove", dest, max, NULL);
3454         priv->transports = g_list_remove (priv->transports, trans);
3455       }
3456       priv->transports_cookie++;
3457       break;
3458     }
3459     case GST_RTSP_LOWER_TRANS_TCP:
3460       if (add) {
3461         GST_INFO ("adding TCP %s", tr->destination);
3462         priv->transports = g_list_prepend (priv->transports, trans);
3463       } else {
3464         GST_INFO ("removing TCP %s", tr->destination);
3465         priv->transports = g_list_remove (priv->transports, trans);
3466       }
3467       priv->transports_cookie++;
3468       break;
3469     default:
3470       goto unknown_transport;
3471   }
3472   return TRUE;
3473
3474   /* ERRORS */
3475 unknown_transport:
3476   {
3477     GST_INFO ("Unknown transport %d", tr->lower_transport);
3478     return FALSE;
3479   }
3480 }
3481
3482
3483 /**
3484  * gst_rtsp_stream_add_transport:
3485  * @stream: a #GstRTSPStream
3486  * @trans: (transfer none): a #GstRTSPStreamTransport
3487  *
3488  * Add the transport in @trans to @stream. The media of @stream will
3489  * then also be send to the values configured in @trans.
3490  *
3491  * @stream must be joined to a bin.
3492  *
3493  * @trans must contain a valid #GstRTSPTransport.
3494  *
3495  * Returns: %TRUE if @trans was added
3496  */
3497 gboolean
3498 gst_rtsp_stream_add_transport (GstRTSPStream * stream,
3499     GstRTSPStreamTransport * trans)
3500 {
3501   GstRTSPStreamPrivate *priv;
3502   gboolean res;
3503
3504   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3505   priv = stream->priv;
3506   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
3507   g_return_val_if_fail (priv->is_joined, FALSE);
3508
3509   g_mutex_lock (&priv->lock);
3510   res = update_transport (stream, trans, TRUE);
3511   g_mutex_unlock (&priv->lock);
3512
3513   return res;
3514 }
3515
3516 /**
3517  * gst_rtsp_stream_remove_transport:
3518  * @stream: a #GstRTSPStream
3519  * @trans: (transfer none): a #GstRTSPStreamTransport
3520  *
3521  * Remove the transport in @trans from @stream. The media of @stream will
3522  * not be sent to the values configured in @trans.
3523  *
3524  * @stream must be joined to a bin.
3525  *
3526  * @trans must contain a valid #GstRTSPTransport.
3527  *
3528  * Returns: %TRUE if @trans was removed
3529  */
3530 gboolean
3531 gst_rtsp_stream_remove_transport (GstRTSPStream * stream,
3532     GstRTSPStreamTransport * trans)
3533 {
3534   GstRTSPStreamPrivate *priv;
3535   gboolean res;
3536
3537   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3538   priv = stream->priv;
3539   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
3540   g_return_val_if_fail (priv->is_joined, FALSE);
3541
3542   g_mutex_lock (&priv->lock);
3543   res = update_transport (stream, trans, FALSE);
3544   g_mutex_unlock (&priv->lock);
3545
3546   return res;
3547 }
3548
3549 /**
3550  * gst_rtsp_stream_update_crypto:
3551  * @stream: a #GstRTSPStream
3552  * @ssrc: the SSRC
3553  * @crypto: (transfer none) (allow-none): a #GstCaps with crypto info
3554  *
3555  * Update the new crypto information for @ssrc in @stream. If information
3556  * for @ssrc did not exist, it will be added. If information
3557  * for @ssrc existed, it will be replaced. If @crypto is %NULL, it will
3558  * be removed from @stream.
3559  *
3560  * Returns: %TRUE if @crypto could be updated
3561  */
3562 gboolean
3563 gst_rtsp_stream_update_crypto (GstRTSPStream * stream,
3564     guint ssrc, GstCaps * crypto)
3565 {
3566   GstRTSPStreamPrivate *priv;
3567
3568   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3569   g_return_val_if_fail (crypto == NULL || GST_IS_CAPS (crypto), FALSE);
3570
3571   priv = stream->priv;
3572
3573   GST_DEBUG_OBJECT (stream, "update key for %08x", ssrc);
3574
3575   g_mutex_lock (&priv->lock);
3576   if (crypto)
3577     g_hash_table_insert (priv->keys, GINT_TO_POINTER (ssrc),
3578         gst_caps_ref (crypto));
3579   else
3580     g_hash_table_remove (priv->keys, GINT_TO_POINTER (ssrc));
3581   g_mutex_unlock (&priv->lock);
3582
3583   return TRUE;
3584 }
3585
3586 /**
3587  * gst_rtsp_stream_get_rtp_socket:
3588  * @stream: a #GstRTSPStream
3589  * @family: the socket family
3590  *
3591  * Get the RTP socket from @stream for a @family.
3592  *
3593  * @stream must be joined to a bin.
3594  *
3595  * Returns: (transfer full) (nullable): the RTP socket or %NULL if no
3596  * socket could be allocated for @family. Unref after usage
3597  */
3598 GSocket *
3599 gst_rtsp_stream_get_rtp_socket (GstRTSPStream * stream, GSocketFamily family)
3600 {
3601   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
3602   GSocket *socket;
3603   const gchar *name;
3604
3605   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3606   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
3607       family == G_SOCKET_FAMILY_IPV6, NULL);
3608   g_return_val_if_fail (priv->udpsink[0], NULL);
3609
3610   if (family == G_SOCKET_FAMILY_IPV6)
3611     name = "socket-v6";
3612   else
3613     name = "socket";
3614
3615   g_object_get (priv->udpsink[0], name, &socket, NULL);
3616
3617   return socket;
3618 }
3619
3620 /**
3621  * gst_rtsp_stream_get_rtcp_socket:
3622  * @stream: a #GstRTSPStream
3623  * @family: the socket family
3624  *
3625  * Get the RTCP socket from @stream for a @family.
3626  *
3627  * @stream must be joined to a bin.
3628  *
3629  * Returns: (transfer full) (nullable): the RTCP socket or %NULL if no
3630  * socket could be allocated for @family. Unref after usage
3631  */
3632 GSocket *
3633 gst_rtsp_stream_get_rtcp_socket (GstRTSPStream * stream, GSocketFamily family)
3634 {
3635   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
3636   GSocket *socket;
3637   const gchar *name;
3638
3639   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3640   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
3641       family == G_SOCKET_FAMILY_IPV6, NULL);
3642   g_return_val_if_fail (priv->udpsink[1], NULL);
3643
3644   if (family == G_SOCKET_FAMILY_IPV6)
3645     name = "socket-v6";
3646   else
3647     name = "socket";
3648
3649   g_object_get (priv->udpsink[1], name, &socket, NULL);
3650
3651   return socket;
3652 }
3653
3654 /**
3655  * gst_rtsp_stream_set_seqnum:
3656  * @stream: a #GstRTSPStream
3657  * @seqnum: a new sequence number
3658  *
3659  * Configure the sequence number in the payloader of @stream to @seqnum.
3660  */
3661 void
3662 gst_rtsp_stream_set_seqnum_offset (GstRTSPStream * stream, guint16 seqnum)
3663 {
3664   GstRTSPStreamPrivate *priv;
3665
3666   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
3667
3668   priv = stream->priv;
3669
3670   g_object_set (G_OBJECT (priv->payloader), "seqnum-offset", seqnum, NULL);
3671 }
3672
3673 /**
3674  * gst_rtsp_stream_get_seqnum:
3675  * @stream: a #GstRTSPStream
3676  *
3677  * Get the configured sequence number in the payloader of @stream.
3678  *
3679  * Returns: the sequence number of the payloader.
3680  */
3681 guint16
3682 gst_rtsp_stream_get_current_seqnum (GstRTSPStream * stream)
3683 {
3684   GstRTSPStreamPrivate *priv;
3685   guint seqnum;
3686
3687   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
3688
3689   priv = stream->priv;
3690
3691   g_object_get (G_OBJECT (priv->payloader), "seqnum", &seqnum, NULL);
3692
3693   return seqnum;
3694 }
3695
3696 /**
3697  * gst_rtsp_stream_transport_filter:
3698  * @stream: a #GstRTSPStream
3699  * @func: (scope call) (allow-none): a callback
3700  * @user_data: (closure): user data passed to @func
3701  *
3702  * Call @func for each transport managed by @stream. The result value of @func
3703  * determines what happens to the transport. @func will be called with @stream
3704  * locked so no further actions on @stream can be performed from @func.
3705  *
3706  * If @func returns #GST_RTSP_FILTER_REMOVE, the transport will be removed from
3707  * @stream.
3708  *
3709  * If @func returns #GST_RTSP_FILTER_KEEP, the transport will remain in @stream.
3710  *
3711  * If @func returns #GST_RTSP_FILTER_REF, the transport will remain in @stream but
3712  * will also be added with an additional ref to the result #GList of this
3713  * function..
3714  *
3715  * When @func is %NULL, #GST_RTSP_FILTER_REF will be assumed for each transport.
3716  *
3717  * Returns: (element-type GstRTSPStreamTransport) (transfer full): a #GList with all
3718  * transports for which @func returned #GST_RTSP_FILTER_REF. After usage, each
3719  * element in the #GList should be unreffed before the list is freed.
3720  */
3721 GList *
3722 gst_rtsp_stream_transport_filter (GstRTSPStream * stream,
3723     GstRTSPStreamTransportFilterFunc func, gpointer user_data)
3724 {
3725   GstRTSPStreamPrivate *priv;
3726   GList *result, *walk, *next;
3727   GHashTable *visited = NULL;
3728   guint cookie;
3729
3730   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3731
3732   priv = stream->priv;
3733
3734   result = NULL;
3735   if (func)
3736     visited = g_hash_table_new_full (NULL, NULL, g_object_unref, NULL);
3737
3738   g_mutex_lock (&priv->lock);
3739 restart:
3740   cookie = priv->transports_cookie;
3741   for (walk = priv->transports; walk; walk = next) {
3742     GstRTSPStreamTransport *trans = walk->data;
3743     GstRTSPFilterResult res;
3744     gboolean changed;
3745
3746     next = g_list_next (walk);
3747
3748     if (func) {
3749       /* only visit each transport once */
3750       if (g_hash_table_contains (visited, trans))
3751         continue;
3752
3753       g_hash_table_add (visited, g_object_ref (trans));
3754       g_mutex_unlock (&priv->lock);
3755
3756       res = func (stream, trans, user_data);
3757
3758       g_mutex_lock (&priv->lock);
3759     } else
3760       res = GST_RTSP_FILTER_REF;
3761
3762     changed = (cookie != priv->transports_cookie);
3763
3764     switch (res) {
3765       case GST_RTSP_FILTER_REMOVE:
3766         update_transport (stream, trans, FALSE);
3767         break;
3768       case GST_RTSP_FILTER_REF:
3769         result = g_list_prepend (result, g_object_ref (trans));
3770         break;
3771       case GST_RTSP_FILTER_KEEP:
3772       default:
3773         break;
3774     }
3775     if (changed)
3776       goto restart;
3777   }
3778   g_mutex_unlock (&priv->lock);
3779
3780   if (func)
3781     g_hash_table_unref (visited);
3782
3783   return result;
3784 }
3785
3786 static GstPadProbeReturn
3787 pad_blocking (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
3788 {
3789   GstRTSPStreamPrivate *priv;
3790   GstRTSPStream *stream;
3791
3792   stream = user_data;
3793   priv = stream->priv;
3794
3795   GST_DEBUG_OBJECT (pad, "now blocking");
3796
3797   g_mutex_lock (&priv->lock);
3798   priv->blocking = TRUE;
3799   g_mutex_unlock (&priv->lock);
3800
3801   gst_element_post_message (priv->payloader,
3802       gst_message_new_element (GST_OBJECT_CAST (priv->payloader),
3803           gst_structure_new_empty ("GstRTSPStreamBlocking")));
3804
3805   return GST_PAD_PROBE_OK;
3806 }
3807
3808 /**
3809  * gst_rtsp_stream_set_blocked:
3810  * @stream: a #GstRTSPStream
3811  * @blocked: boolean indicating we should block or unblock
3812  *
3813  * Blocks or unblocks the dataflow on @stream.
3814  *
3815  * Returns: %TRUE on success
3816  */
3817 gboolean
3818 gst_rtsp_stream_set_blocked (GstRTSPStream * stream, gboolean blocked)
3819 {
3820   GstRTSPStreamPrivate *priv;
3821
3822   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3823
3824   priv = stream->priv;
3825
3826   g_mutex_lock (&priv->lock);
3827   if (blocked) {
3828     priv->blocking = FALSE;
3829     if (priv->blocked_id == 0) {
3830       priv->blocked_id = gst_pad_add_probe (priv->srcpad,
3831           GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER |
3832           GST_PAD_PROBE_TYPE_BUFFER_LIST, pad_blocking,
3833           g_object_ref (stream), g_object_unref);
3834     }
3835   } else {
3836     if (priv->blocked_id != 0) {
3837       gst_pad_remove_probe (priv->srcpad, priv->blocked_id);
3838       priv->blocked_id = 0;
3839       priv->blocking = FALSE;
3840     }
3841   }
3842   g_mutex_unlock (&priv->lock);
3843
3844   return TRUE;
3845 }
3846
3847 /**
3848  * gst_rtsp_stream_is_blocking:
3849  * @stream: a #GstRTSPStream
3850  *
3851  * Check if @stream is blocking on a #GstBuffer.
3852  *
3853  * Returns: %TRUE if @stream is blocking
3854  */
3855 gboolean
3856 gst_rtsp_stream_is_blocking (GstRTSPStream * stream)
3857 {
3858   GstRTSPStreamPrivate *priv;
3859   gboolean result;
3860
3861   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3862
3863   priv = stream->priv;
3864
3865   g_mutex_lock (&priv->lock);
3866   result = priv->blocking;
3867   g_mutex_unlock (&priv->lock);
3868
3869   return result;
3870 }
3871
3872 /**
3873  * gst_rtsp_stream_query_position:
3874  * @stream: a #GstRTSPStream
3875  *
3876  * Query the position of the stream in %GST_FORMAT_TIME. This only considers
3877  * the RTP parts of the pipeline and not the RTCP parts.
3878  *
3879  * Returns: %TRUE if the position could be queried
3880  */
3881 gboolean
3882 gst_rtsp_stream_query_position (GstRTSPStream * stream, gint64 * position)
3883 {
3884   GstRTSPStreamPrivate *priv;
3885   GstElement *sink;
3886   gboolean ret;
3887
3888   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3889
3890   priv = stream->priv;
3891
3892   g_mutex_lock (&priv->lock);
3893   /* depending on the transport type, it should query corresponding sink */
3894   if ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
3895       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST))
3896     sink = priv->udpsink[0];
3897   else
3898     sink = priv->appsink[0];
3899
3900   if (sink)
3901     gst_object_ref (sink);
3902   g_mutex_unlock (&priv->lock);
3903
3904   if (!sink)
3905     return FALSE;
3906
3907   ret = gst_element_query_position (sink, GST_FORMAT_TIME, position);
3908   gst_object_unref (sink);
3909
3910   return ret;
3911 }
3912
3913 /**
3914  * gst_rtsp_stream_query_stop:
3915  * @stream: a #GstRTSPStream
3916  *
3917  * Query the stop of the stream in %GST_FORMAT_TIME. This only considers
3918  * the RTP parts of the pipeline and not the RTCP parts.
3919  *
3920  * Returns: %TRUE if the stop could be queried
3921  */
3922 gboolean
3923 gst_rtsp_stream_query_stop (GstRTSPStream * stream, gint64 * stop)
3924 {
3925   GstRTSPStreamPrivate *priv;
3926   GstElement *sink;
3927   GstQuery *query;
3928   gboolean ret;
3929
3930   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3931
3932   priv = stream->priv;
3933
3934   g_mutex_lock (&priv->lock);
3935   /* depending on the transport type, it should query corresponding sink */
3936   if ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
3937       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST))
3938     sink = priv->udpsink[0];
3939   else
3940     sink = priv->appsink[0];
3941
3942   if (sink)
3943     gst_object_ref (sink);
3944   g_mutex_unlock (&priv->lock);
3945
3946   if (!sink)
3947     return FALSE;
3948
3949   query = gst_query_new_segment (GST_FORMAT_TIME);
3950   if ((ret = gst_element_query (sink, query))) {
3951     GstFormat format;
3952
3953     gst_query_parse_segment (query, NULL, &format, NULL, stop);
3954     if (format != GST_FORMAT_TIME)
3955       *stop = -1;
3956   }
3957   gst_query_unref (query);
3958   gst_object_unref (sink);
3959
3960   return ret;
3961
3962 }