rtsp-stream: Always bind to ANY when address is a multicast address and not only...
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-stream.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  * Copyright (C) 2015 Centricular Ltd
4  *     Author: Sebastian Dröge <sebastian@centricular.com>
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19  * Boston, MA 02110-1301, USA.
20  */
21 /**
22  * SECTION:rtsp-stream
23  * @short_description: A media stream
24  * @see_also: #GstRTSPMedia
25  *
26  * The #GstRTSPStream object manages the data transport for one stream. It
27  * is created from a payloader element and a source pad that produce the RTP
28  * packets for the stream.
29  *
30  * With gst_rtsp_stream_join_bin() the streaming elements are added to the bin
31  * and rtpbin. gst_rtsp_stream_leave_bin() removes the elements again.
32  *
33  * The #GstRTSPStream will use the configured addresspool, as set with
34  * gst_rtsp_stream_set_address_pool(), to allocate multicast addresses for the
35  * stream. With gst_rtsp_stream_get_multicast_address() you can get the
36  * configured address.
37  *
38  * With gst_rtsp_stream_get_server_port () you can get the port that the server
39  * will use to receive RTCP. This is the part that the clients will use to send
40  * RTCP to.
41  *
42  * With gst_rtsp_stream_add_transport() destinations can be added where the
43  * stream should be sent to. Use gst_rtsp_stream_remove_transport() to remove
44  * the destination again.
45  *
46  * Last reviewed on 2013-07-16 (1.0.0)
47  */
48
49 #include <stdlib.h>
50 #include <stdio.h>
51 #include <string.h>
52
53 #include <gio/gio.h>
54
55 #include <gst/app/gstappsrc.h>
56 #include <gst/app/gstappsink.h>
57
58 #include <gst/rtp/gstrtpbuffer.h>
59
60 #include "rtsp-stream.h"
61
62 #define GST_RTSP_STREAM_GET_PRIVATE(obj)  \
63      (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTSP_STREAM, GstRTSPStreamPrivate))
64
65 struct _GstRTSPStreamPrivate
66 {
67   GMutex lock;
68   guint idx;
69   /* Only one pad is ever set */
70   GstPad *srcpad, *sinkpad;
71   GstElement *payloader;
72   guint buffer_size;
73   gboolean is_joined;
74   GstBin *joined_bin;
75
76   /* TRUE if this stream is running on
77    * the client side of an RTSP link (for RECORD) */
78   gboolean client_side;
79   gchar *control;
80
81   GstRTSPProfile profiles;
82   GstRTSPLowerTrans protocols;
83
84   /* pads on the rtpbin */
85   GstPad *send_rtp_sink;
86   GstPad *recv_rtp_src;
87   GstPad *recv_sink[2];
88   GstPad *send_src[2];
89
90   /* the RTPSession object */
91   GObject *session;
92
93   /* SRTP encoder/decoder */
94   GstElement *srtpenc;
95   GstElement *srtpdec;
96   GHashTable *keys;
97
98   /* sinks used for sending and receiving RTP and RTCP over ipv4, they share
99    * sockets */
100   GstElement *udpsrc_v4[2];
101   /* UDP sources for UDP multicast transports */
102   GstElement *udpsrc_mcast_v4[2];
103
104   /* sinks used for sending and receiving RTP and RTCP over ipv6, they share
105    * sockets */
106   GstElement *udpsrc_v6[2];
107   /* UDP sources for UDP multicast transports */
108   GstElement *udpsrc_mcast_v6[2];
109
110   GstElement *udpqueue[2];
111   GstElement *udpsink[2];
112
113   /* for TCP transport */
114   GstElement *appsrc[2];
115   GstClockTime appsrc_base_time[2];
116   GstElement *appqueue[2];
117   GstElement *appsink[2];
118
119   GstElement *tee[2];
120   GstElement *funnel[2];
121
122   /* retransmission */
123   GstElement *rtxsend;
124   guint rtx_pt;
125   GstClockTime rtx_time;
126
127   /* server ports for sending/receiving over ipv4 */
128   GstRTSPRange server_port_v4;
129   GstRTSPAddress *server_addr_v4;
130   gboolean have_ipv4;
131
132   /* server ports for sending/receiving over ipv6 */
133   GstRTSPRange server_port_v6;
134   GstRTSPAddress *server_addr_v6;
135   gboolean have_ipv6;
136
137   /* multicast addresses */
138   GstRTSPAddressPool *pool;
139   GstRTSPAddress *addr_v4;
140   GstRTSPAddress *addr_v6;
141   gboolean have_ipv4_mcast;
142   gboolean have_ipv6_mcast;
143
144   gchar *multicast_iface;
145
146   /* the caps of the stream */
147   gulong caps_sig;
148   GstCaps *caps;
149
150   /* transports we stream to */
151   guint n_active;
152   GList *transports;
153   guint transports_cookie;
154   GList *tr_cache_rtp;
155   GList *tr_cache_rtcp;
156   guint tr_cache_cookie_rtp;
157   guint tr_cache_cookie_rtcp;
158
159
160   gint dscp_qos;
161
162   /* stream blocking */
163   gulong blocked_id;
164   gboolean blocking;
165
166   /* pt->caps map for RECORD streams */
167   GHashTable *ptmap;
168
169   GstRTSPPublishClockMode publish_clock_mode;
170 };
171
172 #define DEFAULT_CONTROL         NULL
173 #define DEFAULT_PROFILES        GST_RTSP_PROFILE_AVP
174 #define DEFAULT_PROTOCOLS       GST_RTSP_LOWER_TRANS_UDP | GST_RTSP_LOWER_TRANS_UDP_MCAST | \
175                                         GST_RTSP_LOWER_TRANS_TCP
176
177 enum
178 {
179   PROP_0,
180   PROP_CONTROL,
181   PROP_PROFILES,
182   PROP_PROTOCOLS,
183   PROP_LAST
184 };
185
186 enum
187 {
188   SIGNAL_NEW_RTP_ENCODER,
189   SIGNAL_NEW_RTCP_ENCODER,
190   SIGNAL_LAST
191 };
192
193 GST_DEBUG_CATEGORY_STATIC (rtsp_stream_debug);
194 #define GST_CAT_DEFAULT rtsp_stream_debug
195
196 static GQuark ssrc_stream_map_key;
197
198 static void gst_rtsp_stream_get_property (GObject * object, guint propid,
199     GValue * value, GParamSpec * pspec);
200 static void gst_rtsp_stream_set_property (GObject * object, guint propid,
201     const GValue * value, GParamSpec * pspec);
202
203 static void gst_rtsp_stream_finalize (GObject * obj);
204
205 static guint gst_rtsp_stream_signals[SIGNAL_LAST] = { 0 };
206
207 G_DEFINE_TYPE (GstRTSPStream, gst_rtsp_stream, G_TYPE_OBJECT);
208
209 static void
210 gst_rtsp_stream_class_init (GstRTSPStreamClass * klass)
211 {
212   GObjectClass *gobject_class;
213
214   g_type_class_add_private (klass, sizeof (GstRTSPStreamPrivate));
215
216   gobject_class = G_OBJECT_CLASS (klass);
217
218   gobject_class->get_property = gst_rtsp_stream_get_property;
219   gobject_class->set_property = gst_rtsp_stream_set_property;
220   gobject_class->finalize = gst_rtsp_stream_finalize;
221
222   g_object_class_install_property (gobject_class, PROP_CONTROL,
223       g_param_spec_string ("control", "Control",
224           "The control string for this stream", DEFAULT_CONTROL,
225           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
226
227   g_object_class_install_property (gobject_class, PROP_PROFILES,
228       g_param_spec_flags ("profiles", "Profiles",
229           "Allowed transfer profiles", GST_TYPE_RTSP_PROFILE,
230           DEFAULT_PROFILES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
231
232   g_object_class_install_property (gobject_class, PROP_PROTOCOLS,
233       g_param_spec_flags ("protocols", "Protocols",
234           "Allowed lower transport protocols", GST_TYPE_RTSP_LOWER_TRANS,
235           DEFAULT_PROTOCOLS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
236
237   gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER] =
238       g_signal_new ("new-rtp-encoder", G_TYPE_FROM_CLASS (klass),
239       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
240       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
241
242   gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER] =
243       g_signal_new ("new-rtcp-encoder", G_TYPE_FROM_CLASS (klass),
244       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
245       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
246
247   GST_DEBUG_CATEGORY_INIT (rtsp_stream_debug, "rtspstream", 0, "GstRTSPStream");
248
249   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
250 }
251
252 static void
253 gst_rtsp_stream_init (GstRTSPStream * stream)
254 {
255   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
256
257   GST_DEBUG ("new stream %p", stream);
258
259   stream->priv = priv;
260
261   priv->dscp_qos = -1;
262   priv->control = g_strdup (DEFAULT_CONTROL);
263   priv->profiles = DEFAULT_PROFILES;
264   priv->protocols = DEFAULT_PROTOCOLS;
265   priv->publish_clock_mode = GST_RTSP_PUBLISH_CLOCK_MODE_CLOCK;
266
267   g_mutex_init (&priv->lock);
268
269   priv->keys = g_hash_table_new_full (g_direct_hash, g_direct_equal,
270       NULL, (GDestroyNotify) gst_caps_unref);
271   priv->ptmap = g_hash_table_new_full (NULL, NULL, NULL,
272       (GDestroyNotify) gst_caps_unref);
273 }
274
275 static void
276 gst_rtsp_stream_finalize (GObject * obj)
277 {
278   GstRTSPStream *stream;
279   GstRTSPStreamPrivate *priv;
280
281   stream = GST_RTSP_STREAM (obj);
282   priv = stream->priv;
283
284   GST_DEBUG ("finalize stream %p", stream);
285
286   /* we really need to be unjoined now */
287   g_return_if_fail (!priv->is_joined);
288
289   if (priv->addr_v4)
290     gst_rtsp_address_free (priv->addr_v4);
291   if (priv->addr_v6)
292     gst_rtsp_address_free (priv->addr_v6);
293   if (priv->server_addr_v4)
294     gst_rtsp_address_free (priv->server_addr_v4);
295   if (priv->server_addr_v6)
296     gst_rtsp_address_free (priv->server_addr_v6);
297   if (priv->pool)
298     g_object_unref (priv->pool);
299   if (priv->rtxsend)
300     g_object_unref (priv->rtxsend);
301
302   g_free (priv->multicast_iface);
303
304   gst_object_unref (priv->payloader);
305   if (priv->srcpad)
306     gst_object_unref (priv->srcpad);
307   if (priv->sinkpad)
308     gst_object_unref (priv->sinkpad);
309   g_free (priv->control);
310   g_mutex_clear (&priv->lock);
311
312   g_hash_table_unref (priv->keys);
313   g_hash_table_destroy (priv->ptmap);
314
315   G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
316 }
317
318 static void
319 gst_rtsp_stream_get_property (GObject * object, guint propid,
320     GValue * value, GParamSpec * pspec)
321 {
322   GstRTSPStream *stream = GST_RTSP_STREAM (object);
323
324   switch (propid) {
325     case PROP_CONTROL:
326       g_value_take_string (value, gst_rtsp_stream_get_control (stream));
327       break;
328     case PROP_PROFILES:
329       g_value_set_flags (value, gst_rtsp_stream_get_profiles (stream));
330       break;
331     case PROP_PROTOCOLS:
332       g_value_set_flags (value, gst_rtsp_stream_get_protocols (stream));
333       break;
334     default:
335       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
336   }
337 }
338
339 static void
340 gst_rtsp_stream_set_property (GObject * object, guint propid,
341     const GValue * value, GParamSpec * pspec)
342 {
343   GstRTSPStream *stream = GST_RTSP_STREAM (object);
344
345   switch (propid) {
346     case PROP_CONTROL:
347       gst_rtsp_stream_set_control (stream, g_value_get_string (value));
348       break;
349     case PROP_PROFILES:
350       gst_rtsp_stream_set_profiles (stream, g_value_get_flags (value));
351       break;
352     case PROP_PROTOCOLS:
353       gst_rtsp_stream_set_protocols (stream, g_value_get_flags (value));
354       break;
355     default:
356       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
357   }
358 }
359
360 /**
361  * gst_rtsp_stream_new:
362  * @idx: an index
363  * @pad: a #GstPad
364  * @payloader: a #GstElement
365  *
366  * Create a new media stream with index @idx that handles RTP data on
367  * @pad and has a payloader element @payloader if @pad is a source pad
368  * or a depayloader element @payloader if @pad is a sink pad.
369  *
370  * Returns: (transfer full): a new #GstRTSPStream
371  */
372 GstRTSPStream *
373 gst_rtsp_stream_new (guint idx, GstElement * payloader, GstPad * pad)
374 {
375   GstRTSPStreamPrivate *priv;
376   GstRTSPStream *stream;
377
378   g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL);
379   g_return_val_if_fail (GST_IS_PAD (pad), NULL);
380
381   stream = g_object_new (GST_TYPE_RTSP_STREAM, NULL);
382   priv = stream->priv;
383   priv->idx = idx;
384   priv->payloader = gst_object_ref (payloader);
385   if (GST_PAD_IS_SRC (pad))
386     priv->srcpad = gst_object_ref (pad);
387   else
388     priv->sinkpad = gst_object_ref (pad);
389
390   return stream;
391 }
392
393 /**
394  * gst_rtsp_stream_get_index:
395  * @stream: a #GstRTSPStream
396  *
397  * Get the stream index.
398  *
399  * Return: the stream index.
400  */
401 guint
402 gst_rtsp_stream_get_index (GstRTSPStream * stream)
403 {
404   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
405
406   return stream->priv->idx;
407 }
408
409 /**
410  * gst_rtsp_stream_get_pt:
411  * @stream: a #GstRTSPStream
412  *
413  * Get the stream payload type.
414  *
415  * Return: the stream payload type.
416  */
417 guint
418 gst_rtsp_stream_get_pt (GstRTSPStream * stream)
419 {
420   GstRTSPStreamPrivate *priv;
421   guint pt;
422
423   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
424
425   priv = stream->priv;
426
427   g_object_get (G_OBJECT (priv->payloader), "pt", &pt, NULL);
428
429   return pt;
430 }
431
432 /**
433  * gst_rtsp_stream_get_srcpad:
434  * @stream: a #GstRTSPStream
435  *
436  * Get the srcpad associated with @stream.
437  *
438  * Returns: (transfer full): the srcpad. Unref after usage.
439  */
440 GstPad *
441 gst_rtsp_stream_get_srcpad (GstRTSPStream * stream)
442 {
443   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
444
445   if (!stream->priv->srcpad)
446     return NULL;
447
448   return gst_object_ref (stream->priv->srcpad);
449 }
450
451 /**
452  * gst_rtsp_stream_get_sinkpad:
453  * @stream: a #GstRTSPStream
454  *
455  * Get the sinkpad associated with @stream.
456  *
457  * Returns: (transfer full): the sinkpad. Unref after usage.
458  */
459 GstPad *
460 gst_rtsp_stream_get_sinkpad (GstRTSPStream * stream)
461 {
462   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
463
464   if (!stream->priv->sinkpad)
465     return NULL;
466
467   return gst_object_ref (stream->priv->sinkpad);
468 }
469
470 /**
471  * gst_rtsp_stream_get_control:
472  * @stream: a #GstRTSPStream
473  *
474  * Get the control string to identify this stream.
475  *
476  * Returns: (transfer full): the control string. g_free() after usage.
477  */
478 gchar *
479 gst_rtsp_stream_get_control (GstRTSPStream * stream)
480 {
481   GstRTSPStreamPrivate *priv;
482   gchar *result;
483
484   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
485
486   priv = stream->priv;
487
488   g_mutex_lock (&priv->lock);
489   if ((result = g_strdup (priv->control)) == NULL)
490     result = g_strdup_printf ("stream=%u", priv->idx);
491   g_mutex_unlock (&priv->lock);
492
493   return result;
494 }
495
496 /**
497  * gst_rtsp_stream_set_control:
498  * @stream: a #GstRTSPStream
499  * @control: a control string
500  *
501  * Set the control string in @stream.
502  */
503 void
504 gst_rtsp_stream_set_control (GstRTSPStream * stream, const gchar * control)
505 {
506   GstRTSPStreamPrivate *priv;
507
508   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
509
510   priv = stream->priv;
511
512   g_mutex_lock (&priv->lock);
513   g_free (priv->control);
514   priv->control = g_strdup (control);
515   g_mutex_unlock (&priv->lock);
516 }
517
518 /**
519  * gst_rtsp_stream_has_control:
520  * @stream: a #GstRTSPStream
521  * @control: a control string
522  *
523  * Check if @stream has the control string @control.
524  *
525  * Returns: %TRUE is @stream has @control as the control string
526  */
527 gboolean
528 gst_rtsp_stream_has_control (GstRTSPStream * stream, const gchar * control)
529 {
530   GstRTSPStreamPrivate *priv;
531   gboolean res;
532
533   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
534
535   priv = stream->priv;
536
537   g_mutex_lock (&priv->lock);
538   if (priv->control)
539     res = (g_strcmp0 (priv->control, control) == 0);
540   else {
541     guint streamid;
542
543     if (sscanf (control, "stream=%u", &streamid) > 0)
544       res = (streamid == priv->idx);
545     else
546       res = FALSE;
547   }
548   g_mutex_unlock (&priv->lock);
549
550   return res;
551 }
552
553 /**
554  * gst_rtsp_stream_set_mtu:
555  * @stream: a #GstRTSPStream
556  * @mtu: a new MTU
557  *
558  * Configure the mtu in the payloader of @stream to @mtu.
559  */
560 void
561 gst_rtsp_stream_set_mtu (GstRTSPStream * stream, guint mtu)
562 {
563   GstRTSPStreamPrivate *priv;
564
565   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
566
567   priv = stream->priv;
568
569   GST_LOG_OBJECT (stream, "set MTU %u", mtu);
570
571   g_object_set (G_OBJECT (priv->payloader), "mtu", mtu, NULL);
572 }
573
574 /**
575  * gst_rtsp_stream_get_mtu:
576  * @stream: a #GstRTSPStream
577  *
578  * Get the configured MTU in the payloader of @stream.
579  *
580  * Returns: the MTU of the payloader.
581  */
582 guint
583 gst_rtsp_stream_get_mtu (GstRTSPStream * stream)
584 {
585   GstRTSPStreamPrivate *priv;
586   guint mtu;
587
588   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
589
590   priv = stream->priv;
591
592   g_object_get (G_OBJECT (priv->payloader), "mtu", &mtu, NULL);
593
594   return mtu;
595 }
596
597 /* Update the dscp qos property on the udp sinks */
598 static void
599 update_dscp_qos (GstRTSPStream * stream)
600 {
601   GstRTSPStreamPrivate *priv;
602
603   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
604
605   priv = stream->priv;
606
607   if (priv->udpsink[0]) {
608     g_object_set (G_OBJECT (priv->udpsink[0]), "qos-dscp", priv->dscp_qos,
609         NULL);
610   }
611
612   if (priv->udpsink[1]) {
613     g_object_set (G_OBJECT (priv->udpsink[1]), "qos-dscp", priv->dscp_qos,
614         NULL);
615   }
616 }
617
618 /**
619  * gst_rtsp_stream_set_dscp_qos:
620  * @stream: a #GstRTSPStream
621  * @dscp_qos: a new dscp qos value (0-63, or -1 to disable)
622  *
623  * Configure the dscp qos of the outgoing sockets to @dscp_qos.
624  */
625 void
626 gst_rtsp_stream_set_dscp_qos (GstRTSPStream * stream, gint dscp_qos)
627 {
628   GstRTSPStreamPrivate *priv;
629
630   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
631
632   priv = stream->priv;
633
634   GST_LOG_OBJECT (stream, "set DSCP QoS %d", dscp_qos);
635
636   if (dscp_qos < -1 || dscp_qos > 63) {
637     GST_WARNING_OBJECT (stream, "trying to set illegal dscp qos %d", dscp_qos);
638     return;
639   }
640
641   priv->dscp_qos = dscp_qos;
642
643   update_dscp_qos (stream);
644 }
645
646 /**
647  * gst_rtsp_stream_get_dscp_qos:
648  * @stream: a #GstRTSPStream
649  *
650  * Get the configured DSCP QoS in of the outgoing sockets.
651  *
652  * Returns: the DSCP QoS value of the outgoing sockets, or -1 if disbled.
653  */
654 gint
655 gst_rtsp_stream_get_dscp_qos (GstRTSPStream * stream)
656 {
657   GstRTSPStreamPrivate *priv;
658
659   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
660
661   priv = stream->priv;
662
663   return priv->dscp_qos;
664 }
665
666 /**
667  * gst_rtsp_stream_is_transport_supported:
668  * @stream: a #GstRTSPStream
669  * @transport: (transfer none): a #GstRTSPTransport
670  *
671  * Check if @transport can be handled by stream
672  *
673  * Returns: %TRUE if @transport can be handled by @stream.
674  */
675 gboolean
676 gst_rtsp_stream_is_transport_supported (GstRTSPStream * stream,
677     GstRTSPTransport * transport)
678 {
679   GstRTSPStreamPrivate *priv;
680
681   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
682
683   priv = stream->priv;
684
685   g_mutex_lock (&priv->lock);
686   if (transport->trans != GST_RTSP_TRANS_RTP)
687     goto unsupported_transmode;
688
689   if (!(transport->profile & priv->profiles))
690     goto unsupported_profile;
691
692   if (!(transport->lower_transport & priv->protocols))
693     goto unsupported_ltrans;
694
695   g_mutex_unlock (&priv->lock);
696
697   return TRUE;
698
699   /* ERRORS */
700 unsupported_transmode:
701   {
702     GST_DEBUG ("unsupported transport mode %d", transport->trans);
703     g_mutex_unlock (&priv->lock);
704     return FALSE;
705   }
706 unsupported_profile:
707   {
708     GST_DEBUG ("unsupported profile %d", transport->profile);
709     g_mutex_unlock (&priv->lock);
710     return FALSE;
711   }
712 unsupported_ltrans:
713   {
714     GST_DEBUG ("unsupported lower transport %d", transport->lower_transport);
715     g_mutex_unlock (&priv->lock);
716     return FALSE;
717   }
718 }
719
720 /**
721  * gst_rtsp_stream_set_profiles:
722  * @stream: a #GstRTSPStream
723  * @profiles: the new profiles
724  *
725  * Configure the allowed profiles for @stream.
726  */
727 void
728 gst_rtsp_stream_set_profiles (GstRTSPStream * stream, GstRTSPProfile profiles)
729 {
730   GstRTSPStreamPrivate *priv;
731
732   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
733
734   priv = stream->priv;
735
736   g_mutex_lock (&priv->lock);
737   priv->profiles = profiles;
738   g_mutex_unlock (&priv->lock);
739 }
740
741 /**
742  * gst_rtsp_stream_get_profiles:
743  * @stream: a #GstRTSPStream
744  *
745  * Get the allowed profiles of @stream.
746  *
747  * Returns: a #GstRTSPProfile
748  */
749 GstRTSPProfile
750 gst_rtsp_stream_get_profiles (GstRTSPStream * stream)
751 {
752   GstRTSPStreamPrivate *priv;
753   GstRTSPProfile res;
754
755   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_RTSP_PROFILE_UNKNOWN);
756
757   priv = stream->priv;
758
759   g_mutex_lock (&priv->lock);
760   res = priv->profiles;
761   g_mutex_unlock (&priv->lock);
762
763   return res;
764 }
765
766 /**
767  * gst_rtsp_stream_set_protocols:
768  * @stream: a #GstRTSPStream
769  * @protocols: the new flags
770  *
771  * Configure the allowed lower transport for @stream.
772  */
773 void
774 gst_rtsp_stream_set_protocols (GstRTSPStream * stream,
775     GstRTSPLowerTrans protocols)
776 {
777   GstRTSPStreamPrivate *priv;
778
779   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
780
781   priv = stream->priv;
782
783   g_mutex_lock (&priv->lock);
784   priv->protocols = protocols;
785   g_mutex_unlock (&priv->lock);
786 }
787
788 /**
789  * gst_rtsp_stream_get_protocols:
790  * @stream: a #GstRTSPStream
791  *
792  * Get the allowed protocols of @stream.
793  *
794  * Returns: a #GstRTSPLowerTrans
795  */
796 GstRTSPLowerTrans
797 gst_rtsp_stream_get_protocols (GstRTSPStream * stream)
798 {
799   GstRTSPStreamPrivate *priv;
800   GstRTSPLowerTrans res;
801
802   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream),
803       GST_RTSP_LOWER_TRANS_UNKNOWN);
804
805   priv = stream->priv;
806
807   g_mutex_lock (&priv->lock);
808   res = priv->protocols;
809   g_mutex_unlock (&priv->lock);
810
811   return res;
812 }
813
814 /**
815  * gst_rtsp_stream_set_address_pool:
816  * @stream: a #GstRTSPStream
817  * @pool: (transfer none): a #GstRTSPAddressPool
818  *
819  * configure @pool to be used as the address pool of @stream.
820  */
821 void
822 gst_rtsp_stream_set_address_pool (GstRTSPStream * stream,
823     GstRTSPAddressPool * pool)
824 {
825   GstRTSPStreamPrivate *priv;
826   GstRTSPAddressPool *old;
827
828   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
829
830   priv = stream->priv;
831
832   GST_LOG_OBJECT (stream, "set address pool %p", pool);
833
834   g_mutex_lock (&priv->lock);
835   if ((old = priv->pool) != pool)
836     priv->pool = pool ? g_object_ref (pool) : NULL;
837   else
838     old = NULL;
839   g_mutex_unlock (&priv->lock);
840
841   if (old)
842     g_object_unref (old);
843 }
844
845 /**
846  * gst_rtsp_stream_get_address_pool:
847  * @stream: a #GstRTSPStream
848  *
849  * Get the #GstRTSPAddressPool used as the address pool of @stream.
850  *
851  * Returns: (transfer full): the #GstRTSPAddressPool of @stream. g_object_unref() after
852  * usage.
853  */
854 GstRTSPAddressPool *
855 gst_rtsp_stream_get_address_pool (GstRTSPStream * stream)
856 {
857   GstRTSPStreamPrivate *priv;
858   GstRTSPAddressPool *result;
859
860   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
861
862   priv = stream->priv;
863
864   g_mutex_lock (&priv->lock);
865   if ((result = priv->pool))
866     g_object_ref (result);
867   g_mutex_unlock (&priv->lock);
868
869   return result;
870 }
871
872 /**
873  * gst_rtsp_stream_set_multicast_iface:
874  * @stream: a #GstRTSPStream
875  * @multicast_iface: (transfer none): a multicast interface
876  *
877  * configure @multicast_iface to be used for @stream.
878  */
879 void
880 gst_rtsp_stream_set_multicast_iface (GstRTSPStream * stream,
881     const gchar * multicast_iface)
882 {
883   GstRTSPStreamPrivate *priv;
884   gchar *old;
885
886   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
887
888   priv = stream->priv;
889
890   GST_LOG_OBJECT (stream, "set multicast iface %s",
891       GST_STR_NULL (multicast_iface));
892
893   g_mutex_lock (&priv->lock);
894   if ((old = priv->multicast_iface) != multicast_iface)
895     priv->multicast_iface = multicast_iface ? g_strdup (multicast_iface) : NULL;
896   else
897     old = NULL;
898   g_mutex_unlock (&priv->lock);
899
900   if (old)
901     g_free (old);
902 }
903
904 /**
905  * gst_rtsp_stream_get_multicast_iface:
906  * @stream: a #GstRTSPStream
907  *
908  * Get the multicast interface used for @stream.
909  *
910  * Returns: (transfer full): the multicast interface for @stream. g_free() after
911  * usage.
912  */
913 gchar *
914 gst_rtsp_stream_get_multicast_iface (GstRTSPStream * stream)
915 {
916   GstRTSPStreamPrivate *priv;
917   gchar *result;
918
919   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
920
921   priv = stream->priv;
922
923   g_mutex_lock (&priv->lock);
924   if ((result = priv->multicast_iface))
925     result = g_strdup (result);
926   g_mutex_unlock (&priv->lock);
927
928   return result;
929 }
930
931 /**
932  * gst_rtsp_stream_get_multicast_address:
933  * @stream: a #GstRTSPStream
934  * @family: the #GSocketFamily
935  *
936  * Get the multicast address of @stream for @family.
937  *
938  * Returns: (transfer full) (nullable): the #GstRTSPAddress of @stream
939  * or %NULL when no address could be allocated. gst_rtsp_address_free()
940  * after usage.
941  */
942 GstRTSPAddress *
943 gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream,
944     GSocketFamily family)
945 {
946   GstRTSPStreamPrivate *priv;
947   GstRTSPAddress *result;
948   GstRTSPAddress **addrp;
949   GstRTSPAddressFlags flags;
950
951   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
952
953   priv = stream->priv;
954
955   if (family == G_SOCKET_FAMILY_IPV6) {
956     flags = GST_RTSP_ADDRESS_FLAG_IPV6;
957     addrp = &priv->addr_v6;
958   } else {
959     flags = GST_RTSP_ADDRESS_FLAG_IPV4;
960     addrp = &priv->addr_v4;
961   }
962
963   g_mutex_lock (&priv->lock);
964   if (*addrp == NULL) {
965     if (priv->pool == NULL)
966       goto no_pool;
967
968     flags |= GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_MULTICAST;
969
970     *addrp = gst_rtsp_address_pool_acquire_address (priv->pool, flags, 2);
971     if (*addrp == NULL)
972       goto no_address;
973   }
974   result = gst_rtsp_address_copy (*addrp);
975   g_mutex_unlock (&priv->lock);
976
977   return result;
978
979   /* ERRORS */
980 no_pool:
981   {
982     GST_ERROR_OBJECT (stream, "no address pool specified");
983     g_mutex_unlock (&priv->lock);
984     return NULL;
985   }
986 no_address:
987   {
988     GST_ERROR_OBJECT (stream, "failed to acquire address from pool");
989     g_mutex_unlock (&priv->lock);
990     return NULL;
991   }
992 }
993
994 /**
995  * gst_rtsp_stream_reserve_address:
996  * @stream: a #GstRTSPStream
997  * @address: an address
998  * @port: a port
999  * @n_ports: n_ports
1000  * @ttl: a TTL
1001  *
1002  * Reserve @address and @port as the address and port of @stream.
1003  *
1004  * Returns: (nullable): the #GstRTSPAddress of @stream or %NULL when
1005  * the address could be reserved. gst_rtsp_address_free() after usage.
1006  */
1007 GstRTSPAddress *
1008 gst_rtsp_stream_reserve_address (GstRTSPStream * stream,
1009     const gchar * address, guint port, guint n_ports, guint ttl)
1010 {
1011   GstRTSPStreamPrivate *priv;
1012   GstRTSPAddress *result;
1013   GInetAddress *addr;
1014   GSocketFamily family;
1015   GstRTSPAddress **addrp;
1016
1017   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1018   g_return_val_if_fail (address != NULL, NULL);
1019   g_return_val_if_fail (port > 0, NULL);
1020   g_return_val_if_fail (n_ports > 0, NULL);
1021   g_return_val_if_fail (ttl > 0, NULL);
1022
1023   priv = stream->priv;
1024
1025   addr = g_inet_address_new_from_string (address);
1026   if (!addr) {
1027     GST_ERROR ("failed to get inet addr from %s", address);
1028     family = G_SOCKET_FAMILY_IPV4;
1029   } else {
1030     family = g_inet_address_get_family (addr);
1031     g_object_unref (addr);
1032   }
1033
1034   if (family == G_SOCKET_FAMILY_IPV6)
1035     addrp = &priv->addr_v6;
1036   else
1037     addrp = &priv->addr_v4;
1038
1039   g_mutex_lock (&priv->lock);
1040   if (*addrp == NULL) {
1041     GstRTSPAddressPoolResult res;
1042
1043     if (priv->pool == NULL)
1044       goto no_pool;
1045
1046     res = gst_rtsp_address_pool_reserve_address (priv->pool, address,
1047         port, n_ports, ttl, addrp);
1048     if (res != GST_RTSP_ADDRESS_POOL_OK)
1049       goto no_address;
1050   } else {
1051     if (strcmp ((*addrp)->address, address) ||
1052         (*addrp)->port != port || (*addrp)->n_ports != n_ports ||
1053         (*addrp)->ttl != ttl)
1054       goto different_address;
1055   }
1056   result = gst_rtsp_address_copy (*addrp);
1057   g_mutex_unlock (&priv->lock);
1058
1059   return result;
1060
1061   /* ERRORS */
1062 no_pool:
1063   {
1064     GST_ERROR_OBJECT (stream, "no address pool specified");
1065     g_mutex_unlock (&priv->lock);
1066     return NULL;
1067   }
1068 no_address:
1069   {
1070     GST_ERROR_OBJECT (stream, "failed to acquire address %s from pool",
1071         address);
1072     g_mutex_unlock (&priv->lock);
1073     return NULL;
1074   }
1075 different_address:
1076   {
1077     GST_ERROR_OBJECT (stream, "address %s is not the same that was already"
1078         " reserved", address);
1079     g_mutex_unlock (&priv->lock);
1080     return NULL;
1081   }
1082 }
1083
1084 /* must be called with lock */
1085 static void
1086 set_sockets_for_udpsinks (GstRTSPStream * stream, GSocket * rtp_socket,
1087     GSocket * rtcp_socket, GSocketFamily family)
1088 {
1089   GstRTSPStreamPrivate *priv = stream->priv;
1090   const gchar *multisink_socket;
1091
1092   if (family == G_SOCKET_FAMILY_IPV6)
1093     multisink_socket = "socket-v6";
1094   else
1095     multisink_socket = "socket";
1096
1097   g_object_set (G_OBJECT (priv->udpsink[0]), multisink_socket, rtp_socket,
1098       NULL);
1099   g_object_set (G_OBJECT (priv->udpsink[1]), multisink_socket, rtcp_socket,
1100       NULL);
1101 }
1102
1103 /* must be called with lock */
1104 static gboolean
1105 create_and_configure_udpsinks (GstRTSPStream * stream)
1106 {
1107   GstRTSPStreamPrivate *priv = stream->priv;
1108   GstElement *udpsink0, *udpsink1;
1109
1110   udpsink0 = NULL;
1111   udpsink1 = NULL;
1112
1113   if (priv->udpsink[0])
1114     udpsink0 = priv->udpsink[0];
1115   else
1116     udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
1117
1118   if (!udpsink0)
1119     goto no_udp_protocol;
1120
1121   if (priv->udpsink[1])
1122     udpsink1 = priv->udpsink[1];
1123   else
1124     udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
1125
1126   if (!udpsink1)
1127     goto no_udp_protocol;
1128
1129   /* configure sinks */
1130
1131   g_object_set (G_OBJECT (udpsink0), "close-socket", FALSE, NULL);
1132   g_object_set (G_OBJECT (udpsink1), "close-socket", FALSE, NULL);
1133
1134   g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
1135   g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
1136
1137   g_object_set (G_OBJECT (udpsink0), "buffer-size", priv->buffer_size, NULL);
1138
1139   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
1140   /* Needs to be async for RECORD streams, otherwise we will never go to
1141    * PLAYING because the sinks will wait for data while the udpsrc can't
1142    * provide data with timestamps in PAUSED. */
1143   if (priv->sinkpad)
1144     g_object_set (G_OBJECT (udpsink0), "async", FALSE, NULL);
1145   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
1146
1147   g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL);
1148   g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL);
1149
1150   g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
1151   g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
1152
1153   /* update the dscp qos field in the sinks */
1154   update_dscp_qos (stream);
1155
1156   priv->udpsink[0] = udpsink0;
1157   priv->udpsink[1] = udpsink1;
1158
1159   return TRUE;
1160
1161   /* ERRORS */
1162 no_udp_protocol:
1163   {
1164     return FALSE;
1165   }
1166 }
1167
1168 /* must be called with lock */
1169 static void
1170 play_udpsources_one_family (GstRTSPStream * stream, GstElement * udpsrc_out[2],
1171     GSocketFamily family)
1172 {
1173   GstRTSPStreamPrivate *priv;
1174   GstPad *pad, *selpad;
1175   guint i;
1176   GstBin *bin;
1177
1178   priv = stream->priv;
1179   bin = GST_BIN (gst_object_get_parent (GST_OBJECT (priv->funnel[1])));
1180
1181   for (i = 0; i < 2; i++) {
1182     if (priv->sinkpad || i == 1) {
1183       if (priv->srcpad) {
1184         /* we set and keep these to playing so that they don't cause NO_PREROLL return
1185          * values. This is only relevant for PLAY pipelines */
1186         gst_element_set_state (udpsrc_out[i], GST_STATE_PLAYING);
1187         gst_element_set_locked_state (udpsrc_out[i], TRUE);
1188       }
1189       /* add udpsrc */
1190       gst_bin_add (bin, udpsrc_out[i]);
1191
1192       /* and link to the funnel */
1193       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1194       pad = gst_element_get_static_pad (udpsrc_out[i], "src");
1195       gst_pad_link (pad, selpad);
1196       gst_object_unref (pad);
1197       gst_object_unref (selpad);
1198
1199       /* otherwise sync state with parent in case it's running already
1200        * at this point */
1201       if (!priv->srcpad) {
1202         gst_element_sync_state_with_parent (udpsrc_out[i]);
1203       }
1204     }
1205   }
1206
1207   gst_object_unref (bin);
1208 }
1209
1210 /* must be called with lock */
1211 static gboolean
1212 create_and_configure_udpsources_one_family (GstElement * udpsrc_out[2],
1213     GSocket * rtp_socket, GSocket * rtcp_socket, GSocketFamily family,
1214     const gchar * address, gint rtpport, gint rtcpport,
1215     const gchar * multicast_iface, GstRTSPLowerTrans transport)
1216 {
1217   GstStateChangeReturn ret;
1218
1219   udpsrc_out[0] = gst_element_factory_make ("udpsrc", NULL);
1220   udpsrc_out[1] = gst_element_factory_make ("udpsrc", NULL);
1221
1222   if (udpsrc_out[0] == NULL || udpsrc_out[1] == NULL)
1223     goto error;
1224
1225   if (transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
1226     g_object_set (G_OBJECT (udpsrc_out[0]), "address", address, NULL);
1227     g_object_set (G_OBJECT (udpsrc_out[1]), "address", address, NULL);
1228     g_object_set (G_OBJECT (udpsrc_out[0]), "port", rtpport, NULL);
1229     g_object_set (G_OBJECT (udpsrc_out[1]), "port", rtcpport, NULL);
1230     g_object_set (G_OBJECT (udpsrc_out[0]), "multicast-iface", multicast_iface,
1231         NULL);
1232     g_object_set (G_OBJECT (udpsrc_out[1]), "multicast-iface", multicast_iface,
1233         NULL);
1234     g_object_set (G_OBJECT (udpsrc_out[0]), "loop", FALSE, NULL);
1235     g_object_set (G_OBJECT (udpsrc_out[1]), "loop", FALSE, NULL);
1236   }
1237
1238   g_object_set (G_OBJECT (udpsrc_out[0]), "socket", rtp_socket, NULL);
1239   g_object_set (G_OBJECT (udpsrc_out[1]), "socket", rtcp_socket, NULL);
1240
1241   ret = gst_element_set_state (udpsrc_out[0], GST_STATE_READY);
1242   if (ret == GST_STATE_CHANGE_FAILURE)
1243     goto error;
1244   ret = gst_element_set_state (udpsrc_out[1], GST_STATE_READY);
1245   if (ret == GST_STATE_CHANGE_FAILURE)
1246     goto error;
1247
1248   return TRUE;
1249   return TRUE;
1250
1251   /* ERRORS */
1252 error:
1253   {
1254     if (udpsrc_out[0])
1255       gst_object_unref (udpsrc_out[0]);
1256     if (udpsrc_out[1])
1257       gst_object_unref (udpsrc_out[1]);
1258     return FALSE;
1259   }
1260 }
1261
1262 static gboolean
1263 alloc_ports_one_family (GstRTSPStream * stream, GSocketFamily family,
1264     GstElement * udpsrc_out[2], GstRTSPRange * server_port_out,
1265     GstRTSPTransport * ct, GstRTSPAddress ** server_addr_out,
1266     gboolean use_client_settings)
1267 {
1268   GstRTSPStreamPrivate *priv = stream->priv;
1269   GSocket *rtp_socket = NULL;
1270   GSocket *rtcp_socket;
1271   gint tmp_rtp, tmp_rtcp;
1272   guint count;
1273   gint rtpport, rtcpport;
1274   GList *rejected_addresses = NULL;
1275   GstRTSPAddress *addr = NULL;
1276   GInetAddress *inetaddr = NULL;
1277   gchar *addr_str;
1278   GSocketAddress *rtp_sockaddr = NULL;
1279   GSocketAddress *rtcp_sockaddr = NULL;
1280   GstRTSPAddressPool *pool;
1281   GstRTSPLowerTrans transport;
1282   const gchar *multicast_iface = priv->multicast_iface;
1283
1284   pool = priv->pool;
1285   count = 0;
1286   transport = ct->lower_transport;
1287
1288   /* Start with random port */
1289   tmp_rtp = 0;
1290
1291   rtcp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1292       G_SOCKET_PROTOCOL_UDP, NULL);
1293   if (!rtcp_socket)
1294     goto no_udp_protocol;
1295   g_socket_set_multicast_loopback (rtcp_socket, FALSE);
1296
1297   if (*server_addr_out)
1298     gst_rtsp_address_free (*server_addr_out);
1299
1300   /* try to allocate 2 UDP ports, the RTP port should be an even
1301    * number and the RTCP port should be the next (uneven) port */
1302 again:
1303
1304   if (rtp_socket == NULL) {
1305     rtp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1306         G_SOCKET_PROTOCOL_UDP, NULL);
1307     if (!rtp_socket)
1308       goto no_udp_protocol;
1309     g_socket_set_multicast_loopback (rtp_socket, FALSE);
1310   }
1311
1312   if (pool && ((transport == GST_RTSP_LOWER_TRANS_UDP &&
1313               gst_rtsp_address_pool_has_unicast_addresses (pool))
1314           || transport == GST_RTSP_LOWER_TRANS_UDP_MCAST)) {
1315     GstRTSPAddressFlags flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT;
1316
1317     if (transport == GST_RTSP_LOWER_TRANS_UDP)
1318       flags |= GST_RTSP_ADDRESS_FLAG_UNICAST;
1319     else
1320       flags |= GST_RTSP_ADDRESS_FLAG_MULTICAST;
1321
1322     if (addr)
1323       rejected_addresses = g_list_prepend (rejected_addresses, addr);
1324
1325     if (family == G_SOCKET_FAMILY_IPV6)
1326       flags |= GST_RTSP_ADDRESS_FLAG_IPV6;
1327     else
1328       flags |= GST_RTSP_ADDRESS_FLAG_IPV4;
1329
1330     if (ct->destination && transport == GST_RTSP_LOWER_TRANS_UDP_MCAST
1331         && use_client_settings)
1332       gst_rtsp_address_pool_reserve_address (pool, ct->destination,
1333           ct->port.min, 2, ct->ttl, &addr);
1334     else
1335       addr = gst_rtsp_address_pool_acquire_address (pool, flags, 2);
1336
1337     if (addr == NULL)
1338       goto no_ports;
1339
1340     tmp_rtp = addr->port;
1341
1342     g_clear_object (&inetaddr);
1343     inetaddr = g_inet_address_new_from_string (addr->address);
1344
1345     /* If we're supposed to bind to a multicast address, instead bind
1346      * to ANY and let udpsrc later join the relevant multicast group
1347      */
1348     if (g_inet_address_get_is_multicast (inetaddr)) {
1349       g_object_unref (inetaddr);
1350       inetaddr = g_inet_address_new_any (family);
1351     }
1352   } else {
1353     if (tmp_rtp != 0) {
1354       tmp_rtp += 2;
1355       if (++count > 20)
1356         goto no_ports;
1357     }
1358
1359     if (inetaddr == NULL)
1360       inetaddr = g_inet_address_new_any (family);
1361   }
1362
1363   rtp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtp);
1364   if (!g_socket_bind (rtp_socket, rtp_sockaddr, FALSE, NULL)) {
1365     g_object_unref (rtp_sockaddr);
1366     goto again;
1367   }
1368   g_object_unref (rtp_sockaddr);
1369
1370   rtp_sockaddr = g_socket_get_local_address (rtp_socket, NULL);
1371   if (rtp_sockaddr == NULL || !G_IS_INET_SOCKET_ADDRESS (rtp_sockaddr)) {
1372     g_clear_object (&rtp_sockaddr);
1373     goto socket_error;
1374   }
1375
1376   tmp_rtp =
1377       g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (rtp_sockaddr));
1378   g_object_unref (rtp_sockaddr);
1379
1380   /* check if port is even */
1381   if ((tmp_rtp & 1) != 0) {
1382     /* port not even, close and allocate another */
1383     tmp_rtp++;
1384     g_clear_object (&rtp_socket);
1385     goto again;
1386   }
1387
1388   /* set port */
1389   tmp_rtcp = tmp_rtp + 1;
1390
1391   rtcp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtcp);
1392   if (!g_socket_bind (rtcp_socket, rtcp_sockaddr, FALSE, NULL)) {
1393     g_object_unref (rtcp_sockaddr);
1394     g_clear_object (&rtp_socket);
1395     goto again;
1396   }
1397   g_object_unref (rtcp_sockaddr);
1398
1399   if (addr == NULL)
1400     addr_str = g_inet_address_to_string (inetaddr);
1401   else
1402     addr_str = addr->address;
1403   g_clear_object (&inetaddr);
1404
1405   if (!create_and_configure_udpsources_one_family (udpsrc_out, rtp_socket,
1406           rtcp_socket, family, addr_str, tmp_rtp, tmp_rtcp, multicast_iface,
1407           transport)) {
1408     if (addr == NULL)
1409       g_free (addr_str);
1410     goto no_udp_protocol;
1411   }
1412
1413   if (addr == NULL)
1414     g_free (addr_str);
1415
1416   play_udpsources_one_family (stream, udpsrc_out, family);
1417
1418   g_object_get (G_OBJECT (udpsrc_out[0]), "port", &rtpport, NULL);
1419   g_object_get (G_OBJECT (udpsrc_out[1]), "port", &rtcpport, NULL);
1420
1421   /* this should not happen... */
1422   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
1423     goto port_error;
1424
1425   /* set RTP and RTCP sockets */
1426   set_sockets_for_udpsinks (stream, rtp_socket, rtcp_socket, family);
1427
1428   server_port_out->min = rtpport;
1429   server_port_out->max = rtcpport;
1430
1431   *server_addr_out = addr;
1432   g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free);
1433
1434   g_object_unref (rtp_socket);
1435   g_object_unref (rtcp_socket);
1436
1437   return TRUE;
1438
1439   /* ERRORS */
1440 no_udp_protocol:
1441   {
1442     goto cleanup;
1443   }
1444 no_ports:
1445   {
1446     goto cleanup;
1447   }
1448 port_error:
1449   {
1450     goto cleanup;
1451   }
1452 socket_error:
1453   {
1454     goto cleanup;
1455   }
1456 cleanup:
1457   {
1458     if (inetaddr)
1459       g_object_unref (inetaddr);
1460     g_list_free_full (rejected_addresses,
1461         (GDestroyNotify) gst_rtsp_address_free);
1462     if (addr)
1463       gst_rtsp_address_free (addr);
1464     if (rtp_socket)
1465       g_object_unref (rtp_socket);
1466     if (rtcp_socket)
1467       g_object_unref (rtcp_socket);
1468     return FALSE;
1469   }
1470 }
1471
1472 /**
1473  * gst_rtsp_stream_allocate_udp_sockets:
1474  * @stream: a #GstRTSPStream
1475  * @family: protocol family
1476  * @transport_method: transport method
1477  *
1478  * Allocates RTP and RTCP ports.
1479  *
1480  * Returns: %TRUE if the RTP and RTCP sockets have been succeccully allocated.
1481  */
1482 gboolean
1483 gst_rtsp_stream_allocate_udp_sockets (GstRTSPStream * stream,
1484     GSocketFamily family, GstRTSPTransport * ct, gboolean use_client_settings)
1485 {
1486   GstRTSPStreamPrivate *priv;
1487   gboolean result = FALSE;
1488   GstRTSPLowerTrans transport = ct->lower_transport;
1489
1490   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1491   priv = stream->priv;
1492   g_return_val_if_fail (priv->is_joined, FALSE);
1493
1494   g_mutex_lock (&priv->lock);
1495
1496   if (family == G_SOCKET_FAMILY_IPV4) {
1497     if (transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
1498       if (priv->have_ipv4_mcast)
1499         goto done;
1500       priv->have_ipv4_mcast =
1501           alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4,
1502           priv->udpsrc_mcast_v4, &priv->server_port_v4, ct, &priv->addr_v4,
1503           use_client_settings);
1504     } else {
1505       priv->have_ipv4 =
1506           alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4, priv->udpsrc_v4,
1507           &priv->server_port_v4, ct, &priv->server_addr_v4,
1508           use_client_settings);
1509     }
1510   } else {
1511     if (transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
1512       if (priv->have_ipv6_mcast)
1513         goto done;
1514       priv->have_ipv6_mcast =
1515           alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6,
1516           priv->udpsrc_mcast_v6, &priv->server_port_v6, ct, &priv->addr_v6,
1517           use_client_settings);
1518     } else {
1519       if (priv->have_ipv6)
1520         goto done;
1521       priv->have_ipv6 =
1522           alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6, priv->udpsrc_v6,
1523           &priv->server_port_v6, ct, &priv->server_addr_v6,
1524           use_client_settings);
1525     }
1526   }
1527
1528 done:
1529   result = priv->have_ipv4 || priv->have_ipv4_mcast || priv->have_ipv6 ||
1530       priv->have_ipv6_mcast;
1531
1532   g_mutex_unlock (&priv->lock);
1533
1534   return result;
1535 }
1536
1537 /**
1538  * gst_rtsp_stream_set_client_side:
1539  * @stream: a #GstRTSPStream
1540  * @client_side: TRUE if this #GstRTSPStream is running on the 'client' side of
1541  * an RTSP connection.
1542  *
1543  * Sets the #GstRTSPStream as a 'client side' stream - used for sending
1544  * streams to an RTSP server via RECORD. This has the practical effect
1545  * of changing which UDP port numbers are used when setting up the local
1546  * side of the stream sending to be either the 'server' or 'client' pair
1547  * of a configured UDP transport.
1548  */
1549 void
1550 gst_rtsp_stream_set_client_side (GstRTSPStream * stream, gboolean client_side)
1551 {
1552   GstRTSPStreamPrivate *priv;
1553
1554   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1555   priv = stream->priv;
1556   g_mutex_lock (&priv->lock);
1557   priv->client_side = client_side;
1558   g_mutex_unlock (&priv->lock);
1559 }
1560
1561 /**
1562  * gst_rtsp_stream_is_client_side:
1563  * @stream: a #GstRTSPStream
1564  *
1565  * See gst_rtsp_stream_set_client_side()
1566  *
1567  * Returns: TRUE if this #GstRTSPStream is client-side.
1568  */
1569 gboolean
1570 gst_rtsp_stream_is_client_side (GstRTSPStream * stream)
1571 {
1572   GstRTSPStreamPrivate *priv;
1573   gboolean ret;
1574
1575   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1576
1577   priv = stream->priv;
1578   g_mutex_lock (&priv->lock);
1579   ret = priv->client_side;
1580   g_mutex_unlock (&priv->lock);
1581
1582   return ret;
1583 }
1584
1585 /**
1586  * gst_rtsp_stream_get_server_port:
1587  * @stream: a #GstRTSPStream
1588  * @server_port: (out): result server port
1589  * @family: the port family to get
1590  *
1591  * Fill @server_port with the port pair used by the server. This function can
1592  * only be called when @stream has been joined.
1593  */
1594 void
1595 gst_rtsp_stream_get_server_port (GstRTSPStream * stream,
1596     GstRTSPRange * server_port, GSocketFamily family)
1597 {
1598   GstRTSPStreamPrivate *priv;
1599
1600   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1601   priv = stream->priv;
1602   g_return_if_fail (priv->is_joined);
1603
1604   g_mutex_lock (&priv->lock);
1605   if (family == G_SOCKET_FAMILY_IPV4) {
1606     if (server_port)
1607       *server_port = priv->server_port_v4;
1608   } else {
1609     if (server_port)
1610       *server_port = priv->server_port_v6;
1611   }
1612   g_mutex_unlock (&priv->lock);
1613 }
1614
1615 /**
1616  * gst_rtsp_stream_get_rtpsession:
1617  * @stream: a #GstRTSPStream
1618  *
1619  * Get the RTP session of this stream.
1620  *
1621  * Returns: (transfer full): The RTP session of this stream. Unref after usage.
1622  */
1623 GObject *
1624 gst_rtsp_stream_get_rtpsession (GstRTSPStream * stream)
1625 {
1626   GstRTSPStreamPrivate *priv;
1627   GObject *session;
1628
1629   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1630
1631   priv = stream->priv;
1632
1633   g_mutex_lock (&priv->lock);
1634   if ((session = priv->session))
1635     g_object_ref (session);
1636   g_mutex_unlock (&priv->lock);
1637
1638   return session;
1639 }
1640
1641 /**
1642  * gst_rtsp_stream_get_ssrc:
1643  * @stream: a #GstRTSPStream
1644  * @ssrc: (out): result ssrc
1645  *
1646  * Get the SSRC used by the RTP session of this stream. This function can only
1647  * be called when @stream has been joined.
1648  */
1649 void
1650 gst_rtsp_stream_get_ssrc (GstRTSPStream * stream, guint * ssrc)
1651 {
1652   GstRTSPStreamPrivate *priv;
1653
1654   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1655   priv = stream->priv;
1656   g_return_if_fail (priv->is_joined);
1657
1658   g_mutex_lock (&priv->lock);
1659   if (ssrc && priv->session)
1660     g_object_get (priv->session, "internal-ssrc", ssrc, NULL);
1661   g_mutex_unlock (&priv->lock);
1662 }
1663
1664 /**
1665  * gst_rtsp_stream_set_retransmission_time:
1666  * @stream: a #GstRTSPStream
1667  * @time: a #GstClockTime
1668  *
1669  * Set the amount of time to store retransmission packets.
1670  */
1671 void
1672 gst_rtsp_stream_set_retransmission_time (GstRTSPStream * stream,
1673     GstClockTime time)
1674 {
1675   GST_DEBUG_OBJECT (stream, "set retransmission time %" G_GUINT64_FORMAT, time);
1676
1677   g_mutex_lock (&stream->priv->lock);
1678   stream->priv->rtx_time = time;
1679   if (stream->priv->rtxsend)
1680     g_object_set (stream->priv->rtxsend, "max-size-time",
1681         GST_TIME_AS_MSECONDS (time), NULL);
1682   g_mutex_unlock (&stream->priv->lock);
1683 }
1684
1685 /**
1686  * gst_rtsp_stream_get_retransmission_time:
1687  * @stream: a #GstRTSPStream
1688  *
1689  * Get the amount of time to store retransmission data.
1690  *
1691  * Returns: the amount of time to store retransmission data.
1692  */
1693 GstClockTime
1694 gst_rtsp_stream_get_retransmission_time (GstRTSPStream * stream)
1695 {
1696   GstClockTime ret;
1697
1698   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
1699
1700   g_mutex_lock (&stream->priv->lock);
1701   ret = stream->priv->rtx_time;
1702   g_mutex_unlock (&stream->priv->lock);
1703
1704   return ret;
1705 }
1706
1707 /**
1708  * gst_rtsp_stream_set_retransmission_pt:
1709  * @stream: a #GstRTSPStream
1710  * @rtx_pt: a #guint
1711  *
1712  * Set the payload type (pt) for retransmission of this stream.
1713  */
1714 void
1715 gst_rtsp_stream_set_retransmission_pt (GstRTSPStream * stream, guint rtx_pt)
1716 {
1717   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1718
1719   GST_DEBUG_OBJECT (stream, "set retransmission pt %u", rtx_pt);
1720
1721   g_mutex_lock (&stream->priv->lock);
1722   stream->priv->rtx_pt = rtx_pt;
1723   if (stream->priv->rtxsend) {
1724     guint pt = gst_rtsp_stream_get_pt (stream);
1725     gchar *pt_s = g_strdup_printf ("%d", pt);
1726     GstStructure *rtx_pt_map = gst_structure_new ("application/x-rtp-pt-map",
1727         pt_s, G_TYPE_UINT, rtx_pt, NULL);
1728     g_object_set (stream->priv->rtxsend, "payload-type-map", rtx_pt_map, NULL);
1729     g_free (pt_s);
1730     gst_structure_free (rtx_pt_map);
1731   }
1732   g_mutex_unlock (&stream->priv->lock);
1733 }
1734
1735 /**
1736  * gst_rtsp_stream_get_retransmission_pt:
1737  * @stream: a #GstRTSPStream
1738  *
1739  * Get the payload-type used for retransmission of this stream
1740  *
1741  * Returns: The retransmission PT.
1742  */
1743 guint
1744 gst_rtsp_stream_get_retransmission_pt (GstRTSPStream * stream)
1745 {
1746   guint rtx_pt;
1747
1748   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
1749
1750   g_mutex_lock (&stream->priv->lock);
1751   rtx_pt = stream->priv->rtx_pt;
1752   g_mutex_unlock (&stream->priv->lock);
1753
1754   return rtx_pt;
1755 }
1756
1757 /**
1758  * gst_rtsp_stream_set_buffer_size:
1759  * @stream: a #GstRTSPStream
1760  * @size: the buffer size
1761  *
1762  * Set the size of the UDP transmission buffer (in bytes)
1763  * Needs to be set before the stream is joined to a bin.
1764  *
1765  * Since: 1.6
1766  */
1767 void
1768 gst_rtsp_stream_set_buffer_size (GstRTSPStream * stream, guint size)
1769 {
1770   g_mutex_lock (&stream->priv->lock);
1771   stream->priv->buffer_size = size;
1772   g_mutex_unlock (&stream->priv->lock);
1773 }
1774
1775 /**
1776  * gst_rtsp_stream_get_buffer_size:
1777  * @stream: a #GstRTSPStream
1778  *
1779  * Get the size of the UDP transmission buffer (in bytes)
1780  *
1781  * Returns: the size of the UDP TX buffer
1782  *
1783  * Since: 1.6
1784  */
1785 guint
1786 gst_rtsp_stream_get_buffer_size (GstRTSPStream * stream)
1787 {
1788   guint buffer_size;
1789
1790   g_mutex_lock (&stream->priv->lock);
1791   buffer_size = stream->priv->buffer_size;
1792   g_mutex_unlock (&stream->priv->lock);
1793
1794   return buffer_size;
1795 }
1796
1797 /* executed from streaming thread */
1798 static void
1799 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream)
1800 {
1801   GstRTSPStreamPrivate *priv = stream->priv;
1802   GstCaps *newcaps, *oldcaps;
1803
1804   newcaps = gst_pad_get_current_caps (pad);
1805
1806   GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps,
1807       newcaps);
1808
1809   g_mutex_lock (&priv->lock);
1810   oldcaps = priv->caps;
1811   priv->caps = newcaps;
1812   g_mutex_unlock (&priv->lock);
1813
1814   if (oldcaps)
1815     gst_caps_unref (oldcaps);
1816 }
1817
1818 static void
1819 dump_structure (const GstStructure * s)
1820 {
1821   gchar *sstr;
1822
1823   sstr = gst_structure_to_string (s);
1824   GST_INFO ("structure: %s", sstr);
1825   g_free (sstr);
1826 }
1827
1828 static GstRTSPStreamTransport *
1829 find_transport (GstRTSPStream * stream, const gchar * rtcp_from)
1830 {
1831   GstRTSPStreamPrivate *priv = stream->priv;
1832   GList *walk;
1833   GstRTSPStreamTransport *result = NULL;
1834   const gchar *tmp;
1835   gchar *dest;
1836   guint port;
1837
1838   if (rtcp_from == NULL)
1839     return NULL;
1840
1841   tmp = g_strrstr (rtcp_from, ":");
1842   if (tmp == NULL)
1843     return NULL;
1844
1845   port = atoi (tmp + 1);
1846   dest = g_strndup (rtcp_from, tmp - rtcp_from);
1847
1848   g_mutex_lock (&priv->lock);
1849   GST_INFO ("finding %s:%d in %d transports", dest, port,
1850       g_list_length (priv->transports));
1851
1852   for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1853     GstRTSPStreamTransport *trans = walk->data;
1854     const GstRTSPTransport *tr;
1855     gint min, max;
1856
1857     tr = gst_rtsp_stream_transport_get_transport (trans);
1858
1859     if (priv->client_side) {
1860       /* In client side mode the 'destination' is the RTSP server, so send
1861        * to those ports */
1862       min = tr->server_port.min;
1863       max = tr->server_port.max;
1864     } else {
1865       min = tr->client_port.min;
1866       max = tr->client_port.max;
1867     }
1868
1869     if ((strcmp (tr->destination, dest) == 0) && (min == port || max == port)) {
1870       result = trans;
1871       break;
1872     }
1873   }
1874   if (result)
1875     g_object_ref (result);
1876   g_mutex_unlock (&priv->lock);
1877
1878   g_free (dest);
1879
1880   return result;
1881 }
1882
1883 static GstRTSPStreamTransport *
1884 check_transport (GObject * source, GstRTSPStream * stream)
1885 {
1886   GstStructure *stats;
1887   GstRTSPStreamTransport *trans;
1888
1889   /* see if we have a stream to match with the origin of the RTCP packet */
1890   trans = g_object_get_qdata (source, ssrc_stream_map_key);
1891   if (trans == NULL) {
1892     g_object_get (source, "stats", &stats, NULL);
1893     if (stats) {
1894       const gchar *rtcp_from;
1895
1896       dump_structure (stats);
1897
1898       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
1899       if ((trans = find_transport (stream, rtcp_from))) {
1900         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
1901             source);
1902         g_object_set_qdata_full (source, ssrc_stream_map_key, trans,
1903             g_object_unref);
1904       }
1905       gst_structure_free (stats);
1906     }
1907   }
1908   return trans;
1909 }
1910
1911
1912 static void
1913 on_new_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1914 {
1915   GstRTSPStreamTransport *trans;
1916
1917   GST_INFO ("%p: new source %p", stream, source);
1918
1919   trans = check_transport (source, stream);
1920
1921   if (trans)
1922     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
1923 }
1924
1925 static void
1926 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPStream * stream)
1927 {
1928   GST_INFO ("%p: new SDES %p", stream, source);
1929 }
1930
1931 static void
1932 on_ssrc_active (GObject * session, GObject * source, GstRTSPStream * stream)
1933 {
1934   GstRTSPStreamTransport *trans;
1935
1936   trans = check_transport (source, stream);
1937
1938   if (trans) {
1939     GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
1940     gst_rtsp_stream_transport_keep_alive (trans);
1941   }
1942 #ifdef DUMP_STATS
1943   {
1944     GstStructure *stats;
1945     g_object_get (source, "stats", &stats, NULL);
1946     if (stats) {
1947       dump_structure (stats);
1948       gst_structure_free (stats);
1949     }
1950   }
1951 #endif
1952 }
1953
1954 static void
1955 on_bye_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1956 {
1957   GST_INFO ("%p: source %p bye", stream, source);
1958 }
1959
1960 static void
1961 on_bye_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1962 {
1963   GstRTSPStreamTransport *trans;
1964
1965   GST_INFO ("%p: source %p bye timeout", stream, source);
1966
1967   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1968     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1969     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1970   }
1971 }
1972
1973 static void
1974 on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1975 {
1976   GstRTSPStreamTransport *trans;
1977
1978   GST_INFO ("%p: source %p timeout", stream, source);
1979
1980   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1981     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1982     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1983   }
1984 }
1985
1986 static void
1987 on_new_sender_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1988 {
1989   GST_INFO ("%p: new sender source %p", stream, source);
1990 #ifndef DUMP_STATS
1991   {
1992     GstStructure *stats;
1993     g_object_get (source, "stats", &stats, NULL);
1994     if (stats) {
1995       dump_structure (stats);
1996       gst_structure_free (stats);
1997     }
1998   }
1999 #endif
2000 }
2001
2002 static void
2003 on_sender_ssrc_active (GObject * session, GObject * source,
2004     GstRTSPStream * stream)
2005 {
2006 #ifndef DUMP_STATS
2007   {
2008     GstStructure *stats;
2009     g_object_get (source, "stats", &stats, NULL);
2010     if (stats) {
2011       dump_structure (stats);
2012       gst_structure_free (stats);
2013     }
2014   }
2015 #endif
2016 }
2017
2018 static void
2019 clear_tr_cache (GstRTSPStreamPrivate * priv, gboolean is_rtp)
2020 {
2021   if (is_rtp) {
2022     g_list_foreach (priv->tr_cache_rtp, (GFunc) g_object_unref, NULL);
2023     g_list_free (priv->tr_cache_rtp);
2024     priv->tr_cache_rtp = NULL;
2025   } else {
2026     g_list_foreach (priv->tr_cache_rtcp, (GFunc) g_object_unref, NULL);
2027     g_list_free (priv->tr_cache_rtcp);
2028     priv->tr_cache_rtcp = NULL;
2029   }
2030 }
2031
2032 static GstFlowReturn
2033 handle_new_sample (GstAppSink * sink, gpointer user_data)
2034 {
2035   GstRTSPStreamPrivate *priv;
2036   GList *walk;
2037   GstSample *sample;
2038   GstBuffer *buffer;
2039   GstRTSPStream *stream;
2040   gboolean is_rtp;
2041
2042   sample = gst_app_sink_pull_sample (sink);
2043   if (!sample)
2044     return GST_FLOW_OK;
2045
2046   stream = (GstRTSPStream *) user_data;
2047   priv = stream->priv;
2048   buffer = gst_sample_get_buffer (sample);
2049
2050   is_rtp = GST_ELEMENT_CAST (sink) == priv->appsink[0];
2051
2052   g_mutex_lock (&priv->lock);
2053   if (is_rtp) {
2054     if (priv->tr_cache_cookie_rtp != priv->transports_cookie) {
2055       clear_tr_cache (priv, is_rtp);
2056       for (walk = priv->transports; walk; walk = g_list_next (walk)) {
2057         GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2058         priv->tr_cache_rtp =
2059             g_list_prepend (priv->tr_cache_rtp, g_object_ref (tr));
2060       }
2061       priv->tr_cache_cookie_rtp = priv->transports_cookie;
2062     }
2063   } else {
2064     if (priv->tr_cache_cookie_rtcp != priv->transports_cookie) {
2065       clear_tr_cache (priv, is_rtp);
2066       for (walk = priv->transports; walk; walk = g_list_next (walk)) {
2067         GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2068         priv->tr_cache_rtcp =
2069             g_list_prepend (priv->tr_cache_rtcp, g_object_ref (tr));
2070       }
2071       priv->tr_cache_cookie_rtcp = priv->transports_cookie;
2072     }
2073   }
2074   g_mutex_unlock (&priv->lock);
2075
2076   if (is_rtp) {
2077     for (walk = priv->tr_cache_rtp; walk; walk = g_list_next (walk)) {
2078       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2079       gst_rtsp_stream_transport_send_rtp (tr, buffer);
2080     }
2081   } else {
2082     for (walk = priv->tr_cache_rtcp; walk; walk = g_list_next (walk)) {
2083       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2084       gst_rtsp_stream_transport_send_rtcp (tr, buffer);
2085     }
2086   }
2087   gst_sample_unref (sample);
2088
2089   return GST_FLOW_OK;
2090 }
2091
2092 static GstAppSinkCallbacks sink_cb = {
2093   NULL,                         /* not interested in EOS */
2094   NULL,                         /* not interested in preroll samples */
2095   handle_new_sample,
2096 };
2097
2098 static GstElement *
2099 get_rtp_encoder (GstRTSPStream * stream, guint session)
2100 {
2101   GstRTSPStreamPrivate *priv = stream->priv;
2102
2103   if (priv->srtpenc == NULL) {
2104     gchar *name;
2105
2106     name = g_strdup_printf ("srtpenc_%u", session);
2107     priv->srtpenc = gst_element_factory_make ("srtpenc", name);
2108     g_free (name);
2109
2110     g_object_set (priv->srtpenc, "random-key", TRUE, NULL);
2111   }
2112   return gst_object_ref (priv->srtpenc);
2113 }
2114
2115 static GstElement *
2116 request_rtp_encoder (GstElement * rtpbin, guint session, GstRTSPStream * stream)
2117 {
2118   GstRTSPStreamPrivate *priv = stream->priv;
2119   GstElement *oldenc, *enc;
2120   GstPad *pad;
2121   gchar *name;
2122
2123   if (priv->idx != session)
2124     return NULL;
2125
2126   GST_DEBUG_OBJECT (stream, "make RTP encoder for session %u", session);
2127
2128   oldenc = priv->srtpenc;
2129   enc = get_rtp_encoder (stream, session);
2130   name = g_strdup_printf ("rtp_sink_%d", session);
2131   pad = gst_element_get_request_pad (enc, name);
2132   g_free (name);
2133   gst_object_unref (pad);
2134
2135   if (oldenc == NULL)
2136     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER], 0,
2137         enc);
2138
2139   return enc;
2140 }
2141
2142 static GstElement *
2143 request_rtcp_encoder (GstElement * rtpbin, guint session,
2144     GstRTSPStream * stream)
2145 {
2146   GstRTSPStreamPrivate *priv = stream->priv;
2147   GstElement *oldenc, *enc;
2148   GstPad *pad;
2149   gchar *name;
2150
2151   if (priv->idx != session)
2152     return NULL;
2153
2154   GST_DEBUG_OBJECT (stream, "make RTCP encoder for session %u", session);
2155
2156   oldenc = priv->srtpenc;
2157   enc = get_rtp_encoder (stream, session);
2158   name = g_strdup_printf ("rtcp_sink_%d", session);
2159   pad = gst_element_get_request_pad (enc, name);
2160   g_free (name);
2161   gst_object_unref (pad);
2162
2163   if (oldenc == NULL)
2164     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER], 0,
2165         enc);
2166
2167   return enc;
2168 }
2169
2170 static GstCaps *
2171 request_key (GstElement * srtpdec, guint ssrc, GstRTSPStream * stream)
2172 {
2173   GstRTSPStreamPrivate *priv = stream->priv;
2174   GstCaps *caps;
2175
2176   GST_DEBUG ("request key %08x", ssrc);
2177
2178   g_mutex_lock (&priv->lock);
2179   if ((caps = g_hash_table_lookup (priv->keys, GINT_TO_POINTER (ssrc))))
2180     gst_caps_ref (caps);
2181   g_mutex_unlock (&priv->lock);
2182
2183   return caps;
2184 }
2185
2186 static GstElement *
2187 request_rtp_rtcp_decoder (GstElement * rtpbin, guint session,
2188     GstRTSPStream * stream)
2189 {
2190   GstRTSPStreamPrivate *priv = stream->priv;
2191
2192   if (priv->idx != session)
2193     return NULL;
2194
2195   if (priv->srtpdec == NULL) {
2196     gchar *name;
2197
2198     name = g_strdup_printf ("srtpdec_%u", session);
2199     priv->srtpdec = gst_element_factory_make ("srtpdec", name);
2200     g_free (name);
2201
2202     g_signal_connect (priv->srtpdec, "request-key",
2203         (GCallback) request_key, stream);
2204   }
2205   return gst_object_ref (priv->srtpdec);
2206 }
2207
2208 /**
2209  * gst_rtsp_stream_request_aux_sender:
2210  * @stream: a #GstRTSPStream
2211  * @sessid: the session id
2212  *
2213  * Creating a rtxsend bin
2214  *
2215  * Returns: (transfer full): a #GstElement.
2216  *
2217  * Since: 1.6
2218  */
2219 GstElement *
2220 gst_rtsp_stream_request_aux_sender (GstRTSPStream * stream, guint sessid)
2221 {
2222   GstElement *bin;
2223   GstPad *pad;
2224   GstStructure *pt_map;
2225   gchar *name;
2226   guint pt, rtx_pt;
2227   gchar *pt_s;
2228
2229   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2230
2231   pt = gst_rtsp_stream_get_pt (stream);
2232   pt_s = g_strdup_printf ("%u", pt);
2233   rtx_pt = stream->priv->rtx_pt;
2234
2235   GST_INFO ("creating rtxsend with pt %u to %u", pt, rtx_pt);
2236
2237   bin = gst_bin_new (NULL);
2238   stream->priv->rtxsend = gst_element_factory_make ("rtprtxsend", NULL);
2239   pt_map = gst_structure_new ("application/x-rtp-pt-map",
2240       pt_s, G_TYPE_UINT, rtx_pt, NULL);
2241   g_object_set (stream->priv->rtxsend, "payload-type-map", pt_map,
2242       "max-size-time", GST_TIME_AS_MSECONDS (stream->priv->rtx_time), NULL);
2243   g_free (pt_s);
2244   gst_structure_free (pt_map);
2245   gst_bin_add (GST_BIN (bin), gst_object_ref (stream->priv->rtxsend));
2246
2247   pad = gst_element_get_static_pad (stream->priv->rtxsend, "src");
2248   name = g_strdup_printf ("src_%u", sessid);
2249   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
2250   g_free (name);
2251   gst_object_unref (pad);
2252
2253   pad = gst_element_get_static_pad (stream->priv->rtxsend, "sink");
2254   name = g_strdup_printf ("sink_%u", sessid);
2255   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
2256   g_free (name);
2257   gst_object_unref (pad);
2258
2259   return bin;
2260 }
2261
2262 /**
2263  * gst_rtsp_stream_set_pt_map:
2264  * @stream: a #GstRTSPStream
2265  * @pt: the pt
2266  * @caps: a #GstCaps
2267  *
2268  * Configure a pt map between @pt and @caps.
2269  */
2270 void
2271 gst_rtsp_stream_set_pt_map (GstRTSPStream * stream, guint pt, GstCaps * caps)
2272 {
2273   GstRTSPStreamPrivate *priv = stream->priv;
2274
2275   g_mutex_lock (&priv->lock);
2276   g_hash_table_insert (priv->ptmap, GINT_TO_POINTER (pt), gst_caps_ref (caps));
2277   g_mutex_unlock (&priv->lock);
2278 }
2279
2280 /**
2281  * gst_rtsp_stream_set_publish_clock_mode:
2282  * @stream: a #GstRTSPStream
2283  * @mode: the clock publish mode
2284  *
2285  * Sets if and how the stream clock should be published according to RFC7273.
2286  *
2287  * Since: 1.8
2288  */
2289 void
2290 gst_rtsp_stream_set_publish_clock_mode (GstRTSPStream * stream,
2291     GstRTSPPublishClockMode mode)
2292 {
2293   GstRTSPStreamPrivate *priv;
2294
2295   priv = stream->priv;
2296   g_mutex_lock (&priv->lock);
2297   priv->publish_clock_mode = mode;
2298   g_mutex_unlock (&priv->lock);
2299 }
2300
2301 /**
2302  * gst_rtsp_stream_get_publish_clock_mode:
2303  * @factory: a #GstRTSPStream
2304  *
2305  * Gets if and how the stream clock should be published according to RFC7273.
2306  *
2307  * Returns: The GstRTSPPublishClockMode
2308  *
2309  * Since: 1.8
2310  */
2311 GstRTSPPublishClockMode
2312 gst_rtsp_stream_get_publish_clock_mode (GstRTSPStream * stream)
2313 {
2314   GstRTSPStreamPrivate *priv;
2315   GstRTSPPublishClockMode ret;
2316
2317   priv = stream->priv;
2318   g_mutex_lock (&priv->lock);
2319   ret = priv->publish_clock_mode;
2320   g_mutex_unlock (&priv->lock);
2321
2322   return ret;
2323 }
2324
2325 static GstCaps *
2326 request_pt_map (GstElement * rtpbin, guint session, guint pt,
2327     GstRTSPStream * stream)
2328 {
2329   GstRTSPStreamPrivate *priv = stream->priv;
2330   GstCaps *caps = NULL;
2331
2332   g_mutex_lock (&priv->lock);
2333
2334   if (priv->idx == session) {
2335     caps = g_hash_table_lookup (priv->ptmap, GINT_TO_POINTER (pt));
2336     if (caps) {
2337       GST_DEBUG ("Stream %p, pt %u: caps %" GST_PTR_FORMAT, stream, pt, caps);
2338       gst_caps_ref (caps);
2339     } else {
2340       GST_DEBUG ("Stream %p, pt %u: no caps", stream, pt);
2341     }
2342   }
2343
2344   g_mutex_unlock (&priv->lock);
2345
2346   return caps;
2347 }
2348
2349 static void
2350 pad_added (GstElement * rtpbin, GstPad * pad, GstRTSPStream * stream)
2351 {
2352   GstRTSPStreamPrivate *priv = stream->priv;
2353   gchar *name;
2354   GstPadLinkReturn ret;
2355   guint sessid;
2356
2357   GST_DEBUG ("Stream %p added pad %s:%s for pad %s:%s", stream,
2358       GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (priv->sinkpad));
2359
2360   name = gst_pad_get_name (pad);
2361   if (sscanf (name, "recv_rtp_src_%u", &sessid) != 1) {
2362     g_free (name);
2363     return;
2364   }
2365   g_free (name);
2366
2367   if (priv->idx != sessid)
2368     return;
2369
2370   if (gst_pad_is_linked (priv->sinkpad)) {
2371     GST_WARNING ("Stream %p: Pad %s:%s is linked already", stream,
2372         GST_DEBUG_PAD_NAME (priv->sinkpad));
2373     return;
2374   }
2375
2376   /* link the RTP pad to the session manager, it should not really fail unless
2377    * this is not really an RTP pad */
2378   ret = gst_pad_link (pad, priv->sinkpad);
2379   if (ret != GST_PAD_LINK_OK)
2380     goto link_failed;
2381   priv->recv_rtp_src = gst_object_ref (pad);
2382
2383   return;
2384
2385 /* ERRORS */
2386 link_failed:
2387   {
2388     GST_ERROR ("Stream %p: Failed to link pads %s:%s and %s:%s", stream,
2389         GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (priv->sinkpad));
2390   }
2391 }
2392
2393 static void
2394 on_npt_stop (GstElement * rtpbin, guint session, guint ssrc,
2395     GstRTSPStream * stream)
2396 {
2397   /* TODO: What to do here other than this? */
2398   GST_DEBUG ("Stream %p: Got EOS", stream);
2399   gst_pad_send_event (stream->priv->sinkpad, gst_event_new_eos ());
2400 }
2401
2402 /* must be called with lock */
2403 static gboolean
2404 create_sender_part (GstRTSPStream * stream, GstBin * bin, GstState state)
2405 {
2406   GstRTSPStreamPrivate *priv;
2407   GstPad *pad, *sinkpad = NULL;
2408   gboolean is_tcp = FALSE, is_udp = FALSE;
2409   gint i;
2410
2411   priv = stream->priv;
2412
2413   is_tcp = priv->protocols & GST_RTSP_LOWER_TRANS_TCP;
2414   is_udp = ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
2415       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST));
2416
2417   if (is_udp && !create_and_configure_udpsinks (stream))
2418     goto no_udp_protocol;
2419
2420   for (i = 0; i < 2; i++) {
2421     GstPad *teepad, *queuepad;
2422     /* For the sender we create this bit of pipeline for both
2423      * RTP and RTCP. Sync and preroll are enabled on udpsink so
2424      * we need to add a queue before appsink and udpsink to make
2425      * the pipeline not block. For the TCP case, we want to pump
2426      * client as fast as possible anyway. This pipeline is used
2427      * when both TCP and UDP are present.
2428      *
2429      * .--------.      .-----.    .---------.    .---------.
2430      * | rtpbin |      | tee |    |  queue  |    | udpsink |
2431      * |       send->sink   src->sink      src->sink       |
2432      * '--------'      |     |    '---------'    '---------'
2433      *                 |     |    .---------.    .---------.
2434      *                 |     |    |  queue  |    | appsink |
2435      *                 |    src->sink      src->sink       |
2436      *                 '-----'    '---------'    '---------'
2437      *
2438      * When only UDP or only TCP is allowed, we skip the tee and queue
2439      * and link the udpsink (for UDP) or appsink (for TCP) directly to
2440      * the session.
2441      */
2442     /* Only link the RTP send src if we're going to send RTP, link
2443      * the RTCP send src always */
2444     if (priv->srcpad || i == 1) {
2445       if (is_udp) {
2446         /* add udpsink */
2447         gst_bin_add (bin, priv->udpsink[i]);
2448         sinkpad = gst_element_get_static_pad (priv->udpsink[i], "sink");
2449       }
2450
2451       if (is_tcp) {
2452         /* make appsink */
2453         priv->appsink[i] = gst_element_factory_make ("appsink", NULL);
2454         g_object_set (priv->appsink[i], "emit-signals", FALSE, NULL);
2455         gst_bin_add (bin, priv->appsink[i]);
2456         gst_app_sink_set_callbacks (GST_APP_SINK_CAST (priv->appsink[i]),
2457             &sink_cb, stream, NULL);
2458       }
2459
2460       if (is_udp && is_tcp) {
2461         g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
2462
2463         /* make tee for RTP/RTCP */
2464         priv->tee[i] = gst_element_factory_make ("tee", NULL);
2465         gst_bin_add (bin, priv->tee[i]);
2466
2467         /* and link to rtpbin send pad */
2468         pad = gst_element_get_static_pad (priv->tee[i], "sink");
2469         gst_pad_link (priv->send_src[i], pad);
2470         gst_object_unref (pad);
2471
2472         priv->udpqueue[i] = gst_element_factory_make ("queue", NULL);
2473         g_object_set (priv->udpqueue[i], "max-size-buffers",
2474             1, "max-size-bytes", 0, "max-size-time", G_GINT64_CONSTANT (0),
2475             NULL);
2476         gst_bin_add (bin, priv->udpqueue[i]);
2477         /* link tee to udpqueue */
2478         teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
2479         pad = gst_element_get_static_pad (priv->udpqueue[i], "sink");
2480         gst_pad_link (teepad, pad);
2481         gst_object_unref (pad);
2482         gst_object_unref (teepad);
2483
2484         /* link udpqueue to udpsink */
2485         queuepad = gst_element_get_static_pad (priv->udpqueue[i], "src");
2486         gst_pad_link (queuepad, sinkpad);
2487         gst_object_unref (queuepad);
2488         gst_object_unref (sinkpad);
2489
2490         /* make appqueue */
2491         priv->appqueue[i] = gst_element_factory_make ("queue", NULL);
2492         g_object_set (priv->appqueue[i], "max-size-buffers",
2493             1, "max-size-bytes", 0, "max-size-time", G_GINT64_CONSTANT (0),
2494             NULL);
2495         gst_bin_add (bin, priv->appqueue[i]);
2496         /* and link tee to appqueue */
2497         teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
2498         pad = gst_element_get_static_pad (priv->appqueue[i], "sink");
2499         gst_pad_link (teepad, pad);
2500         gst_object_unref (pad);
2501         gst_object_unref (teepad);
2502
2503         /* and link appqueue to appsink */
2504         queuepad = gst_element_get_static_pad (priv->appqueue[i], "src");
2505         pad = gst_element_get_static_pad (priv->appsink[i], "sink");
2506         gst_pad_link (queuepad, pad);
2507         gst_object_unref (pad);
2508         gst_object_unref (queuepad);
2509       } else if (is_tcp) {
2510         /* only appsink needed, link it to the session */
2511         pad = gst_element_get_static_pad (priv->appsink[i], "sink");
2512         gst_pad_link (priv->send_src[i], pad);
2513         gst_object_unref (pad);
2514
2515         /* when its only TCP, we need to set sync and preroll to FALSE
2516          * for the sink to avoid deadlock. And this is only needed for
2517          * sink used for RTCP data, not the RTP data. */
2518         if (i == 1)
2519           g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
2520       } else {
2521         /* else only udpsink needed, link it to the session */
2522         gst_pad_link (priv->send_src[i], sinkpad);
2523         gst_object_unref (sinkpad);
2524       }
2525     }
2526
2527     /* check if we need to set to a special state */
2528     if (state != GST_STATE_NULL) {
2529       if (priv->udpsink[i] && (priv->srcpad || i == 1))
2530         gst_element_set_state (priv->udpsink[i], state);
2531       if (priv->appsink[i] && (priv->srcpad || i == 1))
2532         gst_element_set_state (priv->appsink[i], state);
2533       if (priv->appqueue[i] && (priv->srcpad || i == 1))
2534         gst_element_set_state (priv->appqueue[i], state);
2535       if (priv->udpqueue[i] && (priv->srcpad || i == 1))
2536         gst_element_set_state (priv->udpqueue[i], state);
2537       if (priv->tee[i] && (priv->srcpad || i == 1))
2538         gst_element_set_state (priv->tee[i], state);
2539     }
2540   }
2541
2542   return TRUE;
2543
2544   /* ERRORS */
2545 no_udp_protocol:
2546   {
2547     return FALSE;
2548   }
2549 }
2550
2551 /* must be called with lock */
2552 static void
2553 create_receiver_part (GstRTSPStream * stream, GstBin * bin, GstState state)
2554 {
2555   GstRTSPStreamPrivate *priv;
2556   GstPad *pad, *selpad;
2557   gboolean is_tcp;
2558   gint i;
2559
2560   priv = stream->priv;
2561
2562   is_tcp = priv->protocols & GST_RTSP_LOWER_TRANS_TCP;
2563
2564   for (i = 0; i < 2; i++) {
2565     /* Only connect recv RTP sink if we expect to receive RTP. Connect recv
2566      * RTCP sink always */
2567     if (priv->sinkpad || i == 1) {
2568       /* For the receiver we create this bit of pipeline for both
2569        * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc
2570        * and it is all funneled into the rtpbin receive pad.
2571        *
2572        * .--------.     .--------.    .--------.
2573        * | udpsrc |     | funnel |    | rtpbin |
2574        * |       src->sink      src->sink      |
2575        * '--------'     |        |    '--------'
2576        * .--------.     |        |
2577        * | appsrc |     |        |
2578        * |       src->sink       |
2579        * '--------'     '--------'
2580        */
2581       /* make funnel for the RTP/RTCP receivers */
2582       priv->funnel[i] = gst_element_factory_make ("funnel", NULL);
2583       gst_bin_add (bin, priv->funnel[i]);
2584
2585       pad = gst_element_get_static_pad (priv->funnel[i], "src");
2586       gst_pad_link (pad, priv->recv_sink[i]);
2587       gst_object_unref (pad);
2588
2589       if (priv->udpsrc_v4[i]) {
2590         if (priv->srcpad) {
2591           /* we set and keep these to playing so that they don't cause NO_PREROLL return
2592            * values. This is only relevant for PLAY pipelines */
2593           gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_PLAYING);
2594           gst_element_set_locked_state (priv->udpsrc_v4[i], TRUE);
2595         }
2596         /* add udpsrc */
2597         gst_bin_add (bin, priv->udpsrc_v4[i]);
2598
2599         /* and link to the funnel v4 */
2600         selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2601         pad = gst_element_get_static_pad (priv->udpsrc_v4[i], "src");
2602         gst_pad_link (pad, selpad);
2603         gst_object_unref (pad);
2604         gst_object_unref (selpad);
2605       }
2606
2607       if (priv->udpsrc_v6[i]) {
2608         if (priv->srcpad) {
2609           gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_PLAYING);
2610           gst_element_set_locked_state (priv->udpsrc_v6[i], TRUE);
2611         }
2612         gst_bin_add (bin, priv->udpsrc_v6[i]);
2613
2614         /* and link to the funnel v6 */
2615         selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2616         pad = gst_element_get_static_pad (priv->udpsrc_v6[i], "src");
2617         gst_pad_link (pad, selpad);
2618         gst_object_unref (pad);
2619         gst_object_unref (selpad);
2620       }
2621
2622       if (is_tcp) {
2623         /* make and add appsrc */
2624         priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
2625         priv->appsrc_base_time[i] = -1;
2626         if (priv->srcpad) {
2627           gst_element_set_state (priv->appsrc[i], GST_STATE_PLAYING);
2628           gst_element_set_locked_state (priv->appsrc[i], TRUE);
2629         }
2630         g_object_set (priv->appsrc[i], "format", GST_FORMAT_TIME, "is-live",
2631             TRUE, NULL);
2632         gst_bin_add (bin, priv->appsrc[i]);
2633         /* and link to the funnel */
2634         selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2635         pad = gst_element_get_static_pad (priv->appsrc[i], "src");
2636         gst_pad_link (pad, selpad);
2637         gst_object_unref (pad);
2638         gst_object_unref (selpad);
2639       }
2640     }
2641
2642     /* check if we need to set to a special state */
2643     if (state != GST_STATE_NULL) {
2644       if (priv->funnel[i] && (priv->sinkpad || i == 1))
2645         gst_element_set_state (priv->funnel[i], state);
2646     }
2647   }
2648 }
2649
2650 /**
2651  * gst_rtsp_stream_join_bin:
2652  * @stream: a #GstRTSPStream
2653  * @bin: (transfer none): a #GstBin to join
2654  * @rtpbin: (transfer none): a rtpbin element in @bin
2655  * @state: the target state of the new elements
2656  *
2657  * Join the #GstBin @bin that contains the element @rtpbin.
2658  *
2659  * @stream will link to @rtpbin, which must be inside @bin. The elements
2660  * added to @bin will be set to the state given in @state.
2661  *
2662  * Returns: %TRUE on success.
2663  */
2664 gboolean
2665 gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
2666     GstElement * rtpbin, GstState state)
2667 {
2668   GstRTSPStreamPrivate *priv;
2669   guint idx;
2670   gchar *name;
2671   GstPadLinkReturn ret;
2672
2673   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2674   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
2675   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
2676
2677   priv = stream->priv;
2678
2679   g_mutex_lock (&priv->lock);
2680   if (priv->is_joined)
2681     goto was_joined;
2682
2683   /* create a session with the same index as the stream */
2684   idx = priv->idx;
2685
2686   GST_INFO ("stream %p joining bin as session %u", stream, idx);
2687
2688   if (priv->profiles & GST_RTSP_PROFILE_SAVP
2689       || priv->profiles & GST_RTSP_PROFILE_SAVPF) {
2690     /* For SRTP */
2691     g_signal_connect (rtpbin, "request-rtp-encoder",
2692         (GCallback) request_rtp_encoder, stream);
2693     g_signal_connect (rtpbin, "request-rtcp-encoder",
2694         (GCallback) request_rtcp_encoder, stream);
2695     g_signal_connect (rtpbin, "request-rtp-decoder",
2696         (GCallback) request_rtp_rtcp_decoder, stream);
2697     g_signal_connect (rtpbin, "request-rtcp-decoder",
2698         (GCallback) request_rtp_rtcp_decoder, stream);
2699   }
2700
2701   if (priv->sinkpad) {
2702     g_signal_connect (rtpbin, "request-pt-map",
2703         (GCallback) request_pt_map, stream);
2704   }
2705
2706   /* get pads from the RTP session element for sending and receiving
2707    * RTP/RTCP*/
2708   if (priv->srcpad) {
2709     /* get a pad for sending RTP */
2710     name = g_strdup_printf ("send_rtp_sink_%u", idx);
2711     priv->send_rtp_sink = gst_element_get_request_pad (rtpbin, name);
2712     g_free (name);
2713
2714     /* link the RTP pad to the session manager, it should not really fail unless
2715      * this is not really an RTP pad */
2716     ret = gst_pad_link (priv->srcpad, priv->send_rtp_sink);
2717     if (ret != GST_PAD_LINK_OK)
2718       goto link_failed;
2719
2720     name = g_strdup_printf ("send_rtp_src_%u", idx);
2721     priv->send_src[0] = gst_element_get_static_pad (rtpbin, name);
2722     g_free (name);
2723   } else {
2724     /* Need to connect our sinkpad from here */
2725     g_signal_connect (rtpbin, "pad-added", (GCallback) pad_added, stream);
2726     /* EOS */
2727     g_signal_connect (rtpbin, "on-npt-stop", (GCallback) on_npt_stop, stream);
2728
2729     name = g_strdup_printf ("recv_rtp_sink_%u", idx);
2730     priv->recv_sink[0] = gst_element_get_request_pad (rtpbin, name);
2731     g_free (name);
2732   }
2733
2734   name = g_strdup_printf ("send_rtcp_src_%u", idx);
2735   priv->send_src[1] = gst_element_get_request_pad (rtpbin, name);
2736   g_free (name);
2737   name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
2738   priv->recv_sink[1] = gst_element_get_request_pad (rtpbin, name);
2739   g_free (name);
2740
2741   /* get the session */
2742   g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &priv->session);
2743
2744   g_signal_connect (priv->session, "on-new-ssrc", (GCallback) on_new_ssrc,
2745       stream);
2746   g_signal_connect (priv->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
2747       stream);
2748   g_signal_connect (priv->session, "on-ssrc-active",
2749       (GCallback) on_ssrc_active, stream);
2750   g_signal_connect (priv->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
2751       stream);
2752   g_signal_connect (priv->session, "on-bye-timeout",
2753       (GCallback) on_bye_timeout, stream);
2754   g_signal_connect (priv->session, "on-timeout", (GCallback) on_timeout,
2755       stream);
2756
2757   /* signal for sender ssrc */
2758   g_signal_connect (priv->session, "on-new-sender-ssrc",
2759       (GCallback) on_new_sender_ssrc, stream);
2760   g_signal_connect (priv->session, "on-sender-ssrc-active",
2761       (GCallback) on_sender_ssrc_active, stream);
2762
2763   if (!create_sender_part (stream, bin, state))
2764     goto no_udp_protocol;
2765
2766   create_receiver_part (stream, bin, state);
2767
2768   if (priv->srcpad) {
2769     /* be notified of caps changes */
2770     priv->caps_sig = g_signal_connect (priv->send_src[0], "notify::caps",
2771         (GCallback) caps_notify, stream);
2772   }
2773
2774   priv->joined_bin = bin;
2775   priv->is_joined = TRUE;
2776   g_mutex_unlock (&priv->lock);
2777
2778   return TRUE;
2779
2780   /* ERRORS */
2781 was_joined:
2782   {
2783     g_mutex_unlock (&priv->lock);
2784     return TRUE;
2785   }
2786 link_failed:
2787   {
2788     GST_WARNING ("failed to link stream %u", idx);
2789     gst_object_unref (priv->send_rtp_sink);
2790     priv->send_rtp_sink = NULL;
2791     g_mutex_unlock (&priv->lock);
2792     return FALSE;
2793   }
2794 no_udp_protocol:
2795   {
2796     GST_WARNING ("failed to allocate ports %u", idx);
2797     gst_object_unref (priv->send_rtp_sink);
2798     priv->send_rtp_sink = NULL;
2799     gst_object_unref (priv->send_src[0]);
2800     priv->send_src[0] = NULL;
2801     gst_object_unref (priv->send_src[1]);
2802     priv->send_src[1] = NULL;
2803     gst_object_unref (priv->recv_sink[0]);
2804     priv->recv_sink[0] = NULL;
2805     gst_object_unref (priv->recv_sink[1]);
2806     priv->recv_sink[1] = NULL;
2807     if (priv->udpsink[0])
2808       gst_element_set_state (priv->udpsink[0], GST_STATE_NULL);
2809     if (priv->udpsink[1])
2810       gst_element_set_state (priv->udpsink[1], GST_STATE_NULL);
2811     if (priv->udpsrc_v4[0]) {
2812       gst_element_set_state (priv->udpsrc_v4[0], GST_STATE_NULL);
2813       gst_object_unref (priv->udpsrc_v4[0]);
2814       priv->udpsrc_v4[0] = NULL;
2815     }
2816     if (priv->udpsrc_v4[1]) {
2817       gst_element_set_state (priv->udpsrc_v4[1], GST_STATE_NULL);
2818       gst_object_unref (priv->udpsrc_v4[1]);
2819       priv->udpsrc_v4[1] = NULL;
2820     }
2821     if (priv->udpsrc_mcast_v4[0]) {
2822       gst_element_set_state (priv->udpsrc_mcast_v4[0], GST_STATE_NULL);
2823       gst_object_unref (priv->udpsrc_mcast_v4[0]);
2824       priv->udpsrc_mcast_v4[0] = NULL;
2825     }
2826     if (priv->udpsrc_mcast_v4[1]) {
2827       gst_element_set_state (priv->udpsrc_mcast_v4[1], GST_STATE_NULL);
2828       gst_object_unref (priv->udpsrc_mcast_v4[1]);
2829       priv->udpsrc_mcast_v4[1] = NULL;
2830     }
2831     if (priv->udpsrc_v6[0]) {
2832       gst_element_set_state (priv->udpsrc_v6[0], GST_STATE_NULL);
2833       gst_object_unref (priv->udpsrc_v6[0]);
2834       priv->udpsrc_v6[0] = NULL;
2835     }
2836     if (priv->udpsrc_v6[1]) {
2837       gst_element_set_state (priv->udpsrc_v6[1], GST_STATE_NULL);
2838       gst_object_unref (priv->udpsrc_v6[1]);
2839       priv->udpsrc_v6[1] = NULL;
2840     }
2841     if (priv->udpsrc_mcast_v6[0]) {
2842       gst_element_set_state (priv->udpsrc_mcast_v6[0], GST_STATE_NULL);
2843       gst_object_unref (priv->udpsrc_mcast_v6[0]);
2844       priv->udpsrc_mcast_v6[0] = NULL;
2845     }
2846     if (priv->udpsrc_mcast_v6[1]) {
2847       gst_element_set_state (priv->udpsrc_mcast_v6[1], GST_STATE_NULL);
2848       gst_object_unref (priv->udpsrc_mcast_v6[1]);
2849       priv->udpsrc_mcast_v6[1] = NULL;
2850     }
2851     g_mutex_unlock (&priv->lock);
2852     return FALSE;
2853   }
2854 }
2855
2856 /**
2857  * gst_rtsp_stream_leave_bin:
2858  * @stream: a #GstRTSPStream
2859  * @bin: (transfer none): a #GstBin
2860  * @rtpbin: (transfer none): a rtpbin #GstElement
2861  *
2862  * Remove the elements of @stream from @bin.
2863  *
2864  * Return: %TRUE on success.
2865  */
2866 gboolean
2867 gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
2868     GstElement * rtpbin)
2869 {
2870   GstRTSPStreamPrivate *priv;
2871   gint i;
2872   gboolean is_tcp, is_udp;
2873
2874   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2875   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
2876   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
2877
2878   priv = stream->priv;
2879
2880   g_mutex_lock (&priv->lock);
2881   if (!priv->is_joined)
2882     goto was_not_joined;
2883
2884   priv->joined_bin = NULL;
2885
2886   /* all transports must be removed by now */
2887   if (priv->transports != NULL)
2888     goto transports_not_removed;
2889
2890   clear_tr_cache (priv, TRUE);
2891   clear_tr_cache (priv, FALSE);
2892
2893   GST_INFO ("stream %p leaving bin", stream);
2894
2895   if (priv->srcpad) {
2896     gst_pad_unlink (priv->srcpad, priv->send_rtp_sink);
2897
2898     g_signal_handler_disconnect (priv->send_src[0], priv->caps_sig);
2899     gst_element_release_request_pad (rtpbin, priv->send_rtp_sink);
2900     gst_object_unref (priv->send_rtp_sink);
2901     priv->send_rtp_sink = NULL;
2902   } else if (priv->recv_rtp_src) {
2903     gst_pad_unlink (priv->recv_rtp_src, priv->sinkpad);
2904     gst_object_unref (priv->recv_rtp_src);
2905     priv->recv_rtp_src = NULL;
2906   }
2907
2908   is_tcp = priv->protocols & GST_RTSP_LOWER_TRANS_TCP;
2909
2910   is_udp = ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
2911       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST));
2912
2913
2914   for (i = 0; i < 2; i++) {
2915     if (priv->udpsink[i])
2916       gst_element_set_state (priv->udpsink[i], GST_STATE_NULL);
2917     if (priv->appsink[i])
2918       gst_element_set_state (priv->appsink[i], GST_STATE_NULL);
2919     if (priv->appqueue[i])
2920       gst_element_set_state (priv->appqueue[i], GST_STATE_NULL);
2921     if (priv->udpqueue[i])
2922       gst_element_set_state (priv->udpqueue[i], GST_STATE_NULL);
2923     if (priv->tee[i])
2924       gst_element_set_state (priv->tee[i], GST_STATE_NULL);
2925     if (priv->funnel[i])
2926       gst_element_set_state (priv->funnel[i], GST_STATE_NULL);
2927     if (priv->appsrc[i])
2928       gst_element_set_state (priv->appsrc[i], GST_STATE_NULL);
2929
2930     if (priv->udpsrc_v4[i]) {
2931       if (priv->sinkpad || i == 1) {
2932         /* and set udpsrc to NULL now before removing */
2933         gst_element_set_locked_state (priv->udpsrc_v4[i], FALSE);
2934         gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_NULL);
2935         /* removing them should also nicely release the request
2936          * pads when they finalize */
2937         gst_bin_remove (bin, priv->udpsrc_v4[i]);
2938       } else {
2939         /* we need to set the state to NULL before unref */
2940         gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_NULL);
2941         gst_object_unref (priv->udpsrc_v4[i]);
2942       }
2943     }
2944
2945     if (priv->udpsrc_mcast_v4[i]) {
2946       if (priv->sinkpad || i == 1) {
2947         /* and set udpsrc to NULL now before removing */
2948         gst_element_set_locked_state (priv->udpsrc_mcast_v4[i], FALSE);
2949         gst_element_set_state (priv->udpsrc_mcast_v4[i], GST_STATE_NULL);
2950         /* removing them should also nicely release the request
2951          * pads when they finalize */
2952         gst_bin_remove (bin, priv->udpsrc_mcast_v4[i]);
2953       } else {
2954         gst_element_set_state (priv->udpsrc_mcast_v4[i], GST_STATE_NULL);
2955         gst_object_unref (priv->udpsrc_mcast_v4[i]);
2956       }
2957     }
2958
2959     if (priv->udpsrc_v6[i]) {
2960       if (priv->sinkpad || i == 1) {
2961         gst_element_set_locked_state (priv->udpsrc_v6[i], FALSE);
2962         gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_NULL);
2963         gst_bin_remove (bin, priv->udpsrc_v6[i]);
2964       } else {
2965         gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_NULL);
2966         gst_object_unref (priv->udpsrc_v6[i]);
2967       }
2968     }
2969     if (priv->udpsrc_mcast_v6[i]) {
2970       if (priv->sinkpad || i == 1) {
2971         gst_element_set_locked_state (priv->udpsrc_mcast_v6[i], FALSE);
2972         gst_element_set_state (priv->udpsrc_mcast_v6[i], GST_STATE_NULL);
2973         gst_bin_remove (bin, priv->udpsrc_mcast_v6[i]);
2974       } else {
2975         gst_element_set_state (priv->udpsrc_mcast_v6[i], GST_STATE_NULL);
2976         gst_object_unref (priv->udpsrc_mcast_v6[i]);
2977       }
2978     }
2979
2980     if (priv->udpsink[i] && is_udp && (priv->srcpad || i == 1))
2981       gst_bin_remove (bin, priv->udpsink[i]);
2982     if (priv->appsrc[i]) {
2983       if (priv->sinkpad || i == 1) {
2984         gst_element_set_locked_state (priv->appsrc[i], FALSE);
2985         gst_element_set_state (priv->appsrc[i], GST_STATE_NULL);
2986         gst_bin_remove (bin, priv->appsrc[i]);
2987       } else {
2988         gst_element_set_state (priv->appsrc[i], GST_STATE_NULL);
2989         gst_object_unref (priv->appsrc[i]);
2990       }
2991     }
2992     if (priv->appsink[i] && is_tcp && (priv->srcpad || i == 1))
2993       gst_bin_remove (bin, priv->appsink[i]);
2994     if (priv->appqueue[i] && is_tcp && is_udp && (priv->srcpad || i == 1))
2995       gst_bin_remove (bin, priv->appqueue[i]);
2996     if (priv->udpqueue[i] && is_tcp && is_udp && (priv->srcpad || i == 1))
2997       gst_bin_remove (bin, priv->udpqueue[i]);
2998     if (priv->tee[i] && is_tcp && is_udp && (priv->srcpad || i == 1))
2999       gst_bin_remove (bin, priv->tee[i]);
3000     if (priv->funnel[i] && (priv->sinkpad || i == 1))
3001       gst_bin_remove (bin, priv->funnel[i]);
3002
3003     if (priv->sinkpad || i == 1) {
3004       gst_element_release_request_pad (rtpbin, priv->recv_sink[i]);
3005       gst_object_unref (priv->recv_sink[i]);
3006       priv->recv_sink[i] = NULL;
3007     }
3008
3009     priv->udpsrc_v4[i] = NULL;
3010     priv->udpsrc_v6[i] = NULL;
3011     priv->udpsrc_mcast_v4[i] = NULL;
3012     priv->udpsrc_mcast_v6[i] = NULL;
3013     priv->udpsink[i] = NULL;
3014     priv->appsrc[i] = NULL;
3015     priv->appsink[i] = NULL;
3016     priv->appqueue[i] = NULL;
3017     priv->udpqueue[i] = NULL;
3018     priv->tee[i] = NULL;
3019     priv->funnel[i] = NULL;
3020   }
3021
3022   if (priv->srcpad) {
3023     gst_object_unref (priv->send_src[0]);
3024     priv->send_src[0] = NULL;
3025   }
3026
3027   gst_element_release_request_pad (rtpbin, priv->send_src[1]);
3028   gst_object_unref (priv->send_src[1]);
3029   priv->send_src[1] = NULL;
3030
3031   g_object_unref (priv->session);
3032   priv->session = NULL;
3033   if (priv->caps)
3034     gst_caps_unref (priv->caps);
3035   priv->caps = NULL;
3036
3037   if (priv->srtpenc)
3038     gst_object_unref (priv->srtpenc);
3039   if (priv->srtpdec)
3040     gst_object_unref (priv->srtpdec);
3041
3042   priv->is_joined = FALSE;
3043   g_mutex_unlock (&priv->lock);
3044
3045   return TRUE;
3046
3047 was_not_joined:
3048   {
3049     g_mutex_unlock (&priv->lock);
3050     return TRUE;
3051   }
3052 transports_not_removed:
3053   {
3054     GST_ERROR_OBJECT (stream, "can't leave bin (transports not removed)");
3055     g_mutex_unlock (&priv->lock);
3056     return FALSE;
3057   }
3058 }
3059
3060 /**
3061  * gst_rtsp_stream_get_joined_bin:
3062  * @stream: a #GstRTSPStream
3063  *
3064  * Get the previous joined bin with gst_rtsp_stream_join_bin() or NULL.
3065  *
3066  * Return: (transfer full): the joined bin or NULL.
3067  */
3068 GstBin *
3069 gst_rtsp_stream_get_joined_bin (GstRTSPStream * stream)
3070 {
3071   GstRTSPStreamPrivate *priv;
3072   GstBin *bin = NULL;
3073
3074   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3075
3076   priv = stream->priv;
3077
3078   g_mutex_lock (&priv->lock);
3079   bin = priv->joined_bin ? gst_object_ref (priv->joined_bin) : NULL;
3080   g_mutex_unlock (&priv->lock);
3081
3082   return bin;
3083 }
3084
3085 /**
3086  * gst_rtsp_stream_get_rtpinfo:
3087  * @stream: a #GstRTSPStream
3088  * @rtptime: (allow-none): result RTP timestamp
3089  * @seq: (allow-none): result RTP seqnum
3090  * @clock_rate: (allow-none): the clock rate
3091  * @running_time: (allow-none): result running-time
3092  *
3093  * Retrieve the current rtptime, seq and running-time. This is used to
3094  * construct a RTPInfo reply header.
3095  *
3096  * Returns: %TRUE when rtptime, seq and running-time could be determined.
3097  */
3098 gboolean
3099 gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream,
3100     guint * rtptime, guint * seq, guint * clock_rate,
3101     GstClockTime * running_time)
3102 {
3103   GstRTSPStreamPrivate *priv;
3104   GstStructure *stats;
3105   GObjectClass *payobjclass;
3106
3107   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3108
3109   priv = stream->priv;
3110
3111   payobjclass = G_OBJECT_GET_CLASS (priv->payloader);
3112
3113   g_mutex_lock (&priv->lock);
3114
3115   /* First try to extract the information from the last buffer on the sinks.
3116    * This will have a more accurate sequence number and timestamp, as between
3117    * the payloader and the sink there can be some queues
3118    */
3119   if (priv->udpsink[0] || priv->appsink[0]) {
3120     GstSample *last_sample;
3121
3122     if (priv->udpsink[0])
3123       g_object_get (priv->udpsink[0], "last-sample", &last_sample, NULL);
3124     else
3125       g_object_get (priv->appsink[0], "last-sample", &last_sample, NULL);
3126
3127     if (last_sample) {
3128       GstCaps *caps;
3129       GstBuffer *buffer;
3130       GstSegment *segment;
3131       GstRTPBuffer rtp_buffer = GST_RTP_BUFFER_INIT;
3132
3133       caps = gst_sample_get_caps (last_sample);
3134       buffer = gst_sample_get_buffer (last_sample);
3135       segment = gst_sample_get_segment (last_sample);
3136
3137       if (gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp_buffer)) {
3138         if (seq) {
3139           *seq = gst_rtp_buffer_get_seq (&rtp_buffer);
3140         }
3141
3142         if (rtptime) {
3143           *rtptime = gst_rtp_buffer_get_timestamp (&rtp_buffer);
3144         }
3145
3146         gst_rtp_buffer_unmap (&rtp_buffer);
3147
3148         if (running_time) {
3149           *running_time =
3150               gst_segment_to_running_time (segment, GST_FORMAT_TIME,
3151               GST_BUFFER_TIMESTAMP (buffer));
3152         }
3153
3154         if (clock_rate) {
3155           GstStructure *s = gst_caps_get_structure (caps, 0);
3156
3157           gst_structure_get_int (s, "clock-rate", (gint *) clock_rate);
3158
3159           if (*clock_rate == 0 && running_time)
3160             *running_time = GST_CLOCK_TIME_NONE;
3161         }
3162         gst_sample_unref (last_sample);
3163
3164         goto done;
3165       } else {
3166         gst_sample_unref (last_sample);
3167       }
3168     }
3169   }
3170
3171   if (g_object_class_find_property (payobjclass, "stats")) {
3172     g_object_get (priv->payloader, "stats", &stats, NULL);
3173     if (stats == NULL)
3174       goto no_stats;
3175
3176     if (seq)
3177       gst_structure_get_uint (stats, "seqnum", seq);
3178
3179     if (rtptime)
3180       gst_structure_get_uint (stats, "timestamp", rtptime);
3181
3182     if (running_time)
3183       gst_structure_get_clock_time (stats, "running-time", running_time);
3184
3185     if (clock_rate) {
3186       gst_structure_get_uint (stats, "clock-rate", clock_rate);
3187       if (*clock_rate == 0 && running_time)
3188         *running_time = GST_CLOCK_TIME_NONE;
3189     }
3190     gst_structure_free (stats);
3191   } else {
3192     if (!g_object_class_find_property (payobjclass, "seqnum") ||
3193         !g_object_class_find_property (payobjclass, "timestamp"))
3194       goto no_stats;
3195
3196     if (seq)
3197       g_object_get (priv->payloader, "seqnum", seq, NULL);
3198
3199     if (rtptime)
3200       g_object_get (priv->payloader, "timestamp", rtptime, NULL);
3201
3202     if (running_time)
3203       *running_time = GST_CLOCK_TIME_NONE;
3204   }
3205
3206 done:
3207   g_mutex_unlock (&priv->lock);
3208
3209   return TRUE;
3210
3211   /* ERRORS */
3212 no_stats:
3213   {
3214     GST_WARNING ("Could not get payloader stats");
3215     g_mutex_unlock (&priv->lock);
3216     return FALSE;
3217   }
3218 }
3219
3220 /**
3221  * gst_rtsp_stream_get_caps:
3222  * @stream: a #GstRTSPStream
3223  *
3224  * Retrieve the current caps of @stream.
3225  *
3226  * Returns: (transfer full): the #GstCaps of @stream. use gst_caps_unref()
3227  * after usage.
3228  */
3229 GstCaps *
3230 gst_rtsp_stream_get_caps (GstRTSPStream * stream)
3231 {
3232   GstRTSPStreamPrivate *priv;
3233   GstCaps *result;
3234
3235   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3236
3237   priv = stream->priv;
3238
3239   g_mutex_lock (&priv->lock);
3240   if ((result = priv->caps))
3241     gst_caps_ref (result);
3242   g_mutex_unlock (&priv->lock);
3243
3244   return result;
3245 }
3246
3247 /**
3248  * gst_rtsp_stream_recv_rtp:
3249  * @stream: a #GstRTSPStream
3250  * @buffer: (transfer full): a #GstBuffer
3251  *
3252  * Handle an RTP buffer for the stream. This method is usually called when a
3253  * message has been received from a client using the TCP transport.
3254  *
3255  * This function takes ownership of @buffer.
3256  *
3257  * Returns: a GstFlowReturn.
3258  */
3259 GstFlowReturn
3260 gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer)
3261 {
3262   GstRTSPStreamPrivate *priv;
3263   GstFlowReturn ret;
3264   GstElement *element;
3265
3266   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
3267   priv = stream->priv;
3268   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
3269   g_return_val_if_fail (priv->is_joined, FALSE);
3270
3271   g_mutex_lock (&priv->lock);
3272   if (priv->appsrc[0])
3273     element = gst_object_ref (priv->appsrc[0]);
3274   else
3275     element = NULL;
3276   g_mutex_unlock (&priv->lock);
3277
3278   if (element) {
3279     if (priv->appsrc_base_time[0] == -1) {
3280       /* Take current running_time. This timestamp will be put on
3281        * the first buffer of each stream because we are a live source and so we
3282        * timestamp with the running_time. When we are dealing with TCP, we also
3283        * only timestamp the first buffer (using the DISCONT flag) because a server
3284        * typically bursts data, for which we don't want to compensate by speeding
3285        * up the media. The other timestamps will be interpollated from this one
3286        * using the RTP timestamps. */
3287       GST_OBJECT_LOCK (element);
3288       if (GST_ELEMENT_CLOCK (element)) {
3289         GstClockTime now;
3290         GstClockTime base_time;
3291
3292         now = gst_clock_get_time (GST_ELEMENT_CLOCK (element));
3293         base_time = GST_ELEMENT_CAST (element)->base_time;
3294
3295         priv->appsrc_base_time[0] = now - base_time;
3296         GST_BUFFER_TIMESTAMP (buffer) = priv->appsrc_base_time[0];
3297         GST_DEBUG ("stream %p: first buffer at time %" GST_TIME_FORMAT
3298             ", base %" GST_TIME_FORMAT, stream, GST_TIME_ARGS (now),
3299             GST_TIME_ARGS (base_time));
3300       }
3301       GST_OBJECT_UNLOCK (element);
3302     }
3303
3304     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
3305     gst_object_unref (element);
3306   } else {
3307     ret = GST_FLOW_OK;
3308   }
3309   return ret;
3310 }
3311
3312 /**
3313  * gst_rtsp_stream_recv_rtcp:
3314  * @stream: a #GstRTSPStream
3315  * @buffer: (transfer full): a #GstBuffer
3316  *
3317  * Handle an RTCP buffer for the stream. This method is usually called when a
3318  * message has been received from a client using the TCP transport.
3319  *
3320  * This function takes ownership of @buffer.
3321  *
3322  * Returns: a GstFlowReturn.
3323  */
3324 GstFlowReturn
3325 gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
3326 {
3327   GstRTSPStreamPrivate *priv;
3328   GstFlowReturn ret;
3329   GstElement *element;
3330
3331   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
3332   priv = stream->priv;
3333   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
3334
3335   if (!priv->is_joined) {
3336     gst_buffer_unref (buffer);
3337     return GST_FLOW_NOT_LINKED;
3338   }
3339   g_mutex_lock (&priv->lock);
3340   if (priv->appsrc[1])
3341     element = gst_object_ref (priv->appsrc[1]);
3342   else
3343     element = NULL;
3344   g_mutex_unlock (&priv->lock);
3345
3346   if (element) {
3347     if (priv->appsrc_base_time[1] == -1) {
3348       /* Take current running_time. This timestamp will be put on
3349        * the first buffer of each stream because we are a live source and so we
3350        * timestamp with the running_time. When we are dealing with TCP, we also
3351        * only timestamp the first buffer (using the DISCONT flag) because a server
3352        * typically bursts data, for which we don't want to compensate by speeding
3353        * up the media. The other timestamps will be interpollated from this one
3354        * using the RTP timestamps. */
3355       GST_OBJECT_LOCK (element);
3356       if (GST_ELEMENT_CLOCK (element)) {
3357         GstClockTime now;
3358         GstClockTime base_time;
3359
3360         now = gst_clock_get_time (GST_ELEMENT_CLOCK (element));
3361         base_time = GST_ELEMENT_CAST (element)->base_time;
3362
3363         priv->appsrc_base_time[1] = now - base_time;
3364         GST_BUFFER_TIMESTAMP (buffer) = priv->appsrc_base_time[1];
3365         GST_DEBUG ("stream %p: first buffer at time %" GST_TIME_FORMAT
3366             ", base %" GST_TIME_FORMAT, stream, GST_TIME_ARGS (now),
3367             GST_TIME_ARGS (base_time));
3368       }
3369       GST_OBJECT_UNLOCK (element);
3370     }
3371
3372     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
3373     gst_object_unref (element);
3374   } else {
3375     ret = GST_FLOW_OK;
3376     gst_buffer_unref (buffer);
3377   }
3378   return ret;
3379 }
3380
3381 /* must be called with lock */
3382 static gboolean
3383 update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
3384     gboolean add)
3385 {
3386   GstRTSPStreamPrivate *priv = stream->priv;
3387   const GstRTSPTransport *tr;
3388
3389   tr = gst_rtsp_stream_transport_get_transport (trans);
3390
3391   switch (tr->lower_transport) {
3392     case GST_RTSP_LOWER_TRANS_UDP_MCAST:
3393     case GST_RTSP_LOWER_TRANS_UDP:
3394     {
3395       gchar *dest;
3396       gint min, max;
3397       guint ttl = 0;
3398
3399       dest = tr->destination;
3400       if (tr->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
3401         min = tr->port.min;
3402         max = tr->port.max;
3403         ttl = tr->ttl;
3404       } else if (priv->client_side) {
3405         /* In client side mode the 'destination' is the RTSP server, so send
3406          * to those ports */
3407         min = tr->server_port.min;
3408         max = tr->server_port.max;
3409       } else {
3410         min = tr->client_port.min;
3411         max = tr->client_port.max;
3412       }
3413
3414       if (add) {
3415         if (ttl > 0) {
3416           GST_INFO ("setting ttl-mc %d", ttl);
3417           g_object_set (G_OBJECT (priv->udpsink[0]), "ttl-mc", ttl, NULL);
3418           g_object_set (G_OBJECT (priv->udpsink[1]), "ttl-mc", ttl, NULL);
3419         }
3420         GST_INFO ("adding %s:%d-%d", dest, min, max);
3421         g_signal_emit_by_name (priv->udpsink[0], "add", dest, min, NULL);
3422         g_signal_emit_by_name (priv->udpsink[1], "add", dest, max, NULL);
3423         priv->transports = g_list_prepend (priv->transports, trans);
3424       } else {
3425         GST_INFO ("removing %s:%d-%d", dest, min, max);
3426         g_signal_emit_by_name (priv->udpsink[0], "remove", dest, min, NULL);
3427         g_signal_emit_by_name (priv->udpsink[1], "remove", dest, max, NULL);
3428         priv->transports = g_list_remove (priv->transports, trans);
3429       }
3430       priv->transports_cookie++;
3431       break;
3432     }
3433     case GST_RTSP_LOWER_TRANS_TCP:
3434       if (add) {
3435         GST_INFO ("adding TCP %s", tr->destination);
3436         priv->transports = g_list_prepend (priv->transports, trans);
3437       } else {
3438         GST_INFO ("removing TCP %s", tr->destination);
3439         priv->transports = g_list_remove (priv->transports, trans);
3440       }
3441       priv->transports_cookie++;
3442       break;
3443     default:
3444       goto unknown_transport;
3445   }
3446   return TRUE;
3447
3448   /* ERRORS */
3449 unknown_transport:
3450   {
3451     GST_INFO ("Unknown transport %d", tr->lower_transport);
3452     return FALSE;
3453   }
3454 }
3455
3456
3457 /**
3458  * gst_rtsp_stream_add_transport:
3459  * @stream: a #GstRTSPStream
3460  * @trans: (transfer none): a #GstRTSPStreamTransport
3461  *
3462  * Add the transport in @trans to @stream. The media of @stream will
3463  * then also be send to the values configured in @trans.
3464  *
3465  * @stream must be joined to a bin.
3466  *
3467  * @trans must contain a valid #GstRTSPTransport.
3468  *
3469  * Returns: %TRUE if @trans was added
3470  */
3471 gboolean
3472 gst_rtsp_stream_add_transport (GstRTSPStream * stream,
3473     GstRTSPStreamTransport * trans)
3474 {
3475   GstRTSPStreamPrivate *priv;
3476   gboolean res;
3477
3478   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3479   priv = stream->priv;
3480   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
3481   g_return_val_if_fail (priv->is_joined, FALSE);
3482
3483   g_mutex_lock (&priv->lock);
3484   res = update_transport (stream, trans, TRUE);
3485   g_mutex_unlock (&priv->lock);
3486
3487   return res;
3488 }
3489
3490 /**
3491  * gst_rtsp_stream_remove_transport:
3492  * @stream: a #GstRTSPStream
3493  * @trans: (transfer none): a #GstRTSPStreamTransport
3494  *
3495  * Remove the transport in @trans from @stream. The media of @stream will
3496  * not be sent to the values configured in @trans.
3497  *
3498  * @stream must be joined to a bin.
3499  *
3500  * @trans must contain a valid #GstRTSPTransport.
3501  *
3502  * Returns: %TRUE if @trans was removed
3503  */
3504 gboolean
3505 gst_rtsp_stream_remove_transport (GstRTSPStream * stream,
3506     GstRTSPStreamTransport * trans)
3507 {
3508   GstRTSPStreamPrivate *priv;
3509   gboolean res;
3510
3511   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3512   priv = stream->priv;
3513   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
3514   g_return_val_if_fail (priv->is_joined, FALSE);
3515
3516   g_mutex_lock (&priv->lock);
3517   res = update_transport (stream, trans, FALSE);
3518   g_mutex_unlock (&priv->lock);
3519
3520   return res;
3521 }
3522
3523 /**
3524  * gst_rtsp_stream_update_crypto:
3525  * @stream: a #GstRTSPStream
3526  * @ssrc: the SSRC
3527  * @crypto: (transfer none) (allow-none): a #GstCaps with crypto info
3528  *
3529  * Update the new crypto information for @ssrc in @stream. If information
3530  * for @ssrc did not exist, it will be added. If information
3531  * for @ssrc existed, it will be replaced. If @crypto is %NULL, it will
3532  * be removed from @stream.
3533  *
3534  * Returns: %TRUE if @crypto could be updated
3535  */
3536 gboolean
3537 gst_rtsp_stream_update_crypto (GstRTSPStream * stream,
3538     guint ssrc, GstCaps * crypto)
3539 {
3540   GstRTSPStreamPrivate *priv;
3541
3542   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3543   g_return_val_if_fail (crypto == NULL || GST_IS_CAPS (crypto), FALSE);
3544
3545   priv = stream->priv;
3546
3547   GST_DEBUG_OBJECT (stream, "update key for %08x", ssrc);
3548
3549   g_mutex_lock (&priv->lock);
3550   if (crypto)
3551     g_hash_table_insert (priv->keys, GINT_TO_POINTER (ssrc),
3552         gst_caps_ref (crypto));
3553   else
3554     g_hash_table_remove (priv->keys, GINT_TO_POINTER (ssrc));
3555   g_mutex_unlock (&priv->lock);
3556
3557   return TRUE;
3558 }
3559
3560 /**
3561  * gst_rtsp_stream_get_rtp_socket:
3562  * @stream: a #GstRTSPStream
3563  * @family: the socket family
3564  *
3565  * Get the RTP socket from @stream for a @family.
3566  *
3567  * @stream must be joined to a bin.
3568  *
3569  * Returns: (transfer full) (nullable): the RTP socket or %NULL if no
3570  * socket could be allocated for @family. Unref after usage
3571  */
3572 GSocket *
3573 gst_rtsp_stream_get_rtp_socket (GstRTSPStream * stream, GSocketFamily family)
3574 {
3575   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
3576   GSocket *socket;
3577   const gchar *name;
3578
3579   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3580   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
3581       family == G_SOCKET_FAMILY_IPV6, NULL);
3582   g_return_val_if_fail (priv->udpsink[0], NULL);
3583
3584   if (family == G_SOCKET_FAMILY_IPV6)
3585     name = "socket-v6";
3586   else
3587     name = "socket";
3588
3589   g_object_get (priv->udpsink[0], name, &socket, NULL);
3590
3591   return socket;
3592 }
3593
3594 /**
3595  * gst_rtsp_stream_get_rtcp_socket:
3596  * @stream: a #GstRTSPStream
3597  * @family: the socket family
3598  *
3599  * Get the RTCP socket from @stream for a @family.
3600  *
3601  * @stream must be joined to a bin.
3602  *
3603  * Returns: (transfer full) (nullable): the RTCP socket or %NULL if no
3604  * socket could be allocated for @family. Unref after usage
3605  */
3606 GSocket *
3607 gst_rtsp_stream_get_rtcp_socket (GstRTSPStream * stream, GSocketFamily family)
3608 {
3609   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
3610   GSocket *socket;
3611   const gchar *name;
3612
3613   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3614   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
3615       family == G_SOCKET_FAMILY_IPV6, NULL);
3616   g_return_val_if_fail (priv->udpsink[1], NULL);
3617
3618   if (family == G_SOCKET_FAMILY_IPV6)
3619     name = "socket-v6";
3620   else
3621     name = "socket";
3622
3623   g_object_get (priv->udpsink[1], name, &socket, NULL);
3624
3625   return socket;
3626 }
3627
3628 /**
3629  * gst_rtsp_stream_set_seqnum:
3630  * @stream: a #GstRTSPStream
3631  * @seqnum: a new sequence number
3632  *
3633  * Configure the sequence number in the payloader of @stream to @seqnum.
3634  */
3635 void
3636 gst_rtsp_stream_set_seqnum_offset (GstRTSPStream * stream, guint16 seqnum)
3637 {
3638   GstRTSPStreamPrivate *priv;
3639
3640   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
3641
3642   priv = stream->priv;
3643
3644   g_object_set (G_OBJECT (priv->payloader), "seqnum-offset", seqnum, NULL);
3645 }
3646
3647 /**
3648  * gst_rtsp_stream_get_seqnum:
3649  * @stream: a #GstRTSPStream
3650  *
3651  * Get the configured sequence number in the payloader of @stream.
3652  *
3653  * Returns: the sequence number of the payloader.
3654  */
3655 guint16
3656 gst_rtsp_stream_get_current_seqnum (GstRTSPStream * stream)
3657 {
3658   GstRTSPStreamPrivate *priv;
3659   guint seqnum;
3660
3661   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
3662
3663   priv = stream->priv;
3664
3665   g_object_get (G_OBJECT (priv->payloader), "seqnum", &seqnum, NULL);
3666
3667   return seqnum;
3668 }
3669
3670 /**
3671  * gst_rtsp_stream_transport_filter:
3672  * @stream: a #GstRTSPStream
3673  * @func: (scope call) (allow-none): a callback
3674  * @user_data: (closure): user data passed to @func
3675  *
3676  * Call @func for each transport managed by @stream. The result value of @func
3677  * determines what happens to the transport. @func will be called with @stream
3678  * locked so no further actions on @stream can be performed from @func.
3679  *
3680  * If @func returns #GST_RTSP_FILTER_REMOVE, the transport will be removed from
3681  * @stream.
3682  *
3683  * If @func returns #GST_RTSP_FILTER_KEEP, the transport will remain in @stream.
3684  *
3685  * If @func returns #GST_RTSP_FILTER_REF, the transport will remain in @stream but
3686  * will also be added with an additional ref to the result #GList of this
3687  * function..
3688  *
3689  * When @func is %NULL, #GST_RTSP_FILTER_REF will be assumed for each transport.
3690  *
3691  * Returns: (element-type GstRTSPStreamTransport) (transfer full): a #GList with all
3692  * transports for which @func returned #GST_RTSP_FILTER_REF. After usage, each
3693  * element in the #GList should be unreffed before the list is freed.
3694  */
3695 GList *
3696 gst_rtsp_stream_transport_filter (GstRTSPStream * stream,
3697     GstRTSPStreamTransportFilterFunc func, gpointer user_data)
3698 {
3699   GstRTSPStreamPrivate *priv;
3700   GList *result, *walk, *next;
3701   GHashTable *visited = NULL;
3702   guint cookie;
3703
3704   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3705
3706   priv = stream->priv;
3707
3708   result = NULL;
3709   if (func)
3710     visited = g_hash_table_new_full (NULL, NULL, g_object_unref, NULL);
3711
3712   g_mutex_lock (&priv->lock);
3713 restart:
3714   cookie = priv->transports_cookie;
3715   for (walk = priv->transports; walk; walk = next) {
3716     GstRTSPStreamTransport *trans = walk->data;
3717     GstRTSPFilterResult res;
3718     gboolean changed;
3719
3720     next = g_list_next (walk);
3721
3722     if (func) {
3723       /* only visit each transport once */
3724       if (g_hash_table_contains (visited, trans))
3725         continue;
3726
3727       g_hash_table_add (visited, g_object_ref (trans));
3728       g_mutex_unlock (&priv->lock);
3729
3730       res = func (stream, trans, user_data);
3731
3732       g_mutex_lock (&priv->lock);
3733     } else
3734       res = GST_RTSP_FILTER_REF;
3735
3736     changed = (cookie != priv->transports_cookie);
3737
3738     switch (res) {
3739       case GST_RTSP_FILTER_REMOVE:
3740         update_transport (stream, trans, FALSE);
3741         break;
3742       case GST_RTSP_FILTER_REF:
3743         result = g_list_prepend (result, g_object_ref (trans));
3744         break;
3745       case GST_RTSP_FILTER_KEEP:
3746       default:
3747         break;
3748     }
3749     if (changed)
3750       goto restart;
3751   }
3752   g_mutex_unlock (&priv->lock);
3753
3754   if (func)
3755     g_hash_table_unref (visited);
3756
3757   return result;
3758 }
3759
3760 static GstPadProbeReturn
3761 pad_blocking (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
3762 {
3763   GstRTSPStreamPrivate *priv;
3764   GstRTSPStream *stream;
3765
3766   stream = user_data;
3767   priv = stream->priv;
3768
3769   GST_DEBUG_OBJECT (pad, "now blocking");
3770
3771   g_mutex_lock (&priv->lock);
3772   priv->blocking = TRUE;
3773   g_mutex_unlock (&priv->lock);
3774
3775   gst_element_post_message (priv->payloader,
3776       gst_message_new_element (GST_OBJECT_CAST (priv->payloader),
3777           gst_structure_new_empty ("GstRTSPStreamBlocking")));
3778
3779   return GST_PAD_PROBE_OK;
3780 }
3781
3782 /**
3783  * gst_rtsp_stream_set_blocked:
3784  * @stream: a #GstRTSPStream
3785  * @blocked: boolean indicating we should block or unblock
3786  *
3787  * Blocks or unblocks the dataflow on @stream.
3788  *
3789  * Returns: %TRUE on success
3790  */
3791 gboolean
3792 gst_rtsp_stream_set_blocked (GstRTSPStream * stream, gboolean blocked)
3793 {
3794   GstRTSPStreamPrivate *priv;
3795
3796   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3797
3798   priv = stream->priv;
3799
3800   g_mutex_lock (&priv->lock);
3801   if (blocked) {
3802     priv->blocking = FALSE;
3803     if (priv->blocked_id == 0) {
3804       priv->blocked_id = gst_pad_add_probe (priv->srcpad,
3805           GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER |
3806           GST_PAD_PROBE_TYPE_BUFFER_LIST, pad_blocking,
3807           g_object_ref (stream), g_object_unref);
3808     }
3809   } else {
3810     if (priv->blocked_id != 0) {
3811       gst_pad_remove_probe (priv->srcpad, priv->blocked_id);
3812       priv->blocked_id = 0;
3813       priv->blocking = FALSE;
3814     }
3815   }
3816   g_mutex_unlock (&priv->lock);
3817
3818   return TRUE;
3819 }
3820
3821 /**
3822  * gst_rtsp_stream_is_blocking:
3823  * @stream: a #GstRTSPStream
3824  *
3825  * Check if @stream is blocking on a #GstBuffer.
3826  *
3827  * Returns: %TRUE if @stream is blocking
3828  */
3829 gboolean
3830 gst_rtsp_stream_is_blocking (GstRTSPStream * stream)
3831 {
3832   GstRTSPStreamPrivate *priv;
3833   gboolean result;
3834
3835   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3836
3837   priv = stream->priv;
3838
3839   g_mutex_lock (&priv->lock);
3840   result = priv->blocking;
3841   g_mutex_unlock (&priv->lock);
3842
3843   return result;
3844 }
3845
3846 /**
3847  * gst_rtsp_stream_query_position:
3848  * @stream: a #GstRTSPStream
3849  *
3850  * Query the position of the stream in %GST_FORMAT_TIME. This only considers
3851  * the RTP parts of the pipeline and not the RTCP parts.
3852  *
3853  * Returns: %TRUE if the position could be queried
3854  */
3855 gboolean
3856 gst_rtsp_stream_query_position (GstRTSPStream * stream, gint64 * position)
3857 {
3858   GstRTSPStreamPrivate *priv;
3859   GstElement *sink;
3860   gboolean ret;
3861
3862   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3863
3864   priv = stream->priv;
3865
3866   g_mutex_lock (&priv->lock);
3867   /* depending on the transport type, it should query corresponding sink */
3868   if ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
3869       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST))
3870     sink = priv->udpsink[0];
3871   else
3872     sink = priv->appsink[0];
3873
3874   if (sink)
3875     gst_object_ref (sink);
3876   g_mutex_unlock (&priv->lock);
3877
3878   if (!sink)
3879     return FALSE;
3880
3881   ret = gst_element_query_position (sink, GST_FORMAT_TIME, position);
3882   gst_object_unref (sink);
3883
3884   return ret;
3885 }
3886
3887 /**
3888  * gst_rtsp_stream_query_stop:
3889  * @stream: a #GstRTSPStream
3890  *
3891  * Query the stop of the stream in %GST_FORMAT_TIME. This only considers
3892  * the RTP parts of the pipeline and not the RTCP parts.
3893  *
3894  * Returns: %TRUE if the stop could be queried
3895  */
3896 gboolean
3897 gst_rtsp_stream_query_stop (GstRTSPStream * stream, gint64 * stop)
3898 {
3899   GstRTSPStreamPrivate *priv;
3900   GstElement *sink;
3901   GstQuery *query;
3902   gboolean ret;
3903
3904   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3905
3906   priv = stream->priv;
3907
3908   g_mutex_lock (&priv->lock);
3909   /* depending on the transport type, it should query corresponding sink */
3910   if ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
3911       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST))
3912     sink = priv->udpsink[0];
3913   else
3914     sink = priv->appsink[0];
3915
3916   if (sink)
3917     gst_object_ref (sink);
3918   g_mutex_unlock (&priv->lock);
3919
3920   if (!sink)
3921     return FALSE;
3922
3923   query = gst_query_new_segment (GST_FORMAT_TIME);
3924   if ((ret = gst_element_query (sink, query))) {
3925     GstFormat format;
3926
3927     gst_query_parse_segment (query, NULL, &format, NULL, stop);
3928     if (format != GST_FORMAT_TIME)
3929       *stop = -1;
3930   }
3931   gst_query_unref (query);
3932   gst_object_unref (sink);
3933
3934   return ret;
3935
3936 }