b95ead95a1da16e0e058e24cf93781ad4299b462
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-stream.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  * Copyright (C) 2015 Centricular Ltd
4  *     Author: Sebastian Dröge <sebastian@centricular.com>
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19  * Boston, MA 02110-1301, USA.
20  */
21 /**
22  * SECTION:rtsp-stream
23  * @short_description: A media stream
24  * @see_also: #GstRTSPMedia
25  *
26  * The #GstRTSPStream object manages the data transport for one stream. It
27  * is created from a payloader element and a source pad that produce the RTP
28  * packets for the stream.
29  *
30  * With gst_rtsp_stream_join_bin() the streaming elements are added to the bin
31  * and rtpbin. gst_rtsp_stream_leave_bin() removes the elements again.
32  *
33  * The #GstRTSPStream will use the configured addresspool, as set with
34  * gst_rtsp_stream_set_address_pool(), to allocate multicast addresses for the
35  * stream. With gst_rtsp_stream_get_multicast_address() you can get the
36  * configured address.
37  *
38  * With gst_rtsp_stream_get_server_port () you can get the port that the server
39  * will use to receive RTCP. This is the part that the clients will use to send
40  * RTCP to.
41  *
42  * With gst_rtsp_stream_add_transport() destinations can be added where the
43  * stream should be sent to. Use gst_rtsp_stream_remove_transport() to remove
44  * the destination again.
45  *
46  * Last reviewed on 2013-07-16 (1.0.0)
47  */
48
49 #include <stdlib.h>
50 #include <stdio.h>
51 #include <string.h>
52
53 #include <gio/gio.h>
54
55 #include <gst/app/gstappsrc.h>
56 #include <gst/app/gstappsink.h>
57
58 #include <gst/rtp/gstrtpbuffer.h>
59
60 #include "rtsp-stream.h"
61
62 struct _GstRTSPStreamPrivate
63 {
64   GMutex lock;
65   guint idx;
66   /* Only one pad is ever set */
67   GstPad *srcpad, *sinkpad;
68   GstElement *payloader;
69   guint buffer_size;
70   GstBin *joined_bin;
71
72   /* TRUE if this stream is running on
73    * the client side of an RTSP link (for RECORD) */
74   gboolean client_side;
75   gchar *control;
76
77   /* TRUE if stream is complete. This means that the receiver and the sender
78    * parts are present in the stream. */
79   gboolean is_complete;
80   GstRTSPProfile profiles;
81   GstRTSPLowerTrans allowed_protocols;
82   GstRTSPLowerTrans configured_protocols;
83
84   /* pads on the rtpbin */
85   GstPad *send_rtp_sink;
86   GstPad *recv_rtp_src;
87   GstPad *recv_sink[2];
88   GstPad *send_src[2];
89
90   /* the RTPSession object */
91   GObject *session;
92
93   /* SRTP encoder/decoder */
94   GstElement *srtpenc;
95   GstElement *srtpdec;
96   GHashTable *keys;
97
98   /* for UDP unicast */
99   GstElement *udpsrc_v4[2];
100   GstElement *udpsrc_v6[2];
101   GstElement *udpqueue[2];
102   GstElement *udpsink[2];
103   GSocket *socket_v4[2];
104   GSocket *socket_v6[2];
105
106   /* for UDP multicast */
107   GstElement *mcast_udpsrc_v4[2];
108   GstElement *mcast_udpsrc_v6[2];
109   GstElement *mcast_udpqueue[2];
110   GstElement *mcast_udpsink[2];
111   GSocket *mcast_socket_v4[2];
112   GSocket *mcast_socket_v6[2];
113
114   /* for TCP transport */
115   GstElement *appsrc[2];
116   GstClockTime appsrc_base_time[2];
117   GstElement *appqueue[2];
118   GstElement *appsink[2];
119
120   GstElement *tee[2];
121   GstElement *funnel[2];
122
123   /* retransmission */
124   GstElement *rtxsend;
125   GstElement *rtxreceive;
126   guint rtx_pt;
127   GstClockTime rtx_time;
128
129   /* Forward Error Correction with RFC 5109 */
130   GstElement *ulpfec_decoder;
131   GstElement *ulpfec_encoder;
132   guint ulpfec_pt;
133   gboolean ulpfec_enabled;
134   guint ulpfec_percentage;
135
136   /* pool used to manage unicast and multicast addresses */
137   GstRTSPAddressPool *pool;
138
139   /* unicast server addr/port */
140   GstRTSPAddress *server_addr_v4;
141   GstRTSPAddress *server_addr_v6;
142
143   /* multicast addresses */
144   GstRTSPAddress *mcast_addr_v4;
145   GstRTSPAddress *mcast_addr_v6;
146
147   gchar *multicast_iface;
148
149   /* the caps of the stream */
150   gulong caps_sig;
151   GstCaps *caps;
152
153   /* transports we stream to */
154   guint n_active;
155   GList *transports;
156   guint transports_cookie;
157   GList *tr_cache_rtp;
158   GList *tr_cache_rtcp;
159   guint tr_cache_cookie_rtp;
160   guint tr_cache_cookie_rtcp;
161   guint n_tcp_transports;
162   gboolean have_buffer[2];
163   guint n_outstanding;
164
165   gint dscp_qos;
166
167   /* stream blocking */
168   gulong blocked_id[2];
169   gboolean blocking;
170
171   /* current stream postion */
172   GstClockTime position;
173
174   /* pt->caps map for RECORD streams */
175   GHashTable *ptmap;
176
177   GstRTSPPublishClockMode publish_clock_mode;
178 };
179
180 #define DEFAULT_CONTROL         NULL
181 #define DEFAULT_PROFILES        GST_RTSP_PROFILE_AVP
182 #define DEFAULT_PROTOCOLS       GST_RTSP_LOWER_TRANS_UDP | GST_RTSP_LOWER_TRANS_UDP_MCAST | \
183                                         GST_RTSP_LOWER_TRANS_TCP
184
185 enum
186 {
187   PROP_0,
188   PROP_CONTROL,
189   PROP_PROFILES,
190   PROP_PROTOCOLS,
191   PROP_LAST
192 };
193
194 enum
195 {
196   SIGNAL_NEW_RTP_ENCODER,
197   SIGNAL_NEW_RTCP_ENCODER,
198   SIGNAL_NEW_RTP_RTCP_DECODER,
199   SIGNAL_LAST
200 };
201
202 GST_DEBUG_CATEGORY_STATIC (rtsp_stream_debug);
203 #define GST_CAT_DEFAULT rtsp_stream_debug
204
205 static GQuark ssrc_stream_map_key;
206
207 static void gst_rtsp_stream_get_property (GObject * object, guint propid,
208     GValue * value, GParamSpec * pspec);
209 static void gst_rtsp_stream_set_property (GObject * object, guint propid,
210     const GValue * value, GParamSpec * pspec);
211
212 static void gst_rtsp_stream_finalize (GObject * obj);
213
214 static gboolean
215 update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
216     gboolean add);
217
218 static guint gst_rtsp_stream_signals[SIGNAL_LAST] = { 0 };
219
220 G_DEFINE_TYPE_WITH_PRIVATE (GstRTSPStream, gst_rtsp_stream, G_TYPE_OBJECT);
221
222 static void
223 gst_rtsp_stream_class_init (GstRTSPStreamClass * klass)
224 {
225   GObjectClass *gobject_class;
226
227   gobject_class = G_OBJECT_CLASS (klass);
228
229   gobject_class->get_property = gst_rtsp_stream_get_property;
230   gobject_class->set_property = gst_rtsp_stream_set_property;
231   gobject_class->finalize = gst_rtsp_stream_finalize;
232
233   g_object_class_install_property (gobject_class, PROP_CONTROL,
234       g_param_spec_string ("control", "Control",
235           "The control string for this stream", DEFAULT_CONTROL,
236           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
237
238   g_object_class_install_property (gobject_class, PROP_PROFILES,
239       g_param_spec_flags ("profiles", "Profiles",
240           "Allowed transfer profiles", GST_TYPE_RTSP_PROFILE,
241           DEFAULT_PROFILES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
242
243   g_object_class_install_property (gobject_class, PROP_PROTOCOLS,
244       g_param_spec_flags ("protocols", "Protocols",
245           "Allowed lower transport protocols", GST_TYPE_RTSP_LOWER_TRANS,
246           DEFAULT_PROTOCOLS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
247
248   gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER] =
249       g_signal_new ("new-rtp-encoder", G_TYPE_FROM_CLASS (klass),
250       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
251       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
252
253   gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER] =
254       g_signal_new ("new-rtcp-encoder", G_TYPE_FROM_CLASS (klass),
255       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
256       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
257
258   gst_rtsp_stream_signals[SIGNAL_NEW_RTP_RTCP_DECODER] =
259       g_signal_new ("new-rtp-rtcp-decoder", G_TYPE_FROM_CLASS (klass),
260       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
261       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
262
263   GST_DEBUG_CATEGORY_INIT (rtsp_stream_debug, "rtspstream", 0, "GstRTSPStream");
264
265   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
266 }
267
268 static void
269 gst_rtsp_stream_init (GstRTSPStream * stream)
270 {
271   GstRTSPStreamPrivate *priv = gst_rtsp_stream_get_instance_private (stream);
272
273   GST_DEBUG ("new stream %p", stream);
274
275   stream->priv = priv;
276
277   priv->dscp_qos = -1;
278   priv->control = g_strdup (DEFAULT_CONTROL);
279   priv->profiles = DEFAULT_PROFILES;
280   priv->allowed_protocols = DEFAULT_PROTOCOLS;
281   priv->configured_protocols = 0;
282   priv->publish_clock_mode = GST_RTSP_PUBLISH_CLOCK_MODE_CLOCK;
283
284   g_mutex_init (&priv->lock);
285
286   priv->keys = g_hash_table_new_full (g_direct_hash, g_direct_equal,
287       NULL, (GDestroyNotify) gst_caps_unref);
288   priv->ptmap = g_hash_table_new_full (NULL, NULL, NULL,
289       (GDestroyNotify) gst_caps_unref);
290 }
291
292 static void
293 gst_rtsp_stream_finalize (GObject * obj)
294 {
295   GstRTSPStream *stream;
296   GstRTSPStreamPrivate *priv;
297   guint i;
298
299   stream = GST_RTSP_STREAM (obj);
300   priv = stream->priv;
301
302   GST_DEBUG ("finalize stream %p", stream);
303
304   /* we really need to be unjoined now */
305   g_return_if_fail (priv->joined_bin == NULL);
306
307   if (priv->mcast_addr_v4)
308     gst_rtsp_address_free (priv->mcast_addr_v4);
309   if (priv->mcast_addr_v6)
310     gst_rtsp_address_free (priv->mcast_addr_v6);
311   if (priv->server_addr_v4)
312     gst_rtsp_address_free (priv->server_addr_v4);
313   if (priv->server_addr_v6)
314     gst_rtsp_address_free (priv->server_addr_v6);
315   if (priv->pool)
316     g_object_unref (priv->pool);
317   if (priv->rtxsend)
318     g_object_unref (priv->rtxsend);
319   if (priv->rtxreceive)
320     g_object_unref (priv->rtxreceive);
321   if (priv->ulpfec_encoder)
322     gst_object_unref (priv->ulpfec_encoder);
323   if (priv->ulpfec_decoder)
324     gst_object_unref (priv->ulpfec_decoder);
325
326   for (i = 0; i < 2; i++) {
327     if (priv->socket_v4[i])
328       g_object_unref (priv->socket_v4[i]);
329     if (priv->socket_v6[i])
330       g_object_unref (priv->socket_v6[i]);
331     if (priv->mcast_socket_v4[i])
332       g_object_unref (priv->mcast_socket_v4[i]);
333     if (priv->mcast_socket_v6[i])
334       g_object_unref (priv->mcast_socket_v6[i]);
335   }
336
337   g_free (priv->multicast_iface);
338
339   gst_object_unref (priv->payloader);
340   if (priv->srcpad)
341     gst_object_unref (priv->srcpad);
342   if (priv->sinkpad)
343     gst_object_unref (priv->sinkpad);
344   g_free (priv->control);
345   g_mutex_clear (&priv->lock);
346
347   g_hash_table_unref (priv->keys);
348   g_hash_table_destroy (priv->ptmap);
349
350   G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
351 }
352
353 static void
354 gst_rtsp_stream_get_property (GObject * object, guint propid,
355     GValue * value, GParamSpec * pspec)
356 {
357   GstRTSPStream *stream = GST_RTSP_STREAM (object);
358
359   switch (propid) {
360     case PROP_CONTROL:
361       g_value_take_string (value, gst_rtsp_stream_get_control (stream));
362       break;
363     case PROP_PROFILES:
364       g_value_set_flags (value, gst_rtsp_stream_get_profiles (stream));
365       break;
366     case PROP_PROTOCOLS:
367       g_value_set_flags (value, gst_rtsp_stream_get_protocols (stream));
368       break;
369     default:
370       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
371   }
372 }
373
374 static void
375 gst_rtsp_stream_set_property (GObject * object, guint propid,
376     const GValue * value, GParamSpec * pspec)
377 {
378   GstRTSPStream *stream = GST_RTSP_STREAM (object);
379
380   switch (propid) {
381     case PROP_CONTROL:
382       gst_rtsp_stream_set_control (stream, g_value_get_string (value));
383       break;
384     case PROP_PROFILES:
385       gst_rtsp_stream_set_profiles (stream, g_value_get_flags (value));
386       break;
387     case PROP_PROTOCOLS:
388       gst_rtsp_stream_set_protocols (stream, g_value_get_flags (value));
389       break;
390     default:
391       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
392   }
393 }
394
395 /**
396  * gst_rtsp_stream_new:
397  * @idx: an index
398  * @pad: a #GstPad
399  * @payloader: a #GstElement
400  *
401  * Create a new media stream with index @idx that handles RTP data on
402  * @pad and has a payloader element @payloader if @pad is a source pad
403  * or a depayloader element @payloader if @pad is a sink pad.
404  *
405  * Returns: (transfer full): a new #GstRTSPStream
406  */
407 GstRTSPStream *
408 gst_rtsp_stream_new (guint idx, GstElement * payloader, GstPad * pad)
409 {
410   GstRTSPStreamPrivate *priv;
411   GstRTSPStream *stream;
412
413   g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL);
414   g_return_val_if_fail (GST_IS_PAD (pad), NULL);
415
416   stream = g_object_new (GST_TYPE_RTSP_STREAM, NULL);
417   priv = stream->priv;
418   priv->idx = idx;
419   priv->payloader = gst_object_ref (payloader);
420   if (GST_PAD_IS_SRC (pad))
421     priv->srcpad = gst_object_ref (pad);
422   else
423     priv->sinkpad = gst_object_ref (pad);
424
425   return stream;
426 }
427
428 /**
429  * gst_rtsp_stream_get_index:
430  * @stream: a #GstRTSPStream
431  *
432  * Get the stream index.
433  *
434  * Return: the stream index.
435  */
436 guint
437 gst_rtsp_stream_get_index (GstRTSPStream * stream)
438 {
439   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
440
441   return stream->priv->idx;
442 }
443
444 /**
445  * gst_rtsp_stream_get_pt:
446  * @stream: a #GstRTSPStream
447  *
448  * Get the stream payload type.
449  *
450  * Return: the stream payload type.
451  */
452 guint
453 gst_rtsp_stream_get_pt (GstRTSPStream * stream)
454 {
455   GstRTSPStreamPrivate *priv;
456   guint pt;
457
458   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
459
460   priv = stream->priv;
461
462   g_object_get (G_OBJECT (priv->payloader), "pt", &pt, NULL);
463
464   return pt;
465 }
466
467 /**
468  * gst_rtsp_stream_get_srcpad:
469  * @stream: a #GstRTSPStream
470  *
471  * Get the srcpad associated with @stream.
472  *
473  * Returns: (transfer full) (nullable): the srcpad. Unref after usage.
474  */
475 GstPad *
476 gst_rtsp_stream_get_srcpad (GstRTSPStream * stream)
477 {
478   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
479
480   if (!stream->priv->srcpad)
481     return NULL;
482
483   return gst_object_ref (stream->priv->srcpad);
484 }
485
486 /**
487  * gst_rtsp_stream_get_sinkpad:
488  * @stream: a #GstRTSPStream
489  *
490  * Get the sinkpad associated with @stream.
491  *
492  * Returns: (transfer full) (nullable): the sinkpad. Unref after usage.
493  */
494 GstPad *
495 gst_rtsp_stream_get_sinkpad (GstRTSPStream * stream)
496 {
497   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
498
499   if (!stream->priv->sinkpad)
500     return NULL;
501
502   return gst_object_ref (stream->priv->sinkpad);
503 }
504
505 /**
506  * gst_rtsp_stream_get_control:
507  * @stream: a #GstRTSPStream
508  *
509  * Get the control string to identify this stream.
510  *
511  * Returns: (transfer full) (nullable): the control string. g_free() after usage.
512  */
513 gchar *
514 gst_rtsp_stream_get_control (GstRTSPStream * stream)
515 {
516   GstRTSPStreamPrivate *priv;
517   gchar *result;
518
519   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
520
521   priv = stream->priv;
522
523   g_mutex_lock (&priv->lock);
524   if ((result = g_strdup (priv->control)) == NULL)
525     result = g_strdup_printf ("stream=%u", priv->idx);
526   g_mutex_unlock (&priv->lock);
527
528   return result;
529 }
530
531 /**
532  * gst_rtsp_stream_set_control:
533  * @stream: a #GstRTSPStream
534  * @control: (nullable): a control string
535  *
536  * Set the control string in @stream.
537  */
538 void
539 gst_rtsp_stream_set_control (GstRTSPStream * stream, const gchar * control)
540 {
541   GstRTSPStreamPrivate *priv;
542
543   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
544
545   priv = stream->priv;
546
547   g_mutex_lock (&priv->lock);
548   g_free (priv->control);
549   priv->control = g_strdup (control);
550   g_mutex_unlock (&priv->lock);
551 }
552
553 /**
554  * gst_rtsp_stream_has_control:
555  * @stream: a #GstRTSPStream
556  * @control: (nullable): a control string
557  *
558  * Check if @stream has the control string @control.
559  *
560  * Returns: %TRUE is @stream has @control as the control string
561  */
562 gboolean
563 gst_rtsp_stream_has_control (GstRTSPStream * stream, const gchar * control)
564 {
565   GstRTSPStreamPrivate *priv;
566   gboolean res;
567
568   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
569
570   priv = stream->priv;
571
572   g_mutex_lock (&priv->lock);
573   if (priv->control)
574     res = (g_strcmp0 (priv->control, control) == 0);
575   else {
576     guint streamid;
577
578     if (sscanf (control, "stream=%u", &streamid) > 0)
579       res = (streamid == priv->idx);
580     else
581       res = FALSE;
582   }
583   g_mutex_unlock (&priv->lock);
584
585   return res;
586 }
587
588 /**
589  * gst_rtsp_stream_set_mtu:
590  * @stream: a #GstRTSPStream
591  * @mtu: a new MTU
592  *
593  * Configure the mtu in the payloader of @stream to @mtu.
594  */
595 void
596 gst_rtsp_stream_set_mtu (GstRTSPStream * stream, guint mtu)
597 {
598   GstRTSPStreamPrivate *priv;
599
600   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
601
602   priv = stream->priv;
603
604   GST_LOG_OBJECT (stream, "set MTU %u", mtu);
605
606   g_object_set (G_OBJECT (priv->payloader), "mtu", mtu, NULL);
607 }
608
609 /**
610  * gst_rtsp_stream_get_mtu:
611  * @stream: a #GstRTSPStream
612  *
613  * Get the configured MTU in the payloader of @stream.
614  *
615  * Returns: the MTU of the payloader.
616  */
617 guint
618 gst_rtsp_stream_get_mtu (GstRTSPStream * stream)
619 {
620   GstRTSPStreamPrivate *priv;
621   guint mtu;
622
623   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
624
625   priv = stream->priv;
626
627   g_object_get (G_OBJECT (priv->payloader), "mtu", &mtu, NULL);
628
629   return mtu;
630 }
631
632 /* Update the dscp qos property on the udp sinks */
633 static void
634 update_dscp_qos (GstRTSPStream * stream, GstElement ** udpsink)
635 {
636   GstRTSPStreamPrivate *priv;
637
638   priv = stream->priv;
639
640   if (*udpsink) {
641     g_object_set (G_OBJECT (*udpsink), "qos-dscp", priv->dscp_qos, NULL);
642   }
643 }
644
645 /**
646  * gst_rtsp_stream_set_dscp_qos:
647  * @stream: a #GstRTSPStream
648  * @dscp_qos: a new dscp qos value (0-63, or -1 to disable)
649  *
650  * Configure the dscp qos of the outgoing sockets to @dscp_qos.
651  */
652 void
653 gst_rtsp_stream_set_dscp_qos (GstRTSPStream * stream, gint dscp_qos)
654 {
655   GstRTSPStreamPrivate *priv;
656
657   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
658
659   priv = stream->priv;
660
661   GST_LOG_OBJECT (stream, "set DSCP QoS %d", dscp_qos);
662
663   if (dscp_qos < -1 || dscp_qos > 63) {
664     GST_WARNING_OBJECT (stream, "trying to set illegal dscp qos %d", dscp_qos);
665     return;
666   }
667
668   priv->dscp_qos = dscp_qos;
669
670   update_dscp_qos (stream, priv->udpsink);
671 }
672
673 /**
674  * gst_rtsp_stream_get_dscp_qos:
675  * @stream: a #GstRTSPStream
676  *
677  * Get the configured DSCP QoS in of the outgoing sockets.
678  *
679  * Returns: the DSCP QoS value of the outgoing sockets, or -1 if disbled.
680  */
681 gint
682 gst_rtsp_stream_get_dscp_qos (GstRTSPStream * stream)
683 {
684   GstRTSPStreamPrivate *priv;
685
686   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
687
688   priv = stream->priv;
689
690   return priv->dscp_qos;
691 }
692
693 /**
694  * gst_rtsp_stream_is_transport_supported:
695  * @stream: a #GstRTSPStream
696  * @transport: (transfer none): a #GstRTSPTransport
697  *
698  * Check if @transport can be handled by stream
699  *
700  * Returns: %TRUE if @transport can be handled by @stream.
701  */
702 gboolean
703 gst_rtsp_stream_is_transport_supported (GstRTSPStream * stream,
704     GstRTSPTransport * transport)
705 {
706   GstRTSPStreamPrivate *priv;
707
708   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
709   g_return_val_if_fail (transport != NULL, FALSE);
710
711   priv = stream->priv;
712
713   g_mutex_lock (&priv->lock);
714   if (transport->trans != GST_RTSP_TRANS_RTP)
715     goto unsupported_transmode;
716
717   if (!(transport->profile & priv->profiles))
718     goto unsupported_profile;
719
720   if (!(transport->lower_transport & priv->allowed_protocols))
721     goto unsupported_ltrans;
722
723   g_mutex_unlock (&priv->lock);
724
725   return TRUE;
726
727   /* ERRORS */
728 unsupported_transmode:
729   {
730     GST_DEBUG ("unsupported transport mode %d", transport->trans);
731     g_mutex_unlock (&priv->lock);
732     return FALSE;
733   }
734 unsupported_profile:
735   {
736     GST_DEBUG ("unsupported profile %d", transport->profile);
737     g_mutex_unlock (&priv->lock);
738     return FALSE;
739   }
740 unsupported_ltrans:
741   {
742     GST_DEBUG ("unsupported lower transport %d", transport->lower_transport);
743     g_mutex_unlock (&priv->lock);
744     return FALSE;
745   }
746 }
747
748 /**
749  * gst_rtsp_stream_set_profiles:
750  * @stream: a #GstRTSPStream
751  * @profiles: the new profiles
752  *
753  * Configure the allowed profiles for @stream.
754  */
755 void
756 gst_rtsp_stream_set_profiles (GstRTSPStream * stream, GstRTSPProfile profiles)
757 {
758   GstRTSPStreamPrivate *priv;
759
760   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
761
762   priv = stream->priv;
763
764   g_mutex_lock (&priv->lock);
765   priv->profiles = profiles;
766   g_mutex_unlock (&priv->lock);
767 }
768
769 /**
770  * gst_rtsp_stream_get_profiles:
771  * @stream: a #GstRTSPStream
772  *
773  * Get the allowed profiles of @stream.
774  *
775  * Returns: a #GstRTSPProfile
776  */
777 GstRTSPProfile
778 gst_rtsp_stream_get_profiles (GstRTSPStream * stream)
779 {
780   GstRTSPStreamPrivate *priv;
781   GstRTSPProfile res;
782
783   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_RTSP_PROFILE_UNKNOWN);
784
785   priv = stream->priv;
786
787   g_mutex_lock (&priv->lock);
788   res = priv->profiles;
789   g_mutex_unlock (&priv->lock);
790
791   return res;
792 }
793
794 /**
795  * gst_rtsp_stream_set_protocols:
796  * @stream: a #GstRTSPStream
797  * @protocols: the new flags
798  *
799  * Configure the allowed lower transport for @stream.
800  */
801 void
802 gst_rtsp_stream_set_protocols (GstRTSPStream * stream,
803     GstRTSPLowerTrans protocols)
804 {
805   GstRTSPStreamPrivate *priv;
806
807   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
808
809   priv = stream->priv;
810
811   g_mutex_lock (&priv->lock);
812   priv->allowed_protocols = protocols;
813   g_mutex_unlock (&priv->lock);
814 }
815
816 /**
817  * gst_rtsp_stream_get_protocols:
818  * @stream: a #GstRTSPStream
819  *
820  * Get the allowed protocols of @stream.
821  *
822  * Returns: a #GstRTSPLowerTrans
823  */
824 GstRTSPLowerTrans
825 gst_rtsp_stream_get_protocols (GstRTSPStream * stream)
826 {
827   GstRTSPStreamPrivate *priv;
828   GstRTSPLowerTrans res;
829
830   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream),
831       GST_RTSP_LOWER_TRANS_UNKNOWN);
832
833   priv = stream->priv;
834
835   g_mutex_lock (&priv->lock);
836   res = priv->allowed_protocols;
837   g_mutex_unlock (&priv->lock);
838
839   return res;
840 }
841
842 /**
843  * gst_rtsp_stream_set_address_pool:
844  * @stream: a #GstRTSPStream
845  * @pool: (transfer none) (nullable): a #GstRTSPAddressPool
846  *
847  * configure @pool to be used as the address pool of @stream.
848  */
849 void
850 gst_rtsp_stream_set_address_pool (GstRTSPStream * stream,
851     GstRTSPAddressPool * pool)
852 {
853   GstRTSPStreamPrivate *priv;
854   GstRTSPAddressPool *old;
855
856   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
857
858   priv = stream->priv;
859
860   GST_LOG_OBJECT (stream, "set address pool %p", pool);
861
862   g_mutex_lock (&priv->lock);
863   if ((old = priv->pool) != pool)
864     priv->pool = pool ? g_object_ref (pool) : NULL;
865   else
866     old = NULL;
867   g_mutex_unlock (&priv->lock);
868
869   if (old)
870     g_object_unref (old);
871 }
872
873 /**
874  * gst_rtsp_stream_get_address_pool:
875  * @stream: a #GstRTSPStream
876  *
877  * Get the #GstRTSPAddressPool used as the address pool of @stream.
878  *
879  * Returns: (transfer full) (nullable): the #GstRTSPAddressPool of @stream.
880  * g_object_unref() after usage.
881  */
882 GstRTSPAddressPool *
883 gst_rtsp_stream_get_address_pool (GstRTSPStream * stream)
884 {
885   GstRTSPStreamPrivate *priv;
886   GstRTSPAddressPool *result;
887
888   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
889
890   priv = stream->priv;
891
892   g_mutex_lock (&priv->lock);
893   if ((result = priv->pool))
894     g_object_ref (result);
895   g_mutex_unlock (&priv->lock);
896
897   return result;
898 }
899
900 /**
901  * gst_rtsp_stream_set_multicast_iface:
902  * @stream: a #GstRTSPStream
903  * @multicast_iface: (transfer none) (nullable): a multicast interface name
904  *
905  * configure @multicast_iface to be used for @stream.
906  */
907 void
908 gst_rtsp_stream_set_multicast_iface (GstRTSPStream * stream,
909     const gchar * multicast_iface)
910 {
911   GstRTSPStreamPrivate *priv;
912   gchar *old;
913
914   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
915
916   priv = stream->priv;
917
918   GST_LOG_OBJECT (stream, "set multicast iface %s",
919       GST_STR_NULL (multicast_iface));
920
921   g_mutex_lock (&priv->lock);
922   if ((old = priv->multicast_iface) != multicast_iface)
923     priv->multicast_iface = multicast_iface ? g_strdup (multicast_iface) : NULL;
924   else
925     old = NULL;
926   g_mutex_unlock (&priv->lock);
927
928   if (old)
929     g_free (old);
930 }
931
932 /**
933  * gst_rtsp_stream_get_multicast_iface:
934  * @stream: a #GstRTSPStream
935  *
936  * Get the multicast interface used for @stream.
937  *
938  * Returns: (transfer full) (nullable): the multicast interface for @stream.
939  * g_free() after usage.
940  */
941 gchar *
942 gst_rtsp_stream_get_multicast_iface (GstRTSPStream * stream)
943 {
944   GstRTSPStreamPrivate *priv;
945   gchar *result;
946
947   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
948
949   priv = stream->priv;
950
951   g_mutex_lock (&priv->lock);
952   if ((result = priv->multicast_iface))
953     result = g_strdup (result);
954   g_mutex_unlock (&priv->lock);
955
956   return result;
957 }
958
959 /**
960  * gst_rtsp_stream_get_multicast_address:
961  * @stream: a #GstRTSPStream
962  * @family: the #GSocketFamily
963  *
964  * Get the multicast address of @stream for @family. The original
965  * #GstRTSPAddress is cached and copy is returned, so freeing the return value
966  * won't release the address from the pool.
967  *
968  * Returns: (transfer full) (nullable): the #GstRTSPAddress of @stream
969  * or %NULL when no address could be allocated. gst_rtsp_address_free()
970  * after usage.
971  */
972 GstRTSPAddress *
973 gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream,
974     GSocketFamily family)
975 {
976   GstRTSPStreamPrivate *priv;
977   GstRTSPAddress *result;
978   GstRTSPAddress **addrp;
979   GstRTSPAddressFlags flags;
980
981   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
982
983   priv = stream->priv;
984
985   g_mutex_lock (&stream->priv->lock);
986
987   if (family == G_SOCKET_FAMILY_IPV6) {
988     flags = GST_RTSP_ADDRESS_FLAG_IPV6;
989     addrp = &priv->mcast_addr_v6;
990   } else {
991     flags = GST_RTSP_ADDRESS_FLAG_IPV4;
992     addrp = &priv->mcast_addr_v4;
993   }
994
995   if (*addrp == NULL) {
996     if (priv->pool == NULL)
997       goto no_pool;
998
999     flags |= GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_MULTICAST;
1000
1001     *addrp = gst_rtsp_address_pool_acquire_address (priv->pool, flags, 2);
1002     if (*addrp == NULL)
1003       goto no_address;
1004
1005     /* FIXME: Also reserve the same port with unicast ANY address, since that's
1006      * where we are going to bind our socket. Probably loop until we find a port
1007      * available in both mcast and unicast pools. Maybe GstRTSPAddressPool
1008      * should do it for us when both GST_RTSP_ADDRESS_FLAG_MULTICAST and
1009      * GST_RTSP_ADDRESS_FLAG_UNICAST are givent. */
1010   }
1011   result = gst_rtsp_address_copy (*addrp);
1012
1013   g_mutex_unlock (&stream->priv->lock);
1014
1015   return result;
1016
1017   /* ERRORS */
1018 no_pool:
1019   {
1020     GST_ERROR_OBJECT (stream, "no address pool specified");
1021     g_mutex_unlock (&stream->priv->lock);
1022     return NULL;
1023   }
1024 no_address:
1025   {
1026     GST_ERROR_OBJECT (stream, "failed to acquire address from pool");
1027     g_mutex_unlock (&stream->priv->lock);
1028     return NULL;
1029   }
1030 }
1031
1032 /**
1033  * gst_rtsp_stream_reserve_address:
1034  * @stream: a #GstRTSPStream
1035  * @address: an address
1036  * @port: a port
1037  * @n_ports: n_ports
1038  * @ttl: a TTL
1039  *
1040  * Reserve @address and @port as the address and port of @stream. The original
1041  * #GstRTSPAddress is cached and copy is returned, so freeing the return value
1042  * won't release the address from the pool.
1043  *
1044  * Returns: (nullable): the #GstRTSPAddress of @stream or %NULL when
1045  * the address could be reserved. gst_rtsp_address_free() after usage.
1046  */
1047 GstRTSPAddress *
1048 gst_rtsp_stream_reserve_address (GstRTSPStream * stream,
1049     const gchar * address, guint port, guint n_ports, guint ttl)
1050 {
1051   GstRTSPStreamPrivate *priv;
1052   GstRTSPAddress *result;
1053   GInetAddress *addr;
1054   GSocketFamily family;
1055   GstRTSPAddress **addrp;
1056
1057   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1058   g_return_val_if_fail (address != NULL, NULL);
1059   g_return_val_if_fail (port > 0, NULL);
1060   g_return_val_if_fail (n_ports > 0, NULL);
1061   g_return_val_if_fail (ttl > 0, NULL);
1062
1063   priv = stream->priv;
1064
1065   addr = g_inet_address_new_from_string (address);
1066   if (!addr) {
1067     GST_ERROR ("failed to get inet addr from %s", address);
1068     family = G_SOCKET_FAMILY_IPV4;
1069   } else {
1070     family = g_inet_address_get_family (addr);
1071     g_object_unref (addr);
1072   }
1073
1074   if (family == G_SOCKET_FAMILY_IPV6)
1075     addrp = &priv->mcast_addr_v6;
1076   else
1077     addrp = &priv->mcast_addr_v4;
1078
1079   g_mutex_lock (&priv->lock);
1080   if (*addrp == NULL) {
1081     GstRTSPAddressPoolResult res;
1082
1083     if (priv->pool == NULL)
1084       goto no_pool;
1085
1086     res = gst_rtsp_address_pool_reserve_address (priv->pool, address,
1087         port, n_ports, ttl, addrp);
1088     if (res != GST_RTSP_ADDRESS_POOL_OK)
1089       goto no_address;
1090
1091     /* FIXME: Also reserve the same port with unicast ANY address, since that's
1092      * where we are going to bind our socket. */
1093   } else {
1094     if (g_ascii_strcasecmp ((*addrp)->address, address) ||
1095         (*addrp)->port != port || (*addrp)->n_ports != n_ports ||
1096         (*addrp)->ttl != ttl)
1097       goto different_address;
1098   }
1099   result = gst_rtsp_address_copy (*addrp);
1100   g_mutex_unlock (&priv->lock);
1101
1102   return result;
1103
1104   /* ERRORS */
1105 no_pool:
1106   {
1107     GST_ERROR_OBJECT (stream, "no address pool specified");
1108     g_mutex_unlock (&priv->lock);
1109     return NULL;
1110   }
1111 no_address:
1112   {
1113     GST_ERROR_OBJECT (stream, "failed to acquire address %s from pool",
1114         address);
1115     g_mutex_unlock (&priv->lock);
1116     return NULL;
1117   }
1118 different_address:
1119   {
1120     GST_ERROR_OBJECT (stream,
1121         "address %s is not the same as %s that was already reserved",
1122         address, (*addrp)->address);
1123     g_mutex_unlock (&priv->lock);
1124     return NULL;
1125   }
1126 }
1127
1128 /* must be called with lock */
1129 static void
1130 set_socket_for_udpsink (GstElement * udpsink, GSocket * socket,
1131     GSocketFamily family)
1132 {
1133   const gchar *multisink_socket;
1134
1135   if (family == G_SOCKET_FAMILY_IPV6)
1136     multisink_socket = "socket-v6";
1137   else
1138     multisink_socket = "socket";
1139
1140   g_object_set (G_OBJECT (udpsink), multisink_socket, socket, NULL);
1141 }
1142
1143 /* must be called with lock */
1144 static void
1145 set_multicast_socket_for_udpsink (GstElement * udpsink, GSocket * socket,
1146     GSocketFamily family, const gchar * multicast_iface,
1147     const gchar * addr_str, gint port, gint mcast_ttl)
1148 {
1149   set_socket_for_udpsink (udpsink, socket, family);
1150
1151   if (multicast_iface) {
1152     GST_INFO ("setting multicast-iface %s", multicast_iface);
1153     g_object_set (G_OBJECT (udpsink), "multicast-iface", multicast_iface, NULL);
1154   }
1155
1156   if (mcast_ttl > 0) {
1157     GST_INFO ("setting ttl-mc %d", mcast_ttl);
1158     g_object_set (G_OBJECT (udpsink), "ttl-mc", mcast_ttl, NULL);
1159   }
1160 }
1161
1162
1163 /* must be called with lock */
1164 static void
1165 set_unicast_socket_for_udpsink (GstElement * udpsink, GSocket * socket,
1166     GSocketFamily family)
1167 {
1168   set_socket_for_udpsink (udpsink, socket, family);
1169 }
1170
1171 static guint16
1172 get_port_from_socket (GSocket * socket)
1173 {
1174   guint16 port;
1175   GSocketAddress *sockaddr;
1176   GError *err;
1177
1178   GST_DEBUG ("socket: %p", socket);
1179   sockaddr = g_socket_get_local_address (socket, &err);
1180   if (sockaddr == NULL || !G_IS_INET_SOCKET_ADDRESS (sockaddr)) {
1181     g_clear_object (&sockaddr);
1182     GST_ERROR ("failed to get sockaddr: %s", err->message);
1183     g_error_free (err);
1184     return 0;
1185   }
1186
1187   port = g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (sockaddr));
1188   g_object_unref (sockaddr);
1189
1190   return port;
1191 }
1192
1193
1194 static gboolean
1195 create_and_configure_udpsink (GstRTSPStream * stream, GstElement ** udpsink,
1196     GSocket * socket_v4, GSocket * socket_v6, gboolean multicast,
1197     gboolean is_rtp, gint mcast_ttl)
1198 {
1199   GstRTSPStreamPrivate *priv = stream->priv;
1200
1201   *udpsink = gst_element_factory_make ("multiudpsink", NULL);
1202
1203   if (!*udpsink)
1204     goto no_udp_protocol;
1205
1206   /* configure sinks */
1207
1208   g_object_set (G_OBJECT (*udpsink), "close-socket", FALSE, NULL);
1209
1210   g_object_set (G_OBJECT (*udpsink), "send-duplicates", FALSE, NULL);
1211
1212   if (is_rtp)
1213     g_object_set (G_OBJECT (*udpsink), "buffer-size", priv->buffer_size, NULL);
1214   else
1215     g_object_set (G_OBJECT (*udpsink), "sync", FALSE, NULL);
1216
1217   /* Needs to be async for RECORD streams, otherwise we will never go to
1218    * PLAYING because the sinks will wait for data while the udpsrc can't
1219    * provide data with timestamps in PAUSED. */
1220   if (!is_rtp || priv->sinkpad)
1221     g_object_set (G_OBJECT (*udpsink), "async", FALSE, NULL);
1222
1223   if (multicast) {
1224     /* join multicast group when adding clients, so we'll start receiving from it.
1225      * We cannot rely on the udpsrc to join the group since its socket is always a
1226      * local unicast one. */
1227     g_object_set (G_OBJECT (*udpsink), "auto-multicast", TRUE, NULL);
1228
1229     g_object_set (G_OBJECT (*udpsink), "loop", FALSE, NULL);
1230   }
1231
1232   /* update the dscp qos field in the sinks */
1233   update_dscp_qos (stream, udpsink);
1234
1235   if (priv->server_addr_v4) {
1236     GST_DEBUG_OBJECT (stream, "udp IPv4, configure udpsinks");
1237     set_unicast_socket_for_udpsink (*udpsink, socket_v4, G_SOCKET_FAMILY_IPV4);
1238   }
1239
1240   if (priv->server_addr_v6) {
1241     GST_DEBUG_OBJECT (stream, "udp IPv6, configure udpsinks");
1242     set_unicast_socket_for_udpsink (*udpsink, socket_v6, G_SOCKET_FAMILY_IPV6);
1243   }
1244
1245   if (multicast) {
1246     gint port;
1247     if (priv->mcast_addr_v4) {
1248       GST_DEBUG_OBJECT (stream, "mcast IPv4, configure udpsinks");
1249       port = get_port_from_socket (socket_v4);
1250       if (!port)
1251         goto get_port_failed;
1252       set_multicast_socket_for_udpsink (*udpsink, socket_v4,
1253           G_SOCKET_FAMILY_IPV4, priv->multicast_iface,
1254           priv->mcast_addr_v4->address, port, mcast_ttl);
1255     }
1256
1257     if (priv->mcast_addr_v6) {
1258       GST_DEBUG_OBJECT (stream, "mcast IPv6, configure udpsinks");
1259       port = get_port_from_socket (socket_v6);
1260       if (!port)
1261         goto get_port_failed;
1262       set_multicast_socket_for_udpsink (*udpsink, socket_v6,
1263           G_SOCKET_FAMILY_IPV6, priv->multicast_iface,
1264           priv->mcast_addr_v6->address, port, mcast_ttl);
1265     }
1266
1267   }
1268
1269   return TRUE;
1270
1271   /* ERRORS */
1272 no_udp_protocol:
1273   {
1274     GST_ERROR_OBJECT (stream, "failed to create udpsink element");
1275     return FALSE;
1276   }
1277 get_port_failed:
1278   {
1279     GST_ERROR_OBJECT (stream, "failed to get udp port");
1280     return FALSE;
1281   }
1282 }
1283
1284 /* must be called with lock */
1285 static gboolean
1286 create_and_configure_udpsource (GstElement ** udpsrc, GSocket * socket)
1287 {
1288   GstStateChangeReturn ret;
1289
1290   g_assert (socket != NULL);
1291
1292   *udpsrc = gst_element_factory_make ("udpsrc", NULL);
1293   if (*udpsrc == NULL)
1294     goto error;
1295
1296   g_object_set (G_OBJECT (*udpsrc), "socket", socket, NULL);
1297
1298   /* The udpsrc cannot do the join because its socket is always a local unicast
1299    * one. The udpsink sharing the same socket will do it for us. */
1300   g_object_set (G_OBJECT (*udpsrc), "auto-multicast", FALSE, NULL);
1301
1302   g_object_set (G_OBJECT (*udpsrc), "loop", FALSE, NULL);
1303
1304   g_object_set (G_OBJECT (*udpsrc), "close-socket", FALSE, NULL);
1305
1306   ret = gst_element_set_state (*udpsrc, GST_STATE_READY);
1307   if (ret == GST_STATE_CHANGE_FAILURE)
1308     goto error;
1309
1310   return TRUE;
1311
1312   /* ERRORS */
1313 error:
1314   {
1315     if (*udpsrc) {
1316       gst_element_set_state (*udpsrc, GST_STATE_NULL);
1317       g_clear_object (udpsrc);
1318     }
1319     return FALSE;
1320   }
1321 }
1322
1323 static gboolean
1324 alloc_ports_one_family (GstRTSPStream * stream, GSocketFamily family,
1325     GSocket * socket_out[2], GstRTSPAddress ** server_addr_out,
1326     gboolean multicast, GstRTSPTransport * ct)
1327 {
1328   GstRTSPStreamPrivate *priv = stream->priv;
1329   GSocket *rtp_socket = NULL;
1330   GSocket *rtcp_socket;
1331   gint tmp_rtp, tmp_rtcp;
1332   guint count;
1333   GList *rejected_addresses = NULL;
1334   GstRTSPAddress *addr = NULL;
1335   GInetAddress *inetaddr = NULL;
1336   GSocketAddress *rtp_sockaddr = NULL;
1337   GSocketAddress *rtcp_sockaddr = NULL;
1338   GstRTSPAddressPool *pool;
1339
1340   pool = priv->pool;
1341   count = 0;
1342
1343   /* Start with random port */
1344   tmp_rtp = 0;
1345
1346   rtcp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1347       G_SOCKET_PROTOCOL_UDP, NULL);
1348   if (!rtcp_socket)
1349     goto no_udp_protocol;
1350   g_socket_set_multicast_loopback (rtcp_socket, FALSE);
1351
1352   /* try to allocate 2 UDP ports, the RTP port should be an even
1353    * number and the RTCP port should be the next (uneven) port */
1354 again:
1355
1356   if (rtp_socket == NULL) {
1357     rtp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1358         G_SOCKET_PROTOCOL_UDP, NULL);
1359     if (!rtp_socket)
1360       goto no_udp_protocol;
1361     g_socket_set_multicast_loopback (rtp_socket, FALSE);
1362   }
1363
1364   if ((pool && gst_rtsp_address_pool_has_unicast_addresses (pool)) || multicast) {
1365     GstRTSPAddressFlags flags;
1366
1367     if (addr)
1368       rejected_addresses = g_list_prepend (rejected_addresses, addr);
1369
1370     if (!pool)
1371       goto no_pool;
1372
1373     flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT;
1374     if (multicast)
1375       flags |= GST_RTSP_ADDRESS_FLAG_MULTICAST;
1376     else
1377       flags |= GST_RTSP_ADDRESS_FLAG_UNICAST;
1378
1379     if (family == G_SOCKET_FAMILY_IPV6)
1380       flags |= GST_RTSP_ADDRESS_FLAG_IPV6;
1381     else
1382       flags |= GST_RTSP_ADDRESS_FLAG_IPV4;
1383
1384     addr = gst_rtsp_address_pool_acquire_address (pool, flags, 2);
1385
1386     if (addr == NULL)
1387       goto no_address;
1388
1389     tmp_rtp = addr->port;
1390
1391     g_clear_object (&inetaddr);
1392     /* FIXME: Does it really work with the IP_MULTICAST_ALL socket option and
1393      * socket control message set in udpsrc? */
1394     if (multicast)
1395       inetaddr = g_inet_address_new_any (family);
1396     else
1397       inetaddr = g_inet_address_new_from_string (addr->address);
1398   } else {
1399     if (tmp_rtp != 0) {
1400       tmp_rtp += 2;
1401       if (++count > 20)
1402         goto no_ports;
1403     }
1404
1405     if (inetaddr == NULL)
1406       inetaddr = g_inet_address_new_any (family);
1407   }
1408
1409   rtp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtp);
1410   if (!g_socket_bind (rtp_socket, rtp_sockaddr, FALSE, NULL)) {
1411     GST_DEBUG_OBJECT (stream, "rtp bind() failed, will try again");
1412     g_object_unref (rtp_sockaddr);
1413     goto again;
1414   }
1415   g_object_unref (rtp_sockaddr);
1416
1417   rtp_sockaddr = g_socket_get_local_address (rtp_socket, NULL);
1418   if (rtp_sockaddr == NULL || !G_IS_INET_SOCKET_ADDRESS (rtp_sockaddr)) {
1419     g_clear_object (&rtp_sockaddr);
1420     goto socket_error;
1421   }
1422
1423   tmp_rtp =
1424       g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (rtp_sockaddr));
1425   g_object_unref (rtp_sockaddr);
1426
1427   /* check if port is even */
1428   if ((tmp_rtp & 1) != 0) {
1429     /* port not even, close and allocate another */
1430     tmp_rtp++;
1431     g_clear_object (&rtp_socket);
1432     goto again;
1433   }
1434
1435   /* set port */
1436   tmp_rtcp = tmp_rtp + 1;
1437
1438   rtcp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtcp);
1439   if (!g_socket_bind (rtcp_socket, rtcp_sockaddr, FALSE, NULL)) {
1440     GST_DEBUG_OBJECT (stream, "rctp bind() failed, will try again");
1441     g_object_unref (rtcp_sockaddr);
1442     g_clear_object (&rtp_socket);
1443     goto again;
1444   }
1445   g_object_unref (rtcp_sockaddr);
1446
1447   if (!addr) {
1448     addr = g_slice_new0 (GstRTSPAddress);
1449     addr->address = g_inet_address_to_string (inetaddr);
1450     addr->port = tmp_rtp;
1451     addr->n_ports = 2;
1452   }
1453
1454   g_clear_object (&inetaddr);
1455
1456   socket_out[0] = rtp_socket;
1457   socket_out[1] = rtcp_socket;
1458   *server_addr_out = addr;
1459
1460   GST_DEBUG_OBJECT (stream, "allocated address: %s and ports: %d, %d",
1461       addr->address, tmp_rtp, tmp_rtcp);
1462
1463   g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free);
1464
1465   return TRUE;
1466
1467   /* ERRORS */
1468 no_udp_protocol:
1469   {
1470     GST_WARNING_OBJECT (stream, "failed to allocate UDP ports: protocol error");
1471     goto cleanup;
1472   }
1473 no_pool:
1474   {
1475     GST_WARNING_OBJECT (stream,
1476         "failed to allocate UDP ports: no address pool specified");
1477     goto cleanup;
1478   }
1479 no_address:
1480   {
1481     GST_WARNING_OBJECT (stream, "failed to acquire address from pool");
1482     goto cleanup;
1483   }
1484 no_ports:
1485   {
1486     GST_WARNING_OBJECT (stream, "failed to allocate UDP ports: no ports");
1487     goto cleanup;
1488   }
1489 socket_error:
1490   {
1491     GST_WARNING_OBJECT (stream, "failed to allocate UDP ports: socket error");
1492     goto cleanup;
1493   }
1494 cleanup:
1495   {
1496     if (inetaddr)
1497       g_object_unref (inetaddr);
1498     g_list_free_full (rejected_addresses,
1499         (GDestroyNotify) gst_rtsp_address_free);
1500     if (addr)
1501       gst_rtsp_address_free (addr);
1502     if (rtp_socket)
1503       g_object_unref (rtp_socket);
1504     if (rtcp_socket)
1505       g_object_unref (rtcp_socket);
1506     return FALSE;
1507   }
1508 }
1509
1510 /**
1511  * gst_rtsp_stream_allocate_udp_sockets:
1512  * @stream: a #GstRTSPStream
1513  * @family: protocol family
1514  * @transport: transport method
1515  * @use_client_settings: Whether to use client settings or not
1516  *
1517  * Allocates RTP and RTCP ports.
1518  *
1519  * Returns: %TRUE if the RTP and RTCP sockets have been succeccully allocated.
1520  */
1521 gboolean
1522 gst_rtsp_stream_allocate_udp_sockets (GstRTSPStream * stream,
1523     GSocketFamily family, GstRTSPTransport * ct,
1524     gboolean use_transport_settings)
1525 {
1526   GstRTSPStreamPrivate *priv;
1527   gboolean ret = FALSE;
1528   GstRTSPLowerTrans transport;
1529   gboolean allocated = FALSE;
1530
1531   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1532   g_return_val_if_fail (ct != NULL, FALSE);
1533   priv = stream->priv;
1534
1535   transport = ct->lower_transport;
1536
1537   g_mutex_lock (&priv->lock);
1538
1539   if (transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
1540     if (family == G_SOCKET_FAMILY_IPV4 && priv->mcast_socket_v4[0])
1541       allocated = TRUE;
1542     else if (family == G_SOCKET_FAMILY_IPV6 && priv->mcast_socket_v6[0])
1543       allocated = TRUE;
1544   } else if (transport == GST_RTSP_LOWER_TRANS_UDP) {
1545     if (family == G_SOCKET_FAMILY_IPV4 && priv->socket_v4[0])
1546       allocated = TRUE;
1547     else if (family == G_SOCKET_FAMILY_IPV6 && priv->socket_v6[0])
1548       allocated = TRUE;
1549   }
1550
1551   if (allocated) {
1552     GST_DEBUG_OBJECT (stream, "Allocated already");
1553     g_mutex_unlock (&priv->lock);
1554     return TRUE;
1555   }
1556
1557   if (family == G_SOCKET_FAMILY_IPV4) {
1558     /* IPv4 */
1559     if (transport == GST_RTSP_LOWER_TRANS_UDP) {
1560       /* UDP unicast */
1561       GST_DEBUG_OBJECT (stream, "GST_RTSP_LOWER_TRANS_UDP, ipv4");
1562       ret = alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4,
1563           priv->socket_v4, &priv->server_addr_v4, FALSE, ct);
1564     } else {
1565       /* multicast */
1566       GST_DEBUG_OBJECT (stream, "GST_RTSP_LOWER_TRANS_MCAST_UDP, ipv4");
1567       ret = alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4,
1568           priv->mcast_socket_v4, &priv->mcast_addr_v4, TRUE, ct);
1569     }
1570   } else {
1571     /* IPv6 */
1572     if (transport == GST_RTSP_LOWER_TRANS_UDP) {
1573       /* unicast */
1574       GST_DEBUG_OBJECT (stream, "GST_RTSP_LOWER_TRANS_UDP, ipv6");
1575       ret = alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6,
1576           priv->socket_v6, &priv->server_addr_v6, FALSE, ct);
1577
1578     } else {
1579       /* multicast */
1580       GST_DEBUG_OBJECT (stream, "GST_RTSP_LOWER_TRANS_MCAST_UDP, ipv6");
1581       ret = alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6,
1582           priv->mcast_socket_v6, &priv->mcast_addr_v6, TRUE, ct);
1583     }
1584   }
1585   g_mutex_unlock (&priv->lock);
1586
1587   return ret;
1588 }
1589
1590 /**
1591  * gst_rtsp_stream_set_client_side:
1592  * @stream: a #GstRTSPStream
1593  * @client_side: TRUE if this #GstRTSPStream is running on the 'client' side of
1594  * an RTSP connection.
1595  *
1596  * Sets the #GstRTSPStream as a 'client side' stream - used for sending
1597  * streams to an RTSP server via RECORD. This has the practical effect
1598  * of changing which UDP port numbers are used when setting up the local
1599  * side of the stream sending to be either the 'server' or 'client' pair
1600  * of a configured UDP transport.
1601  */
1602 void
1603 gst_rtsp_stream_set_client_side (GstRTSPStream * stream, gboolean client_side)
1604 {
1605   GstRTSPStreamPrivate *priv;
1606
1607   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1608   priv = stream->priv;
1609   g_mutex_lock (&priv->lock);
1610   priv->client_side = client_side;
1611   g_mutex_unlock (&priv->lock);
1612 }
1613
1614 /**
1615  * gst_rtsp_stream_is_client_side:
1616  * @stream: a #GstRTSPStream
1617  *
1618  * See gst_rtsp_stream_set_client_side()
1619  *
1620  * Returns: TRUE if this #GstRTSPStream is client-side.
1621  */
1622 gboolean
1623 gst_rtsp_stream_is_client_side (GstRTSPStream * stream)
1624 {
1625   GstRTSPStreamPrivate *priv;
1626   gboolean ret;
1627
1628   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1629
1630   priv = stream->priv;
1631   g_mutex_lock (&priv->lock);
1632   ret = priv->client_side;
1633   g_mutex_unlock (&priv->lock);
1634
1635   return ret;
1636 }
1637
1638 /**
1639  * gst_rtsp_stream_get_server_port:
1640  * @stream: a #GstRTSPStream
1641  * @server_port: (out): result server port
1642  * @family: the port family to get
1643  *
1644  * Fill @server_port with the port pair used by the server. This function can
1645  * only be called when @stream has been joined.
1646  */
1647 void
1648 gst_rtsp_stream_get_server_port (GstRTSPStream * stream,
1649     GstRTSPRange * server_port, GSocketFamily family)
1650 {
1651   GstRTSPStreamPrivate *priv;
1652
1653   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1654   priv = stream->priv;
1655   g_return_if_fail (priv->joined_bin != NULL);
1656
1657   if (server_port) {
1658     server_port->min = 0;
1659     server_port->max = 0;
1660   }
1661
1662   g_mutex_lock (&priv->lock);
1663   if (family == G_SOCKET_FAMILY_IPV4) {
1664     if (server_port && priv->server_addr_v4) {
1665       server_port->min = priv->server_addr_v4->port;
1666       server_port->max =
1667           priv->server_addr_v4->port + priv->server_addr_v4->n_ports - 1;
1668     }
1669   } else {
1670     if (server_port && priv->server_addr_v6) {
1671       server_port->min = priv->server_addr_v6->port;
1672       server_port->max =
1673           priv->server_addr_v6->port + priv->server_addr_v6->n_ports - 1;
1674     }
1675   }
1676   g_mutex_unlock (&priv->lock);
1677 }
1678
1679 /**
1680  * gst_rtsp_stream_get_rtpsession:
1681  * @stream: a #GstRTSPStream
1682  *
1683  * Get the RTP session of this stream.
1684  *
1685  * Returns: (transfer full): The RTP session of this stream. Unref after usage.
1686  */
1687 GObject *
1688 gst_rtsp_stream_get_rtpsession (GstRTSPStream * stream)
1689 {
1690   GstRTSPStreamPrivate *priv;
1691   GObject *session;
1692
1693   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1694
1695   priv = stream->priv;
1696
1697   g_mutex_lock (&priv->lock);
1698   if ((session = priv->session))
1699     g_object_ref (session);
1700   g_mutex_unlock (&priv->lock);
1701
1702   return session;
1703 }
1704
1705 /**
1706  * gst_rtsp_stream_get_srtp_encoder:
1707  * @stream: a #GstRTSPStream
1708  *
1709  * Get the SRTP encoder for this stream.
1710  *
1711  * Returns: (transfer full): The SRTP encoder for this stream. Unref after usage.
1712  */
1713 GstElement *
1714 gst_rtsp_stream_get_srtp_encoder (GstRTSPStream * stream)
1715 {
1716   GstRTSPStreamPrivate *priv;
1717   GstElement *encoder;
1718
1719   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1720
1721   priv = stream->priv;
1722
1723   g_mutex_lock (&priv->lock);
1724   if ((encoder = priv->srtpenc))
1725     g_object_ref (encoder);
1726   g_mutex_unlock (&priv->lock);
1727
1728   return encoder;
1729 }
1730
1731 /**
1732  * gst_rtsp_stream_get_ssrc:
1733  * @stream: a #GstRTSPStream
1734  * @ssrc: (out): result ssrc
1735  *
1736  * Get the SSRC used by the RTP session of this stream. This function can only
1737  * be called when @stream has been joined.
1738  */
1739 void
1740 gst_rtsp_stream_get_ssrc (GstRTSPStream * stream, guint * ssrc)
1741 {
1742   GstRTSPStreamPrivate *priv;
1743
1744   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1745   priv = stream->priv;
1746   g_return_if_fail (priv->joined_bin != NULL);
1747
1748   g_mutex_lock (&priv->lock);
1749   if (ssrc && priv->session)
1750     g_object_get (priv->session, "internal-ssrc", ssrc, NULL);
1751   g_mutex_unlock (&priv->lock);
1752 }
1753
1754 /**
1755  * gst_rtsp_stream_set_retransmission_time:
1756  * @stream: a #GstRTSPStream
1757  * @time: a #GstClockTime
1758  *
1759  * Set the amount of time to store retransmission packets.
1760  */
1761 void
1762 gst_rtsp_stream_set_retransmission_time (GstRTSPStream * stream,
1763     GstClockTime time)
1764 {
1765   GST_DEBUG_OBJECT (stream, "set retransmission time %" G_GUINT64_FORMAT, time);
1766
1767   g_mutex_lock (&stream->priv->lock);
1768   stream->priv->rtx_time = time;
1769   if (stream->priv->rtxsend)
1770     g_object_set (stream->priv->rtxsend, "max-size-time",
1771         GST_TIME_AS_MSECONDS (time), NULL);
1772   g_mutex_unlock (&stream->priv->lock);
1773 }
1774
1775 /**
1776  * gst_rtsp_stream_get_retransmission_time:
1777  * @stream: a #GstRTSPStream
1778  *
1779  * Get the amount of time to store retransmission data.
1780  *
1781  * Returns: the amount of time to store retransmission data.
1782  */
1783 GstClockTime
1784 gst_rtsp_stream_get_retransmission_time (GstRTSPStream * stream)
1785 {
1786   GstClockTime ret;
1787
1788   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
1789
1790   g_mutex_lock (&stream->priv->lock);
1791   ret = stream->priv->rtx_time;
1792   g_mutex_unlock (&stream->priv->lock);
1793
1794   return ret;
1795 }
1796
1797 /**
1798  * gst_rtsp_stream_set_retransmission_pt:
1799  * @stream: a #GstRTSPStream
1800  * @rtx_pt: a #guint
1801  *
1802  * Set the payload type (pt) for retransmission of this stream.
1803  */
1804 void
1805 gst_rtsp_stream_set_retransmission_pt (GstRTSPStream * stream, guint rtx_pt)
1806 {
1807   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1808
1809   GST_DEBUG_OBJECT (stream, "set retransmission pt %u", rtx_pt);
1810
1811   g_mutex_lock (&stream->priv->lock);
1812   stream->priv->rtx_pt = rtx_pt;
1813   if (stream->priv->rtxsend) {
1814     guint pt = gst_rtsp_stream_get_pt (stream);
1815     gchar *pt_s = g_strdup_printf ("%d", pt);
1816     GstStructure *rtx_pt_map = gst_structure_new ("application/x-rtp-pt-map",
1817         pt_s, G_TYPE_UINT, rtx_pt, NULL);
1818     g_object_set (stream->priv->rtxsend, "payload-type-map", rtx_pt_map, NULL);
1819     g_free (pt_s);
1820     gst_structure_free (rtx_pt_map);
1821   }
1822   g_mutex_unlock (&stream->priv->lock);
1823 }
1824
1825 /**
1826  * gst_rtsp_stream_get_retransmission_pt:
1827  * @stream: a #GstRTSPStream
1828  *
1829  * Get the payload-type used for retransmission of this stream
1830  *
1831  * Returns: The retransmission PT.
1832  */
1833 guint
1834 gst_rtsp_stream_get_retransmission_pt (GstRTSPStream * stream)
1835 {
1836   guint rtx_pt;
1837
1838   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
1839
1840   g_mutex_lock (&stream->priv->lock);
1841   rtx_pt = stream->priv->rtx_pt;
1842   g_mutex_unlock (&stream->priv->lock);
1843
1844   return rtx_pt;
1845 }
1846
1847 /**
1848  * gst_rtsp_stream_set_buffer_size:
1849  * @stream: a #GstRTSPStream
1850  * @size: the buffer size
1851  *
1852  * Set the size of the UDP transmission buffer (in bytes)
1853  * Needs to be set before the stream is joined to a bin.
1854  *
1855  * Since: 1.6
1856  */
1857 void
1858 gst_rtsp_stream_set_buffer_size (GstRTSPStream * stream, guint size)
1859 {
1860   g_mutex_lock (&stream->priv->lock);
1861   stream->priv->buffer_size = size;
1862   g_mutex_unlock (&stream->priv->lock);
1863 }
1864
1865 /**
1866  * gst_rtsp_stream_get_buffer_size:
1867  * @stream: a #GstRTSPStream
1868  *
1869  * Get the size of the UDP transmission buffer (in bytes)
1870  *
1871  * Returns: the size of the UDP TX buffer
1872  *
1873  * Since: 1.6
1874  */
1875 guint
1876 gst_rtsp_stream_get_buffer_size (GstRTSPStream * stream)
1877 {
1878   guint buffer_size;
1879
1880   g_mutex_lock (&stream->priv->lock);
1881   buffer_size = stream->priv->buffer_size;
1882   g_mutex_unlock (&stream->priv->lock);
1883
1884   return buffer_size;
1885 }
1886
1887 /* executed from streaming thread */
1888 static void
1889 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream)
1890 {
1891   GstRTSPStreamPrivate *priv = stream->priv;
1892   GstCaps *newcaps, *oldcaps;
1893
1894   newcaps = gst_pad_get_current_caps (pad);
1895
1896   GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps,
1897       newcaps);
1898
1899   g_mutex_lock (&priv->lock);
1900   oldcaps = priv->caps;
1901   priv->caps = newcaps;
1902   g_mutex_unlock (&priv->lock);
1903
1904   if (oldcaps)
1905     gst_caps_unref (oldcaps);
1906 }
1907
1908 static void
1909 dump_structure (const GstStructure * s)
1910 {
1911   gchar *sstr;
1912
1913   sstr = gst_structure_to_string (s);
1914   GST_INFO ("structure: %s", sstr);
1915   g_free (sstr);
1916 }
1917
1918 static GstRTSPStreamTransport *
1919 find_transport (GstRTSPStream * stream, const gchar * rtcp_from)
1920 {
1921   GstRTSPStreamPrivate *priv = stream->priv;
1922   GList *walk;
1923   GstRTSPStreamTransport *result = NULL;
1924   const gchar *tmp;
1925   gchar *dest;
1926   guint port;
1927
1928   if (rtcp_from == NULL)
1929     return NULL;
1930
1931   tmp = g_strrstr (rtcp_from, ":");
1932   if (tmp == NULL)
1933     return NULL;
1934
1935   port = atoi (tmp + 1);
1936   dest = g_strndup (rtcp_from, tmp - rtcp_from);
1937
1938   g_mutex_lock (&priv->lock);
1939   GST_INFO ("finding %s:%d in %d transports", dest, port,
1940       g_list_length (priv->transports));
1941
1942   for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1943     GstRTSPStreamTransport *trans = walk->data;
1944     const GstRTSPTransport *tr;
1945     gint min, max;
1946
1947     tr = gst_rtsp_stream_transport_get_transport (trans);
1948
1949     if (priv->client_side) {
1950       /* In client side mode the 'destination' is the RTSP server, so send
1951        * to those ports */
1952       min = tr->server_port.min;
1953       max = tr->server_port.max;
1954     } else {
1955       min = tr->client_port.min;
1956       max = tr->client_port.max;
1957     }
1958
1959     if ((g_ascii_strcasecmp (tr->destination, dest) == 0) &&
1960         (min == port || max == port)) {
1961       result = trans;
1962       break;
1963     }
1964   }
1965   if (result)
1966     g_object_ref (result);
1967   g_mutex_unlock (&priv->lock);
1968
1969   g_free (dest);
1970
1971   return result;
1972 }
1973
1974 static GstRTSPStreamTransport *
1975 check_transport (GObject * source, GstRTSPStream * stream)
1976 {
1977   GstStructure *stats;
1978   GstRTSPStreamTransport *trans;
1979
1980   /* see if we have a stream to match with the origin of the RTCP packet */
1981   trans = g_object_get_qdata (source, ssrc_stream_map_key);
1982   if (trans == NULL) {
1983     g_object_get (source, "stats", &stats, NULL);
1984     if (stats) {
1985       const gchar *rtcp_from;
1986
1987       dump_structure (stats);
1988
1989       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
1990       if ((trans = find_transport (stream, rtcp_from))) {
1991         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
1992             source);
1993         g_object_set_qdata_full (source, ssrc_stream_map_key, trans,
1994             g_object_unref);
1995       }
1996       gst_structure_free (stats);
1997     }
1998   }
1999   return trans;
2000 }
2001
2002
2003 static void
2004 on_new_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
2005 {
2006   GstRTSPStreamTransport *trans;
2007
2008   GST_INFO ("%p: new source %p", stream, source);
2009
2010   trans = check_transport (source, stream);
2011
2012   if (trans)
2013     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
2014 }
2015
2016 static void
2017 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPStream * stream)
2018 {
2019   GST_INFO ("%p: new SDES %p", stream, source);
2020 }
2021
2022 static void
2023 on_ssrc_active (GObject * session, GObject * source, GstRTSPStream * stream)
2024 {
2025   GstRTSPStreamTransport *trans;
2026
2027   trans = check_transport (source, stream);
2028
2029   if (trans) {
2030     GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
2031     gst_rtsp_stream_transport_keep_alive (trans);
2032   }
2033 #ifdef DUMP_STATS
2034   {
2035     GstStructure *stats;
2036     g_object_get (source, "stats", &stats, NULL);
2037     if (stats) {
2038       dump_structure (stats);
2039       gst_structure_free (stats);
2040     }
2041   }
2042 #endif
2043 }
2044
2045 static void
2046 on_bye_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
2047 {
2048   GST_INFO ("%p: source %p bye", stream, source);
2049 }
2050
2051 static void
2052 on_bye_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
2053 {
2054   GstRTSPStreamTransport *trans;
2055
2056   GST_INFO ("%p: source %p bye timeout", stream, source);
2057
2058   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
2059     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
2060     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
2061   }
2062 }
2063
2064 static void
2065 on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
2066 {
2067   GstRTSPStreamTransport *trans;
2068
2069   GST_INFO ("%p: source %p timeout", stream, source);
2070
2071   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
2072     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
2073     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
2074   }
2075 }
2076
2077 static void
2078 on_new_sender_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
2079 {
2080   GST_INFO ("%p: new sender source %p", stream, source);
2081 #ifndef DUMP_STATS
2082   {
2083     GstStructure *stats;
2084     g_object_get (source, "stats", &stats, NULL);
2085     if (stats) {
2086       dump_structure (stats);
2087       gst_structure_free (stats);
2088     }
2089   }
2090 #endif
2091 }
2092
2093 static void
2094 on_sender_ssrc_active (GObject * session, GObject * source,
2095     GstRTSPStream * stream)
2096 {
2097 #ifndef DUMP_STATS
2098   {
2099     GstStructure *stats;
2100     g_object_get (source, "stats", &stats, NULL);
2101     if (stats) {
2102       dump_structure (stats);
2103       gst_structure_free (stats);
2104     }
2105   }
2106 #endif
2107 }
2108
2109 static void
2110 clear_tr_cache (GstRTSPStreamPrivate * priv, gboolean is_rtp)
2111 {
2112   if (is_rtp) {
2113     g_list_foreach (priv->tr_cache_rtp, (GFunc) g_object_unref, NULL);
2114     g_list_free (priv->tr_cache_rtp);
2115     priv->tr_cache_rtp = NULL;
2116   } else {
2117     g_list_foreach (priv->tr_cache_rtcp, (GFunc) g_object_unref, NULL);
2118     g_list_free (priv->tr_cache_rtcp);
2119     priv->tr_cache_rtcp = NULL;
2120   }
2121 }
2122
2123 /* Must be called with priv->lock */
2124 static void
2125 send_tcp_message (GstRTSPStream * stream, gint idx)
2126 {
2127   GstRTSPStreamPrivate *priv = stream->priv;
2128   GstAppSink *sink;
2129   GList *walk;
2130   GstSample *sample;
2131   GstBuffer *buffer;
2132   gboolean is_rtp;
2133
2134   if (priv->n_outstanding > 0 || !priv->have_buffer[idx]) {
2135     return;
2136   }
2137
2138   priv->have_buffer[idx] = FALSE;
2139
2140   if (priv->appsink[idx] == NULL) {
2141     /* session expired */
2142     return;
2143   }
2144
2145   sink = GST_APP_SINK (priv->appsink[idx]);
2146   sample = gst_app_sink_pull_sample (sink);
2147   if (!sample) {
2148     return;
2149   }
2150
2151   buffer = gst_sample_get_buffer (sample);
2152
2153   is_rtp = (idx == 0);
2154
2155   if (is_rtp) {
2156     if (priv->tr_cache_cookie_rtp != priv->transports_cookie) {
2157       clear_tr_cache (priv, is_rtp);
2158       for (walk = priv->transports; walk; walk = g_list_next (walk)) {
2159         GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2160         const GstRTSPTransport *t =
2161             gst_rtsp_stream_transport_get_transport (tr);
2162
2163         if (t->lower_transport != GST_RTSP_LOWER_TRANS_TCP)
2164           continue;
2165
2166         priv->tr_cache_rtp =
2167             g_list_prepend (priv->tr_cache_rtp, g_object_ref (tr));
2168       }
2169       priv->tr_cache_cookie_rtp = priv->transports_cookie;
2170     }
2171   } else {
2172     if (priv->tr_cache_cookie_rtcp != priv->transports_cookie) {
2173       clear_tr_cache (priv, is_rtp);
2174       for (walk = priv->transports; walk; walk = g_list_next (walk)) {
2175         GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2176         const GstRTSPTransport *t =
2177             gst_rtsp_stream_transport_get_transport (tr);
2178
2179         if (t->lower_transport != GST_RTSP_LOWER_TRANS_TCP)
2180           continue;
2181
2182         priv->tr_cache_rtcp =
2183             g_list_prepend (priv->tr_cache_rtcp, g_object_ref (tr));
2184       }
2185       priv->tr_cache_cookie_rtcp = priv->transports_cookie;
2186     }
2187   }
2188
2189   priv->n_outstanding += priv->n_tcp_transports;
2190
2191   g_mutex_unlock (&priv->lock);
2192
2193   if (is_rtp) {
2194     for (walk = priv->tr_cache_rtp; walk; walk = g_list_next (walk)) {
2195       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2196       if (!gst_rtsp_stream_transport_send_rtp (tr, buffer)) {
2197         /* remove transport on send error */
2198         g_mutex_lock (&priv->lock);
2199         priv->n_outstanding--;
2200         update_transport (stream, tr, FALSE);
2201         g_mutex_unlock (&priv->lock);
2202       }
2203     }
2204   } else {
2205     for (walk = priv->tr_cache_rtcp; walk; walk = g_list_next (walk)) {
2206       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2207       if (!gst_rtsp_stream_transport_send_rtcp (tr, buffer)) {
2208         /* remove transport on send error */
2209         g_mutex_lock (&priv->lock);
2210         priv->n_outstanding--;
2211         update_transport (stream, tr, FALSE);
2212         g_mutex_unlock (&priv->lock);
2213       }
2214     }
2215   }
2216   gst_sample_unref (sample);
2217
2218   g_mutex_lock (&priv->lock);
2219 }
2220
2221 static GstFlowReturn
2222 handle_new_sample (GstAppSink * sink, gpointer user_data)
2223 {
2224   GstRTSPStream *stream = user_data;
2225   GstRTSPStreamPrivate *priv = stream->priv;
2226   int i;
2227   int idx = -1;
2228
2229   g_mutex_lock (&priv->lock);
2230
2231   for (i = 0; i < 2; i++)
2232     if (GST_ELEMENT_CAST (sink) == priv->appsink[i]) {
2233       priv->have_buffer[i] = TRUE;
2234       if (priv->n_outstanding == 0) {
2235         /* send message */
2236         idx = i;
2237       }
2238       break;
2239     }
2240
2241   if (idx != -1)
2242     send_tcp_message (stream, idx);
2243
2244   g_mutex_unlock (&priv->lock);
2245
2246   return GST_FLOW_OK;
2247 }
2248
2249 static GstAppSinkCallbacks sink_cb = {
2250   NULL,                         /* not interested in EOS */
2251   NULL,                         /* not interested in preroll samples */
2252   handle_new_sample,
2253 };
2254
2255 static GstElement *
2256 get_rtp_encoder (GstRTSPStream * stream, guint session)
2257 {
2258   GstRTSPStreamPrivate *priv = stream->priv;
2259
2260   if (priv->srtpenc == NULL) {
2261     gchar *name;
2262
2263     name = g_strdup_printf ("srtpenc_%u", session);
2264     priv->srtpenc = gst_element_factory_make ("srtpenc", name);
2265     g_free (name);
2266
2267     g_object_set (priv->srtpenc, "random-key", TRUE, NULL);
2268   }
2269   return gst_object_ref (priv->srtpenc);
2270 }
2271
2272 static GstElement *
2273 request_rtp_encoder (GstElement * rtpbin, guint session, GstRTSPStream * stream)
2274 {
2275   GstRTSPStreamPrivate *priv = stream->priv;
2276   GstElement *oldenc, *enc;
2277   GstPad *pad;
2278   gchar *name;
2279
2280   if (priv->idx != session)
2281     return NULL;
2282
2283   GST_DEBUG_OBJECT (stream, "make RTP encoder for session %u", session);
2284
2285   oldenc = priv->srtpenc;
2286   enc = get_rtp_encoder (stream, session);
2287   name = g_strdup_printf ("rtp_sink_%d", session);
2288   pad = gst_element_get_request_pad (enc, name);
2289   g_free (name);
2290   gst_object_unref (pad);
2291
2292   if (oldenc == NULL)
2293     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER], 0,
2294         enc);
2295
2296   return enc;
2297 }
2298
2299 static GstElement *
2300 request_rtcp_encoder (GstElement * rtpbin, guint session,
2301     GstRTSPStream * stream)
2302 {
2303   GstRTSPStreamPrivate *priv = stream->priv;
2304   GstElement *oldenc, *enc;
2305   GstPad *pad;
2306   gchar *name;
2307
2308   if (priv->idx != session)
2309     return NULL;
2310
2311   GST_DEBUG_OBJECT (stream, "make RTCP encoder for session %u", session);
2312
2313   oldenc = priv->srtpenc;
2314   enc = get_rtp_encoder (stream, session);
2315   name = g_strdup_printf ("rtcp_sink_%d", session);
2316   pad = gst_element_get_request_pad (enc, name);
2317   g_free (name);
2318   gst_object_unref (pad);
2319
2320   if (oldenc == NULL)
2321     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER], 0,
2322         enc);
2323
2324   return enc;
2325 }
2326
2327 static GstCaps *
2328 request_key (GstElement * srtpdec, guint ssrc, GstRTSPStream * stream)
2329 {
2330   GstRTSPStreamPrivate *priv = stream->priv;
2331   GstCaps *caps;
2332
2333   GST_DEBUG ("request key %08x", ssrc);
2334
2335   g_mutex_lock (&priv->lock);
2336   if ((caps = g_hash_table_lookup (priv->keys, GINT_TO_POINTER (ssrc))))
2337     gst_caps_ref (caps);
2338   g_mutex_unlock (&priv->lock);
2339
2340   return caps;
2341 }
2342
2343 static GstElement *
2344 request_rtp_rtcp_decoder (GstElement * rtpbin, guint session,
2345     GstRTSPStream * stream)
2346 {
2347   GstRTSPStreamPrivate *priv = stream->priv;
2348
2349   if (priv->idx != session)
2350     return NULL;
2351
2352   if (priv->srtpdec == NULL) {
2353     gchar *name;
2354
2355     name = g_strdup_printf ("srtpdec_%u", session);
2356     priv->srtpdec = gst_element_factory_make ("srtpdec", name);
2357     g_free (name);
2358
2359     g_signal_connect (priv->srtpdec, "request-key",
2360         (GCallback) request_key, stream);
2361
2362     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTP_RTCP_DECODER],
2363         0, priv->srtpdec);
2364
2365   }
2366   return gst_object_ref (priv->srtpdec);
2367 }
2368
2369 /**
2370  * gst_rtsp_stream_request_aux_sender:
2371  * @stream: a #GstRTSPStream
2372  * @sessid: the session id
2373  *
2374  * Creating a rtxsend bin
2375  *
2376  * Returns: (transfer full) (nullable): a #GstElement.
2377  *
2378  * Since: 1.6
2379  */
2380 GstElement *
2381 gst_rtsp_stream_request_aux_sender (GstRTSPStream * stream, guint sessid)
2382 {
2383   GstElement *bin;
2384   GstPad *pad;
2385   GstStructure *pt_map;
2386   gchar *name;
2387   guint pt, rtx_pt;
2388   gchar *pt_s;
2389
2390   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2391
2392   pt = gst_rtsp_stream_get_pt (stream);
2393   pt_s = g_strdup_printf ("%u", pt);
2394   rtx_pt = stream->priv->rtx_pt;
2395
2396   GST_INFO ("creating rtxsend with pt %u to %u", pt, rtx_pt);
2397
2398   bin = gst_bin_new (NULL);
2399   stream->priv->rtxsend = gst_element_factory_make ("rtprtxsend", NULL);
2400   pt_map = gst_structure_new ("application/x-rtp-pt-map",
2401       pt_s, G_TYPE_UINT, rtx_pt, NULL);
2402   g_object_set (stream->priv->rtxsend, "payload-type-map", pt_map,
2403       "max-size-time", GST_TIME_AS_MSECONDS (stream->priv->rtx_time), NULL);
2404   g_free (pt_s);
2405   gst_structure_free (pt_map);
2406   gst_bin_add (GST_BIN (bin), gst_object_ref (stream->priv->rtxsend));
2407
2408   pad = gst_element_get_static_pad (stream->priv->rtxsend, "src");
2409   name = g_strdup_printf ("src_%u", sessid);
2410   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
2411   g_free (name);
2412   gst_object_unref (pad);
2413
2414   pad = gst_element_get_static_pad (stream->priv->rtxsend, "sink");
2415   name = g_strdup_printf ("sink_%u", sessid);
2416   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
2417   g_free (name);
2418   gst_object_unref (pad);
2419
2420   return bin;
2421 }
2422
2423 static void
2424 add_rtx_pt (gpointer key, GstCaps * caps, GstStructure * pt_map)
2425 {
2426   guint pt = GPOINTER_TO_INT (key);
2427   const GstStructure *s = gst_caps_get_structure (caps, 0);
2428   const gchar *apt;
2429
2430   if (!g_strcmp0 (gst_structure_get_string (s, "encoding-name"), "RTX") &&
2431       (apt = gst_structure_get_string (s, "apt"))) {
2432     gst_structure_set (pt_map, apt, G_TYPE_UINT, pt, NULL);
2433   }
2434 }
2435
2436 /* Call with priv->lock taken */
2437 static void
2438 update_rtx_receive_pt_map (GstRTSPStream * stream)
2439 {
2440   GstStructure *pt_map;
2441
2442   if (!stream->priv->rtxreceive)
2443     goto done;
2444
2445   pt_map = gst_structure_new_empty ("application/x-rtp-pt-map");
2446   g_hash_table_foreach (stream->priv->ptmap, (GHFunc) add_rtx_pt, pt_map);
2447   g_object_set (stream->priv->rtxreceive, "payload-type-map", pt_map, NULL);
2448   gst_structure_free (pt_map);
2449
2450 done:
2451   return;
2452 }
2453
2454 static void
2455 retrieve_ulpfec_pt (gpointer key, GstCaps * caps, GstElement * ulpfec_decoder)
2456 {
2457   guint pt = GPOINTER_TO_INT (key);
2458   const GstStructure *s = gst_caps_get_structure (caps, 0);
2459
2460   if (!g_strcmp0 (gst_structure_get_string (s, "encoding-name"), "ULPFEC"))
2461     g_object_set (ulpfec_decoder, "pt", pt, NULL);
2462 }
2463
2464 static void
2465 update_ulpfec_decoder_pt (GstRTSPStream * stream)
2466 {
2467   if (!stream->priv->ulpfec_decoder)
2468     goto done;
2469
2470   g_hash_table_foreach (stream->priv->ptmap, (GHFunc) retrieve_ulpfec_pt,
2471       stream->priv->ulpfec_decoder);
2472
2473 done:
2474   return;
2475 }
2476
2477 /**
2478  * gst_rtsp_stream_request_aux_receiver:
2479  * @stream: a #GstRTSPStream
2480  * @sessid: the session id
2481  *
2482  * Creating a rtxreceive bin
2483  *
2484  * Returns: (transfer full) (nullable): a #GstElement.
2485  *
2486  * Since: 1.16
2487  */
2488 GstElement *
2489 gst_rtsp_stream_request_aux_receiver (GstRTSPStream * stream, guint sessid)
2490 {
2491   GstElement *bin;
2492   GstPad *pad;
2493   gchar *name;
2494
2495   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2496
2497   bin = gst_bin_new (NULL);
2498   stream->priv->rtxreceive = gst_element_factory_make ("rtprtxreceive", NULL);
2499   update_rtx_receive_pt_map (stream);
2500   update_ulpfec_decoder_pt (stream);
2501   gst_bin_add (GST_BIN (bin), gst_object_ref (stream->priv->rtxreceive));
2502
2503   pad = gst_element_get_static_pad (stream->priv->rtxreceive, "src");
2504   name = g_strdup_printf ("src_%u", sessid);
2505   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
2506   g_free (name);
2507   gst_object_unref (pad);
2508
2509   pad = gst_element_get_static_pad (stream->priv->rtxreceive, "sink");
2510   name = g_strdup_printf ("sink_%u", sessid);
2511   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
2512   g_free (name);
2513   gst_object_unref (pad);
2514
2515   return bin;
2516 }
2517
2518 /**
2519  * gst_rtsp_stream_set_pt_map:
2520  * @stream: a #GstRTSPStream
2521  * @pt: the pt
2522  * @caps: a #GstCaps
2523  *
2524  * Configure a pt map between @pt and @caps.
2525  */
2526 void
2527 gst_rtsp_stream_set_pt_map (GstRTSPStream * stream, guint pt, GstCaps * caps)
2528 {
2529   GstRTSPStreamPrivate *priv = stream->priv;
2530
2531   if (!GST_IS_CAPS (caps))
2532     return;
2533
2534   g_mutex_lock (&priv->lock);
2535   g_hash_table_insert (priv->ptmap, GINT_TO_POINTER (pt), gst_caps_ref (caps));
2536   update_rtx_receive_pt_map (stream);
2537   g_mutex_unlock (&priv->lock);
2538 }
2539
2540 /**
2541  * gst_rtsp_stream_set_publish_clock_mode:
2542  * @stream: a #GstRTSPStream
2543  * @mode: the clock publish mode
2544  *
2545  * Sets if and how the stream clock should be published according to RFC7273.
2546  *
2547  * Since: 1.8
2548  */
2549 void
2550 gst_rtsp_stream_set_publish_clock_mode (GstRTSPStream * stream,
2551     GstRTSPPublishClockMode mode)
2552 {
2553   GstRTSPStreamPrivate *priv;
2554
2555   priv = stream->priv;
2556   g_mutex_lock (&priv->lock);
2557   priv->publish_clock_mode = mode;
2558   g_mutex_unlock (&priv->lock);
2559 }
2560
2561 /**
2562  * gst_rtsp_stream_get_publish_clock_mode:
2563  * @stream: a #GstRTSPStream
2564  *
2565  * Gets if and how the stream clock should be published according to RFC7273.
2566  *
2567  * Returns: The GstRTSPPublishClockMode
2568  *
2569  * Since: 1.8
2570  */
2571 GstRTSPPublishClockMode
2572 gst_rtsp_stream_get_publish_clock_mode (GstRTSPStream * stream)
2573 {
2574   GstRTSPStreamPrivate *priv;
2575   GstRTSPPublishClockMode ret;
2576
2577   priv = stream->priv;
2578   g_mutex_lock (&priv->lock);
2579   ret = priv->publish_clock_mode;
2580   g_mutex_unlock (&priv->lock);
2581
2582   return ret;
2583 }
2584
2585 static GstCaps *
2586 request_pt_map (GstElement * rtpbin, guint session, guint pt,
2587     GstRTSPStream * stream)
2588 {
2589   GstRTSPStreamPrivate *priv = stream->priv;
2590   GstCaps *caps = NULL;
2591
2592   g_mutex_lock (&priv->lock);
2593
2594   if (priv->idx == session) {
2595     caps = g_hash_table_lookup (priv->ptmap, GINT_TO_POINTER (pt));
2596     if (caps) {
2597       GST_DEBUG ("Stream %p, pt %u: caps %" GST_PTR_FORMAT, stream, pt, caps);
2598       gst_caps_ref (caps);
2599     } else {
2600       GST_DEBUG ("Stream %p, pt %u: no caps", stream, pt);
2601     }
2602   }
2603
2604   g_mutex_unlock (&priv->lock);
2605
2606   return caps;
2607 }
2608
2609 static void
2610 pad_added (GstElement * rtpbin, GstPad * pad, GstRTSPStream * stream)
2611 {
2612   GstRTSPStreamPrivate *priv = stream->priv;
2613   gchar *name;
2614   GstPadLinkReturn ret;
2615   guint sessid;
2616
2617   GST_DEBUG ("Stream %p added pad %s:%s for pad %s:%s", stream,
2618       GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (priv->sinkpad));
2619
2620   name = gst_pad_get_name (pad);
2621   if (sscanf (name, "recv_rtp_src_%u", &sessid) != 1) {
2622     g_free (name);
2623     return;
2624   }
2625   g_free (name);
2626
2627   if (priv->idx != sessid)
2628     return;
2629
2630   if (gst_pad_is_linked (priv->sinkpad)) {
2631     GST_WARNING ("Stream %p: Pad %s:%s is linked already", stream,
2632         GST_DEBUG_PAD_NAME (priv->sinkpad));
2633     return;
2634   }
2635
2636   /* link the RTP pad to the session manager, it should not really fail unless
2637    * this is not really an RTP pad */
2638   ret = gst_pad_link (pad, priv->sinkpad);
2639   if (ret != GST_PAD_LINK_OK)
2640     goto link_failed;
2641   priv->recv_rtp_src = gst_object_ref (pad);
2642
2643   return;
2644
2645 /* ERRORS */
2646 link_failed:
2647   {
2648     GST_ERROR ("Stream %p: Failed to link pads %s:%s and %s:%s", stream,
2649         GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (priv->sinkpad));
2650   }
2651 }
2652
2653 static void
2654 on_npt_stop (GstElement * rtpbin, guint session, guint ssrc,
2655     GstRTSPStream * stream)
2656 {
2657   /* TODO: What to do here other than this? */
2658   GST_DEBUG ("Stream %p: Got EOS", stream);
2659   gst_pad_send_event (stream->priv->sinkpad, gst_event_new_eos ());
2660 }
2661
2662 typedef struct _ProbeData ProbeData;
2663
2664 struct _ProbeData
2665 {
2666   GstRTSPStream *stream;
2667   /* existing sink, already linked to tee */
2668   GstElement *sink1;
2669   /* new sink, about to be linked */
2670   GstElement *sink2;
2671   /* new queue element, that will be linked to tee and sink1 */
2672   GstElement **queue1;
2673   /* new queue element, that will be linked to tee and sink2 */
2674   GstElement **queue2;
2675   GstPad *sink_pad;
2676   GstPad *tee_pad;
2677   guint index;
2678 };
2679
2680 static void
2681 free_cb_data (gpointer user_data)
2682 {
2683   ProbeData *data = user_data;
2684
2685   gst_object_unref (data->stream);
2686   gst_object_unref (data->sink1);
2687   gst_object_unref (data->sink2);
2688   gst_object_unref (data->sink_pad);
2689   gst_object_unref (data->tee_pad);
2690   g_free (data);
2691 }
2692
2693
2694 static void
2695 create_and_plug_queue_to_unlinked_stream (GstRTSPStream * stream,
2696     GstElement * tee, GstElement * sink, GstElement ** queue)
2697 {
2698   GstRTSPStreamPrivate *priv = stream->priv;
2699   GstPad *tee_pad;
2700   GstPad *queue_pad;
2701   GstPad *sink_pad;
2702
2703   /* create queue for the new stream */
2704   *queue = gst_element_factory_make ("queue", NULL);
2705   g_object_set (*queue, "max-size-buffers", 1, "max-size-bytes", 0,
2706       "max-size-time", G_GINT64_CONSTANT (0), NULL);
2707   gst_bin_add (priv->joined_bin, *queue);
2708
2709   /* link tee to queue */
2710   tee_pad = gst_element_get_request_pad (tee, "src_%u");
2711   queue_pad = gst_element_get_static_pad (*queue, "sink");
2712   gst_pad_link (tee_pad, queue_pad);
2713   gst_object_unref (queue_pad);
2714   gst_object_unref (tee_pad);
2715
2716   /* link queue to sink */
2717   queue_pad = gst_element_get_static_pad (*queue, "src");
2718   sink_pad = gst_element_get_static_pad (sink, "sink");
2719   gst_pad_link (queue_pad, sink_pad);
2720   gst_object_unref (queue_pad);
2721   gst_object_unref (sink_pad);
2722
2723   gst_element_sync_state_with_parent (sink);
2724   gst_element_sync_state_with_parent (*queue);
2725 }
2726
2727 static GstPadProbeReturn
2728 create_and_plug_queue_to_linked_stream_probe_cb (GstPad * inpad,
2729     GstPadProbeInfo * info, gpointer user_data)
2730 {
2731   GstRTSPStreamPrivate *priv;
2732   ProbeData *data = user_data;
2733   GstRTSPStream *stream;
2734   GstElement **queue1;
2735   GstElement **queue2;
2736   GstPad *sink_pad;
2737   GstPad *tee_pad;
2738   GstPad *queue_pad;
2739   guint index;
2740
2741   stream = data->stream;
2742   priv = stream->priv;
2743   queue1 = data->queue1;
2744   queue2 = data->queue2;
2745   sink_pad = data->sink_pad;
2746   tee_pad = data->tee_pad;
2747   index = data->index;
2748
2749   /* unlink tee and the existing sink:
2750    *   .-----.    .---------.
2751    *   | tee |    |  sink1  |
2752    * sink   src->sink       |
2753    *   '-----'    '---------'
2754    */
2755   g_assert (gst_pad_unlink (tee_pad, sink_pad));
2756
2757   /* add queue to the already existing stream */
2758   *queue1 = gst_element_factory_make ("queue", NULL);
2759   g_object_set (*queue1, "max-size-buffers", 1, "max-size-bytes", 0,
2760       "max-size-time", G_GINT64_CONSTANT (0), NULL);
2761   gst_bin_add (priv->joined_bin, *queue1);
2762
2763   /* link tee, queue and sink:
2764    *   .-----.    .---------.    .---------.
2765    *   | tee |    |  queue1 |    | sink1   |
2766    * sink   src->sink      src->sink       |
2767    *   '-----'    '---------'    '---------'
2768    */
2769   queue_pad = gst_element_get_static_pad (*queue1, "sink");
2770   gst_pad_link (tee_pad, queue_pad);
2771   gst_object_unref (queue_pad);
2772   queue_pad = gst_element_get_static_pad (*queue1, "src");
2773   gst_pad_link (queue_pad, sink_pad);
2774   gst_object_unref (queue_pad);
2775
2776   gst_element_sync_state_with_parent (*queue1);
2777
2778   /* create queue and link it to tee and the new sink */
2779   create_and_plug_queue_to_unlinked_stream (stream,
2780       priv->tee[index], data->sink2, queue2);
2781
2782   /* the final stream:
2783    *
2784    *    .-----.    .---------.    .---------.
2785    *    | tee |    |  queue1 |    | sink1   |
2786    *  sink   src->sink      src->sink       |
2787    *    |     |    '---------'    '---------'
2788    *    |     |    .---------.    .---------.
2789    *    |     |    |  queue2 |    | sink2   |
2790    *    |    src->sink      src->sink       |
2791    *    '-----'    '---------'    '---------'
2792    */
2793
2794   return GST_PAD_PROBE_REMOVE;
2795 }
2796
2797 static void
2798 create_and_plug_queue_to_linked_stream (GstRTSPStream * stream,
2799     GstElement * sink1, GstElement * sink2, guint index, GstElement ** queue1,
2800     GstElement ** queue2)
2801 {
2802   ProbeData *data;
2803
2804   data = g_new0 (ProbeData, 1);
2805   data->stream = gst_object_ref (stream);
2806   data->sink1 = gst_object_ref (sink1);
2807   data->sink2 = gst_object_ref (sink2);
2808   data->queue1 = queue1;
2809   data->queue2 = queue2;
2810   data->index = index;
2811
2812   data->sink_pad = gst_element_get_static_pad (sink1, "sink");
2813   g_assert (data->sink_pad);
2814   data->tee_pad = gst_pad_get_peer (data->sink_pad);
2815   g_assert (data->tee_pad);
2816
2817   gst_pad_add_probe (data->tee_pad, GST_PAD_PROBE_TYPE_IDLE,
2818       create_and_plug_queue_to_linked_stream_probe_cb, data, free_cb_data);
2819 }
2820
2821 static void
2822 plug_udp_sink (GstRTSPStream * stream, GstElement * sink_to_plug,
2823     GstElement ** queue_to_plug, guint index, gboolean is_mcast)
2824 {
2825   GstRTSPStreamPrivate *priv = stream->priv;
2826   GstElement *existing_sink;
2827
2828   if (is_mcast)
2829     existing_sink = priv->udpsink[index];
2830   else
2831     existing_sink = priv->mcast_udpsink[index];
2832
2833   GST_DEBUG_OBJECT (stream, "plug %s sink", is_mcast ? "mcast" : "udp");
2834
2835   /* add sink to the bin */
2836   gst_bin_add (priv->joined_bin, sink_to_plug);
2837
2838   if (priv->appsink[index] && existing_sink) {
2839
2840     /* queues are already added for the existing stream, add one for
2841        the newly added udp stream */
2842     create_and_plug_queue_to_unlinked_stream (stream, priv->tee[index],
2843         sink_to_plug, queue_to_plug);
2844
2845   } else if (priv->appsink[index] || existing_sink) {
2846     GstElement **queue;
2847     GstElement *element;
2848
2849     /* add queue to the already existing stream plus the newly created udp
2850        stream */
2851     if (priv->appsink[index]) {
2852       element = priv->appsink[index];
2853       queue = &priv->appqueue[index];
2854     } else {
2855       element = existing_sink;
2856       if (is_mcast)
2857         queue = &priv->udpqueue[index];
2858       else
2859         queue = &priv->mcast_udpqueue[index];
2860     }
2861
2862     create_and_plug_queue_to_linked_stream (stream, element, sink_to_plug,
2863         index, queue, queue_to_plug);
2864
2865   } else {
2866     GstPad *tee_pad;
2867     GstPad *sink_pad;
2868
2869     GST_DEBUG_OBJECT (stream, "creating first stream");
2870
2871     /* no need to add queues */
2872     tee_pad = gst_element_get_request_pad (priv->tee[index], "src_%u");
2873     sink_pad = gst_element_get_static_pad (sink_to_plug, "sink");
2874     gst_pad_link (tee_pad, sink_pad);
2875     gst_object_unref (tee_pad);
2876     gst_object_unref (sink_pad);
2877   }
2878
2879   gst_element_sync_state_with_parent (sink_to_plug);
2880 }
2881
2882 static void
2883 plug_tcp_sink (GstRTSPStream * stream, guint index)
2884 {
2885   GstRTSPStreamPrivate *priv = stream->priv;
2886
2887   GST_DEBUG_OBJECT (stream, "plug tcp sink");
2888
2889   /* add sink to the bin */
2890   gst_bin_add (priv->joined_bin, priv->appsink[index]);
2891
2892   if (priv->mcast_udpsink[index] && priv->udpsink[index]) {
2893
2894     /* queues are already added for the existing stream, add one for
2895        the newly added tcp stream */
2896     create_and_plug_queue_to_unlinked_stream (stream,
2897         priv->tee[index], priv->appsink[index], &priv->appqueue[index]);
2898
2899   } else if (priv->mcast_udpsink[index] || priv->udpsink[index]) {
2900     GstElement **queue;
2901     GstElement *element;
2902
2903     /* add queue to the already existing stream plus the newly created tcp
2904        stream */
2905     if (priv->mcast_udpsink[index]) {
2906       element = priv->mcast_udpsink[index];
2907       queue = &priv->mcast_udpqueue[index];
2908     } else {
2909       element = priv->udpsink[index];
2910       queue = &priv->udpqueue[index];
2911     }
2912
2913     create_and_plug_queue_to_linked_stream (stream, element,
2914         priv->appsink[index], index, queue, &priv->appqueue[index]);
2915
2916   } else {
2917     GstPad *tee_pad;
2918     GstPad *sink_pad;
2919
2920     /* no need to add queues */
2921     tee_pad = gst_element_get_request_pad (priv->tee[index], "src_%u");
2922     sink_pad = gst_element_get_static_pad (priv->appsink[index], "sink");
2923     gst_pad_link (tee_pad, sink_pad);
2924     gst_object_unref (tee_pad);
2925     gst_object_unref (sink_pad);
2926   }
2927
2928   gst_element_sync_state_with_parent (priv->appsink[index]);
2929 }
2930
2931 static void
2932 plug_sink (GstRTSPStream * stream, const GstRTSPTransport * transport,
2933     guint index)
2934 {
2935   GstRTSPStreamPrivate *priv;
2936   gboolean is_tcp, is_udp, is_mcast;
2937   priv = stream->priv;
2938
2939   is_tcp = transport->lower_transport == GST_RTSP_LOWER_TRANS_TCP;
2940   is_udp = transport->lower_transport == GST_RTSP_LOWER_TRANS_UDP;
2941   is_mcast = transport->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST;
2942
2943   if (is_udp)
2944     plug_udp_sink (stream, priv->udpsink[index],
2945         &priv->udpqueue[index], index, FALSE);
2946
2947   else if (is_mcast)
2948     plug_udp_sink (stream, priv->mcast_udpsink[index],
2949         &priv->mcast_udpqueue[index], index, TRUE);
2950
2951   else if (is_tcp)
2952     plug_tcp_sink (stream, index);
2953 }
2954
2955 /* must be called with lock */
2956 static gboolean
2957 create_sender_part (GstRTSPStream * stream, const GstRTSPTransport * transport)
2958 {
2959   GstRTSPStreamPrivate *priv;
2960   GstPad *pad;
2961   GstBin *bin;
2962   gboolean is_tcp, is_udp, is_mcast;
2963   gint mcast_ttl = 0;
2964   gint i;
2965
2966   GST_DEBUG_OBJECT (stream, "create sender part");
2967   priv = stream->priv;
2968   bin = priv->joined_bin;
2969
2970   is_tcp = transport->lower_transport == GST_RTSP_LOWER_TRANS_TCP;
2971   is_udp = transport->lower_transport == GST_RTSP_LOWER_TRANS_UDP;
2972   is_mcast = transport->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST;
2973
2974   if (is_mcast)
2975     mcast_ttl = transport->ttl;
2976
2977   GST_DEBUG_OBJECT (stream, "tcp: %d, udp: %d, mcast: %d (ttl: %d)", is_tcp,
2978       is_udp, is_mcast, mcast_ttl);
2979
2980   if (is_udp && !priv->server_addr_v4 && !priv->server_addr_v6) {
2981     GST_WARNING_OBJECT (stream, "no sockets assigned for UDP");
2982     return FALSE;
2983   }
2984
2985   if (is_mcast && !priv->mcast_addr_v4 && !priv->mcast_addr_v6) {
2986     GST_WARNING_OBJECT (stream, "no sockets assigned for UDP multicast");
2987     return FALSE;
2988   }
2989
2990   for (i = 0; i < 2; i++) {
2991     gboolean link_tee = FALSE;
2992     /* For the sender we create this bit of pipeline for both
2993      * RTP and RTCP.
2994      * Initially there will be only one active transport for
2995      * the stream, so the pipeline will look like this:
2996      *
2997      * .--------.      .-----.    .---------.
2998      * | rtpbin |      | tee |    |  sink   |
2999      * |       send->sink   src->sink       |
3000      * '--------'      '-----'    '---------'
3001      *
3002      * For each new transport, the already existing branch will
3003      * be reconfigured by adding a queue element:
3004      *
3005      * .--------.      .-----.    .---------.    .---------.
3006      * | rtpbin |      | tee |    |  queue  |    | udpsink |
3007      * |       send->sink   src->sink      src->sink       |
3008      * '--------'      |     |    '---------'    '---------'
3009      *                 |     |    .---------.    .---------.
3010      *                 |     |    |  queue  |    | udpsink |
3011      *                 |    src->sink      src->sink       |
3012      *                 |     |    '---------'    '---------'
3013      *                 |     |    .---------.    .---------.
3014      *                 |     |    |  queue  |    | appsink |
3015      *                 |    src->sink      src->sink       |
3016      *                 '-----'    '---------'    '---------'
3017      */
3018
3019     /* Only link the RTP send src if we're going to send RTP, link
3020      * the RTCP send src always */
3021     if (!priv->srcpad && i == 0)
3022       continue;
3023
3024     if (!priv->tee[i]) {
3025       /* make tee for RTP/RTCP */
3026       priv->tee[i] = gst_element_factory_make ("tee", NULL);
3027       gst_bin_add (bin, priv->tee[i]);
3028       link_tee = TRUE;
3029     }
3030
3031     if (is_udp && !priv->udpsink[i]) {
3032       /* we create only one pair of udpsinks for IPv4 and IPv6 */
3033       create_and_configure_udpsink (stream, &priv->udpsink[i],
3034           priv->socket_v4[i], priv->socket_v6[i], FALSE, (i == 0), mcast_ttl);
3035       plug_sink (stream, transport, i);
3036     } else if (is_mcast && !priv->mcast_udpsink[i]) {
3037       /* we create only one pair of mcast-udpsinks for IPv4 and IPv6 */
3038       create_and_configure_udpsink (stream, &priv->mcast_udpsink[i],
3039           priv->mcast_socket_v4[i], priv->mcast_socket_v6[i], TRUE, (i == 0),
3040           mcast_ttl);
3041       plug_sink (stream, transport, i);
3042     } else if (is_tcp && !priv->appsink[i]) {
3043       /* make appsink */
3044       priv->appsink[i] = gst_element_factory_make ("appsink", NULL);
3045       g_object_set (priv->appsink[i], "emit-signals", FALSE, "max-buffers", 1,
3046           NULL);
3047
3048       /* we need to set sync and preroll to FALSE for the sink to avoid
3049        * deadlock. This is only needed for sink sending RTCP data. */
3050       if (i == 1)
3051         g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
3052
3053       gst_app_sink_set_callbacks (GST_APP_SINK_CAST (priv->appsink[i]),
3054           &sink_cb, stream, NULL);
3055       plug_sink (stream, transport, i);
3056     }
3057
3058     if (link_tee) {
3059       /* and link to rtpbin send pad */
3060       gst_element_sync_state_with_parent (priv->tee[i]);
3061       pad = gst_element_get_static_pad (priv->tee[i], "sink");
3062       gst_pad_link (priv->send_src[i], pad);
3063       gst_object_unref (pad);
3064     }
3065   }
3066
3067   return TRUE;
3068 }
3069
3070 /* must be called with lock */
3071 static void
3072 plug_src (GstRTSPStream * stream, GstBin * bin, GstElement * src,
3073     GstElement * funnel)
3074 {
3075   GstRTSPStreamPrivate *priv;
3076   GstPad *pad, *selpad;
3077   gulong id = 0;
3078
3079   priv = stream->priv;
3080
3081   pad = gst_element_get_static_pad (src, "src");
3082   if (priv->srcpad) {
3083     /* block pad so src can't push data while it's not yet linked */
3084     id = gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_BLOCK |
3085         GST_PAD_PROBE_TYPE_BUFFER, NULL, NULL, NULL);
3086     /* we set and keep these to playing so that they don't cause NO_PREROLL return
3087      * values. This is only relevant for PLAY pipelines */
3088     gst_element_set_state (src, GST_STATE_PLAYING);
3089     gst_element_set_locked_state (src, TRUE);
3090   }
3091
3092   /* add src */
3093   gst_bin_add (bin, src);
3094
3095   /* and link to the funnel */
3096   selpad = gst_element_get_request_pad (funnel, "sink_%u");
3097   gst_pad_link (pad, selpad);
3098   if (id != 0)
3099     gst_pad_remove_probe (pad, id);
3100   gst_object_unref (pad);
3101   gst_object_unref (selpad);
3102 }
3103
3104 /* must be called with lock */
3105 static gboolean
3106 create_receiver_part (GstRTSPStream * stream, const GstRTSPTransport *
3107     transport)
3108 {
3109   GstRTSPStreamPrivate *priv;
3110   GstPad *pad;
3111   GstBin *bin;
3112   gboolean tcp;
3113   gboolean udp;
3114   gboolean mcast;
3115   gint i;
3116
3117   GST_DEBUG_OBJECT (stream, "create receiver part");
3118   priv = stream->priv;
3119   bin = priv->joined_bin;
3120
3121   tcp = transport->lower_transport == GST_RTSP_LOWER_TRANS_TCP;
3122   udp = transport->lower_transport == GST_RTSP_LOWER_TRANS_UDP;
3123   mcast = transport->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST;
3124
3125   for (i = 0; i < 2; i++) {
3126     /* For the receiver we create this bit of pipeline for both
3127      * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc
3128      * and it is all funneled into the rtpbin receive pad.
3129      *
3130      *
3131      * .--------.     .--------.    .--------.
3132      * | udpsrc |     | funnel |    | rtpbin |
3133      * | RTP    src->sink      src->sink     |
3134      * '--------'     |        |    |        |
3135      * .--------.     |        |    |        |
3136      * | appsrc |     |        |    |        |
3137      * | RTP    src->sink      |    |        |
3138      * '--------'     '--------'    |        |
3139      *                              |        |
3140      * .--------.     .--------.    |        |
3141      * | udpsrc |     | funnel |    |        |
3142      * | RTCP   src->sink      src->sink     |
3143      * '--------'     |        |    '--------'
3144      * .--------.     |        |
3145      * | appsrc |     |        |
3146      * | RTCP   src->sink      |
3147      * '--------'     '--------'
3148      */
3149
3150     if (!priv->sinkpad && i == 0) {
3151       /* Only connect recv RTP sink if we expect to receive RTP. Connect recv
3152        * RTCP sink always */
3153       continue;
3154     }
3155
3156     /* make funnel for the RTP/RTCP receivers */
3157     if (!priv->funnel[i]) {
3158       priv->funnel[i] = gst_element_factory_make ("funnel", NULL);
3159       gst_bin_add (bin, priv->funnel[i]);
3160
3161       pad = gst_element_get_static_pad (priv->funnel[i], "src");
3162       gst_pad_link (pad, priv->recv_sink[i]);
3163       gst_object_unref (pad);
3164     }
3165
3166     if (udp && !priv->udpsrc_v4[i] && priv->server_addr_v4) {
3167       GST_DEBUG_OBJECT (stream, "udp IPv4, create and configure udpsources");
3168       if (!create_and_configure_udpsource (&priv->udpsrc_v4[i],
3169               priv->socket_v4[i]))
3170         goto udpsrc_error;
3171
3172       plug_src (stream, bin, priv->udpsrc_v4[i], priv->funnel[i]);
3173     }
3174
3175     if (udp && !priv->udpsrc_v6[i] && priv->server_addr_v6) {
3176       GST_DEBUG_OBJECT (stream, "udp IPv6, create and configure udpsources");
3177       if (!create_and_configure_udpsource (&priv->udpsrc_v6[i],
3178               priv->socket_v6[i]))
3179         goto udpsrc_error;
3180
3181       plug_src (stream, bin, priv->udpsrc_v6[i], priv->funnel[i]);
3182     }
3183
3184     if (mcast && !priv->mcast_udpsrc_v4[i] && priv->mcast_addr_v4) {
3185       GST_DEBUG_OBJECT (stream, "mcast IPv4, create and configure udpsources");
3186       if (!create_and_configure_udpsource (&priv->mcast_udpsrc_v4[i],
3187               priv->mcast_socket_v4[i]))
3188         goto mcast_udpsrc_error;
3189       plug_src (stream, bin, priv->mcast_udpsrc_v4[i], priv->funnel[i]);
3190     }
3191
3192     if (mcast && !priv->mcast_udpsrc_v6[i] && priv->mcast_addr_v6) {
3193       GST_DEBUG_OBJECT (stream, "mcast IPv6, create and configure udpsources");
3194       if (!create_and_configure_udpsource (&priv->mcast_udpsrc_v6[i],
3195               priv->mcast_socket_v6[i]))
3196         goto mcast_udpsrc_error;
3197       plug_src (stream, bin, priv->mcast_udpsrc_v6[i], priv->funnel[i]);
3198     }
3199
3200     if (tcp && !priv->appsrc[i]) {
3201       /* make and add appsrc */
3202       priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
3203       priv->appsrc_base_time[i] = -1;
3204       g_object_set (priv->appsrc[i], "format", GST_FORMAT_TIME, "is-live",
3205           TRUE, NULL);
3206       plug_src (stream, bin, priv->appsrc[i], priv->funnel[i]);
3207     }
3208
3209     gst_element_sync_state_with_parent (priv->funnel[i]);
3210   }
3211
3212   return TRUE;
3213
3214 mcast_udpsrc_error:
3215 udpsrc_error:
3216   return FALSE;
3217 }
3218
3219 static gboolean
3220 check_mcast_part_for_transport (GstRTSPStream * stream,
3221     const GstRTSPTransport * tr)
3222 {
3223   GstRTSPStreamPrivate *priv = stream->priv;
3224   GInetAddress *inetaddr;
3225   GSocketFamily family;
3226   GstRTSPAddress *mcast_addr;
3227
3228   /* Check if it's a ipv4 or ipv6 transport */
3229   inetaddr = g_inet_address_new_from_string (tr->destination);
3230   family = g_inet_address_get_family (inetaddr);
3231   g_object_unref (inetaddr);
3232
3233   /* Select fields corresponding to the family */
3234   if (family == G_SOCKET_FAMILY_IPV4) {
3235     mcast_addr = priv->mcast_addr_v4;
3236   } else {
3237     mcast_addr = priv->mcast_addr_v6;
3238   }
3239
3240   /* We support only one mcast group per family, make sure this transport
3241    * matches it. */
3242   if (!mcast_addr)
3243     goto no_addr;
3244
3245   if (g_ascii_strcasecmp (tr->destination, mcast_addr->address) != 0 ||
3246       tr->port.min != mcast_addr->port ||
3247       tr->port.max != mcast_addr->port + mcast_addr->n_ports - 1 ||
3248       tr->ttl != mcast_addr->ttl)
3249     goto wrong_addr;
3250
3251   return TRUE;
3252
3253 no_addr:
3254   {
3255     GST_WARNING_OBJECT (stream, "Adding mcast transport, but no mcast address "
3256         "has been reserved");
3257     return FALSE;
3258   }
3259 wrong_addr:
3260   {
3261     GST_WARNING_OBJECT (stream, "Adding mcast transport, but it doesn't match "
3262         "the reserved address");
3263     return FALSE;
3264   }
3265 }
3266
3267 /**
3268  * gst_rtsp_stream_join_bin:
3269  * @stream: a #GstRTSPStream
3270  * @bin: (transfer none): a #GstBin to join
3271  * @rtpbin: (transfer none): a rtpbin element in @bin
3272  * @state: the target state of the new elements
3273  *
3274  * Join the #GstBin @bin that contains the element @rtpbin.
3275  *
3276  * @stream will link to @rtpbin, which must be inside @bin. The elements
3277  * added to @bin will be set to the state given in @state.
3278  *
3279  * Returns: %TRUE on success.
3280  */
3281 gboolean
3282 gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
3283     GstElement * rtpbin, GstState state)
3284 {
3285   GstRTSPStreamPrivate *priv;
3286   guint idx;
3287   gchar *name;
3288   GstPadLinkReturn ret;
3289
3290   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3291   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
3292   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
3293
3294   priv = stream->priv;
3295
3296   g_mutex_lock (&priv->lock);
3297   if (priv->joined_bin != NULL)
3298     goto was_joined;
3299
3300   /* create a session with the same index as the stream */
3301   idx = priv->idx;
3302
3303   GST_INFO ("stream %p joining bin as session %u", stream, idx);
3304
3305   if (priv->profiles & GST_RTSP_PROFILE_SAVP
3306       || priv->profiles & GST_RTSP_PROFILE_SAVPF) {
3307     /* For SRTP */
3308     g_signal_connect (rtpbin, "request-rtp-encoder",
3309         (GCallback) request_rtp_encoder, stream);
3310     g_signal_connect (rtpbin, "request-rtcp-encoder",
3311         (GCallback) request_rtcp_encoder, stream);
3312     g_signal_connect (rtpbin, "request-rtp-decoder",
3313         (GCallback) request_rtp_rtcp_decoder, stream);
3314     g_signal_connect (rtpbin, "request-rtcp-decoder",
3315         (GCallback) request_rtp_rtcp_decoder, stream);
3316   }
3317
3318   if (priv->sinkpad) {
3319     g_signal_connect (rtpbin, "request-pt-map",
3320         (GCallback) request_pt_map, stream);
3321   }
3322
3323   /* get pads from the RTP session element for sending and receiving
3324    * RTP/RTCP*/
3325   if (priv->srcpad) {
3326     /* get a pad for sending RTP */
3327     name = g_strdup_printf ("send_rtp_sink_%u", idx);
3328     priv->send_rtp_sink = gst_element_get_request_pad (rtpbin, name);
3329     g_free (name);
3330
3331     /* link the RTP pad to the session manager, it should not really fail unless
3332      * this is not really an RTP pad */
3333     ret = gst_pad_link (priv->srcpad, priv->send_rtp_sink);
3334     if (ret != GST_PAD_LINK_OK)
3335       goto link_failed;
3336
3337     name = g_strdup_printf ("send_rtp_src_%u", idx);
3338     priv->send_src[0] = gst_element_get_static_pad (rtpbin, name);
3339     g_free (name);
3340   } else {
3341     /* RECORD case: need to connect our sinkpad from here */
3342     g_signal_connect (rtpbin, "pad-added", (GCallback) pad_added, stream);
3343     /* EOS */
3344     g_signal_connect (rtpbin, "on-npt-stop", (GCallback) on_npt_stop, stream);
3345
3346     name = g_strdup_printf ("recv_rtp_sink_%u", idx);
3347     priv->recv_sink[0] = gst_element_get_request_pad (rtpbin, name);
3348     g_free (name);
3349   }
3350
3351   name = g_strdup_printf ("send_rtcp_src_%u", idx);
3352   priv->send_src[1] = gst_element_get_request_pad (rtpbin, name);
3353   g_free (name);
3354   name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
3355   priv->recv_sink[1] = gst_element_get_request_pad (rtpbin, name);
3356   g_free (name);
3357
3358   /* get the session */
3359   g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &priv->session);
3360
3361   g_signal_connect (priv->session, "on-new-ssrc", (GCallback) on_new_ssrc,
3362       stream);
3363   g_signal_connect (priv->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
3364       stream);
3365   g_signal_connect (priv->session, "on-ssrc-active",
3366       (GCallback) on_ssrc_active, stream);
3367   g_signal_connect (priv->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
3368       stream);
3369   g_signal_connect (priv->session, "on-bye-timeout",
3370       (GCallback) on_bye_timeout, stream);
3371   g_signal_connect (priv->session, "on-timeout", (GCallback) on_timeout,
3372       stream);
3373
3374   /* signal for sender ssrc */
3375   g_signal_connect (priv->session, "on-new-sender-ssrc",
3376       (GCallback) on_new_sender_ssrc, stream);
3377   g_signal_connect (priv->session, "on-sender-ssrc-active",
3378       (GCallback) on_sender_ssrc_active, stream);
3379
3380   if (priv->srcpad) {
3381     /* be notified of caps changes */
3382     priv->caps_sig = g_signal_connect (priv->send_src[0], "notify::caps",
3383         (GCallback) caps_notify, stream);
3384     priv->caps = gst_pad_get_current_caps (priv->send_src[0]);
3385   }
3386
3387   priv->joined_bin = bin;
3388   GST_DEBUG_OBJECT (stream, "successfully joined bin");
3389   g_mutex_unlock (&priv->lock);
3390
3391   return TRUE;
3392
3393   /* ERRORS */
3394 was_joined:
3395   {
3396     g_mutex_unlock (&priv->lock);
3397     return TRUE;
3398   }
3399 link_failed:
3400   {
3401     GST_WARNING ("failed to link stream %u", idx);
3402     gst_object_unref (priv->send_rtp_sink);
3403     priv->send_rtp_sink = NULL;
3404     g_mutex_unlock (&priv->lock);
3405     return FALSE;
3406   }
3407 }
3408
3409 static void
3410 clear_element (GstBin * bin, GstElement ** elementptr)
3411 {
3412   if (*elementptr) {
3413     gst_element_set_locked_state (*elementptr, FALSE);
3414     gst_element_set_state (*elementptr, GST_STATE_NULL);
3415     if (GST_ELEMENT_PARENT (*elementptr))
3416       gst_bin_remove (bin, *elementptr);
3417     else
3418       gst_object_unref (*elementptr);
3419     *elementptr = NULL;
3420   }
3421 }
3422
3423 /**
3424  * gst_rtsp_stream_leave_bin:
3425  * @stream: a #GstRTSPStream
3426  * @bin: (transfer none): a #GstBin
3427  * @rtpbin: (transfer none): a rtpbin #GstElement
3428  *
3429  * Remove the elements of @stream from @bin.
3430  *
3431  * Return: %TRUE on success.
3432  */
3433 gboolean
3434 gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
3435     GstElement * rtpbin)
3436 {
3437   GstRTSPStreamPrivate *priv;
3438   gint i;
3439
3440   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3441   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
3442   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
3443
3444   priv = stream->priv;
3445
3446   g_mutex_lock (&priv->lock);
3447   if (priv->joined_bin == NULL)
3448     goto was_not_joined;
3449   if (priv->joined_bin != bin)
3450     goto wrong_bin;
3451
3452   priv->joined_bin = NULL;
3453
3454   /* all transports must be removed by now */
3455   if (priv->transports != NULL)
3456     goto transports_not_removed;
3457
3458   clear_tr_cache (priv, TRUE);
3459   clear_tr_cache (priv, FALSE);
3460
3461   GST_INFO ("stream %p leaving bin", stream);
3462
3463   if (priv->srcpad) {
3464     gst_pad_unlink (priv->srcpad, priv->send_rtp_sink);
3465
3466     g_signal_handler_disconnect (priv->send_src[0], priv->caps_sig);
3467     gst_element_release_request_pad (rtpbin, priv->send_rtp_sink);
3468     gst_object_unref (priv->send_rtp_sink);
3469     priv->send_rtp_sink = NULL;
3470   } else if (priv->recv_rtp_src) {
3471     gst_pad_unlink (priv->recv_rtp_src, priv->sinkpad);
3472     gst_object_unref (priv->recv_rtp_src);
3473     priv->recv_rtp_src = NULL;
3474   }
3475
3476   for (i = 0; i < 2; i++) {
3477     clear_element (bin, &priv->udpsrc_v4[i]);
3478     clear_element (bin, &priv->udpsrc_v6[i]);
3479     clear_element (bin, &priv->udpqueue[i]);
3480     clear_element (bin, &priv->udpsink[i]);
3481
3482     clear_element (bin, &priv->mcast_udpsrc_v4[i]);
3483     clear_element (bin, &priv->mcast_udpsrc_v6[i]);
3484     clear_element (bin, &priv->mcast_udpqueue[i]);
3485     clear_element (bin, &priv->mcast_udpsink[i]);
3486
3487     clear_element (bin, &priv->appsrc[i]);
3488     clear_element (bin, &priv->appqueue[i]);
3489     clear_element (bin, &priv->appsink[i]);
3490
3491     clear_element (bin, &priv->tee[i]);
3492     clear_element (bin, &priv->funnel[i]);
3493
3494     if (priv->sinkpad || i == 1) {
3495       gst_element_release_request_pad (rtpbin, priv->recv_sink[i]);
3496       gst_object_unref (priv->recv_sink[i]);
3497       priv->recv_sink[i] = NULL;
3498     }
3499   }
3500
3501   if (priv->srcpad) {
3502     gst_object_unref (priv->send_src[0]);
3503     priv->send_src[0] = NULL;
3504   }
3505
3506   gst_element_release_request_pad (rtpbin, priv->send_src[1]);
3507   gst_object_unref (priv->send_src[1]);
3508   priv->send_src[1] = NULL;
3509
3510   g_object_unref (priv->session);
3511   priv->session = NULL;
3512   if (priv->caps)
3513     gst_caps_unref (priv->caps);
3514   priv->caps = NULL;
3515
3516   if (priv->srtpenc)
3517     gst_object_unref (priv->srtpenc);
3518   if (priv->srtpdec)
3519     gst_object_unref (priv->srtpdec);
3520
3521   if (priv->mcast_addr_v4)
3522     gst_rtsp_address_free (priv->mcast_addr_v4);
3523   priv->mcast_addr_v4 = NULL;
3524   if (priv->mcast_addr_v6)
3525     gst_rtsp_address_free (priv->mcast_addr_v6);
3526   priv->mcast_addr_v6 = NULL;
3527   if (priv->server_addr_v4)
3528     gst_rtsp_address_free (priv->server_addr_v4);
3529   priv->server_addr_v4 = NULL;
3530   if (priv->server_addr_v6)
3531     gst_rtsp_address_free (priv->server_addr_v6);
3532   priv->server_addr_v6 = NULL;
3533
3534   g_mutex_unlock (&priv->lock);
3535
3536   return TRUE;
3537
3538 was_not_joined:
3539   {
3540     g_mutex_unlock (&priv->lock);
3541     return TRUE;
3542   }
3543 transports_not_removed:
3544   {
3545     GST_ERROR_OBJECT (stream, "can't leave bin (transports not removed)");
3546     g_mutex_unlock (&priv->lock);
3547     return FALSE;
3548   }
3549 wrong_bin:
3550   {
3551     GST_ERROR_OBJECT (stream, "leaving the wrong bin");
3552     g_mutex_unlock (&priv->lock);
3553     return FALSE;
3554   }
3555 }
3556
3557 /**
3558  * gst_rtsp_stream_get_joined_bin:
3559  * @stream: a #GstRTSPStream
3560  *
3561  * Get the previous joined bin with gst_rtsp_stream_join_bin() or NULL.
3562  *
3563  * Return: (transfer full) (nullable): the joined bin or NULL.
3564  */
3565 GstBin *
3566 gst_rtsp_stream_get_joined_bin (GstRTSPStream * stream)
3567 {
3568   GstRTSPStreamPrivate *priv;
3569   GstBin *bin = NULL;
3570
3571   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3572
3573   priv = stream->priv;
3574
3575   g_mutex_lock (&priv->lock);
3576   bin = priv->joined_bin ? gst_object_ref (priv->joined_bin) : NULL;
3577   g_mutex_unlock (&priv->lock);
3578
3579   return bin;
3580 }
3581
3582 /**
3583  * gst_rtsp_stream_get_rtpinfo:
3584  * @stream: a #GstRTSPStream
3585  * @rtptime: (allow-none) (out caller-allocates): result RTP timestamp
3586  * @seq: (allow-none) (out caller-allocates): result RTP seqnum
3587  * @clock_rate: (allow-none) (out caller-allocates): the clock rate
3588  * @running_time: (out caller-allocates): result running-time
3589  *
3590  * Retrieve the current rtptime, seq and running-time. This is used to
3591  * construct a RTPInfo reply header.
3592  *
3593  * Returns: %TRUE when rtptime, seq and running-time could be determined.
3594  */
3595 gboolean
3596 gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream,
3597     guint * rtptime, guint * seq, guint * clock_rate,
3598     GstClockTime * running_time)
3599 {
3600   GstRTSPStreamPrivate *priv;
3601   GstStructure *stats;
3602   GObjectClass *payobjclass;
3603
3604   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3605
3606   priv = stream->priv;
3607
3608   payobjclass = G_OBJECT_GET_CLASS (priv->payloader);
3609
3610   g_mutex_lock (&priv->lock);
3611
3612   /* First try to extract the information from the last buffer on the sinks.
3613    * This will have a more accurate sequence number and timestamp, as between
3614    * the payloader and the sink there can be some queues
3615    */
3616   if (priv->udpsink[0] || priv->appsink[0]) {
3617     GstSample *last_sample;
3618
3619     if (priv->udpsink[0])
3620       g_object_get (priv->udpsink[0], "last-sample", &last_sample, NULL);
3621     else
3622       g_object_get (priv->appsink[0], "last-sample", &last_sample, NULL);
3623
3624     if (last_sample) {
3625       GstCaps *caps;
3626       GstBuffer *buffer;
3627       GstSegment *segment;
3628       GstStructure *s;
3629       GstRTPBuffer rtp_buffer = GST_RTP_BUFFER_INIT;
3630
3631       caps = gst_sample_get_caps (last_sample);
3632       buffer = gst_sample_get_buffer (last_sample);
3633       segment = gst_sample_get_segment (last_sample);
3634       s = gst_caps_get_structure (caps, 0);
3635
3636       if (gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp_buffer)) {
3637         guint ssrc_buf = gst_rtp_buffer_get_ssrc (&rtp_buffer);
3638         guint ssrc_stream = 0;
3639         if (gst_structure_has_field_typed (s, "ssrc", G_TYPE_UINT) &&
3640             gst_structure_get_uint (s, "ssrc", &ssrc_stream) &&
3641             ssrc_buf != ssrc_stream) {
3642           /* Skip buffers from auxiliary streams. */
3643           GST_DEBUG_OBJECT (stream,
3644               "not a buffer from the payloader, SSRC: %08x", ssrc_buf);
3645
3646           gst_rtp_buffer_unmap (&rtp_buffer);
3647           gst_sample_unref (last_sample);
3648           goto stats;
3649         }
3650
3651         if (seq) {
3652           *seq = gst_rtp_buffer_get_seq (&rtp_buffer);
3653         }
3654
3655         if (rtptime) {
3656           *rtptime = gst_rtp_buffer_get_timestamp (&rtp_buffer);
3657         }
3658
3659         gst_rtp_buffer_unmap (&rtp_buffer);
3660
3661         if (running_time) {
3662           *running_time =
3663               gst_segment_to_running_time (segment, GST_FORMAT_TIME,
3664               GST_BUFFER_TIMESTAMP (buffer));
3665         }
3666
3667         if (clock_rate) {
3668           gst_structure_get_int (s, "clock-rate", (gint *) clock_rate);
3669
3670           if (*clock_rate == 0 && running_time)
3671             *running_time = GST_CLOCK_TIME_NONE;
3672         }
3673         gst_sample_unref (last_sample);
3674
3675         goto done;
3676       } else {
3677         gst_sample_unref (last_sample);
3678       }
3679     }
3680   }
3681
3682 stats:
3683   if (g_object_class_find_property (payobjclass, "stats")) {
3684     g_object_get (priv->payloader, "stats", &stats, NULL);
3685     if (stats == NULL)
3686       goto no_stats;
3687
3688     if (seq)
3689       gst_structure_get_uint (stats, "seqnum", seq);
3690
3691     if (rtptime)
3692       gst_structure_get_uint (stats, "timestamp", rtptime);
3693
3694     if (running_time)
3695       gst_structure_get_clock_time (stats, "running-time", running_time);
3696
3697     if (clock_rate) {
3698       gst_structure_get_uint (stats, "clock-rate", clock_rate);
3699       if (*clock_rate == 0 && running_time)
3700         *running_time = GST_CLOCK_TIME_NONE;
3701     }
3702     gst_structure_free (stats);
3703   } else {
3704     if (!g_object_class_find_property (payobjclass, "seqnum") ||
3705         !g_object_class_find_property (payobjclass, "timestamp"))
3706       goto no_stats;
3707
3708     if (seq)
3709       g_object_get (priv->payloader, "seqnum", seq, NULL);
3710
3711     if (rtptime)
3712       g_object_get (priv->payloader, "timestamp", rtptime, NULL);
3713
3714     if (running_time)
3715       *running_time = GST_CLOCK_TIME_NONE;
3716   }
3717
3718 done:
3719   g_mutex_unlock (&priv->lock);
3720
3721   return TRUE;
3722
3723   /* ERRORS */
3724 no_stats:
3725   {
3726     GST_WARNING ("Could not get payloader stats");
3727     g_mutex_unlock (&priv->lock);
3728     return FALSE;
3729   }
3730 }
3731
3732 /**
3733  * gst_rtsp_stream_get_caps:
3734  * @stream: a #GstRTSPStream
3735  *
3736  * Retrieve the current caps of @stream.
3737  *
3738  * Returns: (transfer full) (nullable): the #GstCaps of @stream.
3739  * use gst_caps_unref() after usage.
3740  */
3741 GstCaps *
3742 gst_rtsp_stream_get_caps (GstRTSPStream * stream)
3743 {
3744   GstRTSPStreamPrivate *priv;
3745   GstCaps *result;
3746
3747   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3748
3749   priv = stream->priv;
3750
3751   g_mutex_lock (&priv->lock);
3752   if ((result = priv->caps))
3753     gst_caps_ref (result);
3754   g_mutex_unlock (&priv->lock);
3755
3756   return result;
3757 }
3758
3759 /**
3760  * gst_rtsp_stream_recv_rtp:
3761  * @stream: a #GstRTSPStream
3762  * @buffer: (transfer full): a #GstBuffer
3763  *
3764  * Handle an RTP buffer for the stream. This method is usually called when a
3765  * message has been received from a client using the TCP transport.
3766  *
3767  * This function takes ownership of @buffer.
3768  *
3769  * Returns: a GstFlowReturn.
3770  */
3771 GstFlowReturn
3772 gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer)
3773 {
3774   GstRTSPStreamPrivate *priv;
3775   GstFlowReturn ret;
3776   GstElement *element;
3777
3778   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
3779   priv = stream->priv;
3780   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
3781   g_return_val_if_fail (priv->joined_bin != NULL, FALSE);
3782
3783   g_mutex_lock (&priv->lock);
3784   if (priv->appsrc[0])
3785     element = gst_object_ref (priv->appsrc[0]);
3786   else
3787     element = NULL;
3788   g_mutex_unlock (&priv->lock);
3789
3790   if (element) {
3791     if (priv->appsrc_base_time[0] == -1) {
3792       /* Take current running_time. This timestamp will be put on
3793        * the first buffer of each stream because we are a live source and so we
3794        * timestamp with the running_time. When we are dealing with TCP, we also
3795        * only timestamp the first buffer (using the DISCONT flag) because a server
3796        * typically bursts data, for which we don't want to compensate by speeding
3797        * up the media. The other timestamps will be interpollated from this one
3798        * using the RTP timestamps. */
3799       GST_OBJECT_LOCK (element);
3800       if (GST_ELEMENT_CLOCK (element)) {
3801         GstClockTime now;
3802         GstClockTime base_time;
3803
3804         now = gst_clock_get_time (GST_ELEMENT_CLOCK (element));
3805         base_time = GST_ELEMENT_CAST (element)->base_time;
3806
3807         priv->appsrc_base_time[0] = now - base_time;
3808         GST_BUFFER_TIMESTAMP (buffer) = priv->appsrc_base_time[0];
3809         GST_DEBUG ("stream %p: first buffer at time %" GST_TIME_FORMAT
3810             ", base %" GST_TIME_FORMAT, stream, GST_TIME_ARGS (now),
3811             GST_TIME_ARGS (base_time));
3812       }
3813       GST_OBJECT_UNLOCK (element);
3814     }
3815
3816     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
3817     gst_object_unref (element);
3818   } else {
3819     ret = GST_FLOW_OK;
3820   }
3821   return ret;
3822 }
3823
3824 /**
3825  * gst_rtsp_stream_recv_rtcp:
3826  * @stream: a #GstRTSPStream
3827  * @buffer: (transfer full): a #GstBuffer
3828  *
3829  * Handle an RTCP buffer for the stream. This method is usually called when a
3830  * message has been received from a client using the TCP transport.
3831  *
3832  * This function takes ownership of @buffer.
3833  *
3834  * Returns: a GstFlowReturn.
3835  */
3836 GstFlowReturn
3837 gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
3838 {
3839   GstRTSPStreamPrivate *priv;
3840   GstFlowReturn ret;
3841   GstElement *element;
3842
3843   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
3844   priv = stream->priv;
3845   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
3846
3847   if (priv->joined_bin == NULL) {
3848     gst_buffer_unref (buffer);
3849     return GST_FLOW_NOT_LINKED;
3850   }
3851   g_mutex_lock (&priv->lock);
3852   if (priv->appsrc[1])
3853     element = gst_object_ref (priv->appsrc[1]);
3854   else
3855     element = NULL;
3856   g_mutex_unlock (&priv->lock);
3857
3858   if (element) {
3859     if (priv->appsrc_base_time[1] == -1) {
3860       /* Take current running_time. This timestamp will be put on
3861        * the first buffer of each stream because we are a live source and so we
3862        * timestamp with the running_time. When we are dealing with TCP, we also
3863        * only timestamp the first buffer (using the DISCONT flag) because a server
3864        * typically bursts data, for which we don't want to compensate by speeding
3865        * up the media. The other timestamps will be interpollated from this one
3866        * using the RTP timestamps. */
3867       GST_OBJECT_LOCK (element);
3868       if (GST_ELEMENT_CLOCK (element)) {
3869         GstClockTime now;
3870         GstClockTime base_time;
3871
3872         now = gst_clock_get_time (GST_ELEMENT_CLOCK (element));
3873         base_time = GST_ELEMENT_CAST (element)->base_time;
3874
3875         priv->appsrc_base_time[1] = now - base_time;
3876         GST_BUFFER_TIMESTAMP (buffer) = priv->appsrc_base_time[1];
3877         GST_DEBUG ("stream %p: first buffer at time %" GST_TIME_FORMAT
3878             ", base %" GST_TIME_FORMAT, stream, GST_TIME_ARGS (now),
3879             GST_TIME_ARGS (base_time));
3880       }
3881       GST_OBJECT_UNLOCK (element);
3882     }
3883
3884     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
3885     gst_object_unref (element);
3886   } else {
3887     ret = GST_FLOW_OK;
3888     gst_buffer_unref (buffer);
3889   }
3890   return ret;
3891 }
3892
3893 /* must be called with lock */
3894 static inline void
3895 add_client (GstElement * rtp_sink, GstElement * rtcp_sink, const gchar * host,
3896     gint rtp_port, gint rtcp_port)
3897 {
3898   if (rtp_sink != NULL)
3899     g_signal_emit_by_name (rtp_sink, "add", host, rtp_port, NULL);
3900   if (rtcp_sink != NULL)
3901     g_signal_emit_by_name (rtcp_sink, "add", host, rtcp_port, NULL);
3902 }
3903
3904 /* must be called with lock */
3905 static void
3906 remove_client (GstElement * rtp_sink, GstElement * rtcp_sink,
3907     const gchar * host, gint rtp_port, gint rtcp_port)
3908 {
3909   if (rtp_sink != NULL)
3910     g_signal_emit_by_name (rtp_sink, "remove", host, rtp_port, NULL);
3911   if (rtcp_sink != NULL)
3912     g_signal_emit_by_name (rtcp_sink, "remove", host, rtcp_port, NULL);
3913 }
3914
3915 /* must be called with lock */
3916 static gboolean
3917 update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
3918     gboolean add)
3919 {
3920   GstRTSPStreamPrivate *priv = stream->priv;
3921   const GstRTSPTransport *tr;
3922   gchar *dest;
3923   gint min, max;
3924
3925   tr = gst_rtsp_stream_transport_get_transport (trans);
3926   dest = tr->destination;
3927
3928   switch (tr->lower_transport) {
3929     case GST_RTSP_LOWER_TRANS_UDP_MCAST:
3930     {
3931       min = tr->port.min;
3932       max = tr->port.max;
3933
3934       if (add) {
3935         GST_INFO ("adding %s:%d-%d", dest, min, max);
3936         if (!check_mcast_part_for_transport (stream, tr))
3937           goto mcast_error;
3938
3939         /* FIXME: Is it ok to set ttl-mc if media is shared? */
3940         if (tr->ttl > 0) {
3941           GST_INFO ("setting ttl-mc %d", tr->ttl);
3942           if (priv->mcast_udpsink[0])
3943             g_object_set (G_OBJECT (priv->mcast_udpsink[0]), "ttl-mc", tr->ttl,
3944                 NULL);
3945           if (priv->mcast_udpsink[1])
3946             g_object_set (G_OBJECT (priv->mcast_udpsink[1]), "ttl-mc", tr->ttl,
3947                 NULL);
3948         }
3949         add_client (priv->mcast_udpsink[0], priv->mcast_udpsink[1], dest, min,
3950             max);
3951         priv->transports = g_list_prepend (priv->transports, trans);
3952       } else {
3953         GST_INFO ("removing %s:%d-%d", dest, min, max);
3954         remove_client (priv->mcast_udpsink[0], priv->mcast_udpsink[1], dest,
3955             min, max);
3956         priv->transports = g_list_remove (priv->transports, trans);
3957       }
3958       break;
3959     }
3960     case GST_RTSP_LOWER_TRANS_UDP:
3961     {
3962       if (priv->client_side) {
3963         /* In client side mode the 'destination' is the RTSP server, so send
3964          * to those ports */
3965         min = tr->server_port.min;
3966         max = tr->server_port.max;
3967       } else {
3968         min = tr->client_port.min;
3969         max = tr->client_port.max;
3970       }
3971
3972       if (add) {
3973         GST_INFO ("adding %s:%d-%d", dest, min, max);
3974         add_client (priv->udpsink[0], priv->udpsink[1], dest, min, max);
3975         priv->transports = g_list_prepend (priv->transports, trans);
3976       } else {
3977         GST_INFO ("removing %s:%d-%d", dest, min, max);
3978         remove_client (priv->udpsink[0], priv->udpsink[1], dest, min, max);
3979         priv->transports = g_list_remove (priv->transports, trans);
3980       }
3981       priv->transports_cookie++;
3982       break;
3983     }
3984     case GST_RTSP_LOWER_TRANS_TCP:
3985       if (add) {
3986         GST_INFO ("adding TCP %s", tr->destination);
3987         priv->transports = g_list_prepend (priv->transports, trans);
3988         priv->n_tcp_transports++;
3989       } else {
3990         GST_INFO ("removing TCP %s", tr->destination);
3991         priv->transports = g_list_remove (priv->transports, trans);
3992         priv->n_tcp_transports--;
3993       }
3994       priv->transports_cookie++;
3995       break;
3996     default:
3997       goto unknown_transport;
3998   }
3999   return TRUE;
4000
4001   /* ERRORS */
4002 unknown_transport:
4003   {
4004     GST_INFO ("Unknown transport %d", tr->lower_transport);
4005     return FALSE;
4006   }
4007 mcast_error:
4008   {
4009     return FALSE;
4010   }
4011 }
4012
4013 static void
4014 on_message_sent (gpointer user_data)
4015 {
4016   GstRTSPStream *stream = user_data;
4017   GstRTSPStreamPrivate *priv = stream->priv;
4018   gint idx = -1;
4019
4020   GST_DEBUG_OBJECT (stream, "message send complete");
4021
4022   g_mutex_lock (&priv->lock);
4023
4024   g_assert (priv->n_outstanding >= 0);
4025
4026   if (priv->n_outstanding == 0)
4027     goto no_outstanding;
4028
4029   priv->n_outstanding--;
4030   if (priv->n_outstanding == 0) {
4031     gint i;
4032
4033     /* iterate from 1 and down, so we prioritize RTCP over RTP */
4034     for (i = 1; i >= 0; i--) {
4035       if (priv->have_buffer[i]) {
4036         /* send message */
4037         idx = i;
4038         break;
4039       }
4040     }
4041   }
4042
4043   if (idx != -1)
4044     send_tcp_message (stream, idx);
4045
4046   g_mutex_unlock (&priv->lock);
4047
4048   return;
4049
4050   /* ERRORS */
4051 no_outstanding:
4052   {
4053     GST_INFO ("no outstanding messages");
4054     g_mutex_unlock (&priv->lock);
4055     return;
4056   }
4057 }
4058
4059 /**
4060  * gst_rtsp_stream_add_transport:
4061  * @stream: a #GstRTSPStream
4062  * @trans: (transfer none): a #GstRTSPStreamTransport
4063  *
4064  * Add the transport in @trans to @stream. The media of @stream will
4065  * then also be send to the values configured in @trans.
4066  *
4067  * @stream must be joined to a bin.
4068  *
4069  * @trans must contain a valid #GstRTSPTransport.
4070  *
4071  * Returns: %TRUE if @trans was added
4072  */
4073 gboolean
4074 gst_rtsp_stream_add_transport (GstRTSPStream * stream,
4075     GstRTSPStreamTransport * trans)
4076 {
4077   GstRTSPStreamPrivate *priv;
4078   gboolean res;
4079
4080   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
4081   priv = stream->priv;
4082   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
4083   g_return_val_if_fail (priv->joined_bin != NULL, FALSE);
4084
4085   g_mutex_lock (&priv->lock);
4086   res = update_transport (stream, trans, TRUE);
4087   if (res)
4088     gst_rtsp_stream_transport_set_message_sent (trans, on_message_sent, stream,
4089         NULL);
4090   g_mutex_unlock (&priv->lock);
4091
4092   return res;
4093 }
4094
4095 /**
4096  * gst_rtsp_stream_remove_transport:
4097  * @stream: a #GstRTSPStream
4098  * @trans: (transfer none): a #GstRTSPStreamTransport
4099  *
4100  * Remove the transport in @trans from @stream. The media of @stream will
4101  * not be sent to the values configured in @trans.
4102  *
4103  * @stream must be joined to a bin.
4104  *
4105  * @trans must contain a valid #GstRTSPTransport.
4106  *
4107  * Returns: %TRUE if @trans was removed
4108  */
4109 gboolean
4110 gst_rtsp_stream_remove_transport (GstRTSPStream * stream,
4111     GstRTSPStreamTransport * trans)
4112 {
4113   GstRTSPStreamPrivate *priv;
4114   gboolean res;
4115
4116   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
4117   priv = stream->priv;
4118   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
4119   g_return_val_if_fail (priv->joined_bin != NULL, FALSE);
4120
4121   g_mutex_lock (&priv->lock);
4122   res = update_transport (stream, trans, FALSE);
4123   g_mutex_unlock (&priv->lock);
4124
4125   return res;
4126 }
4127
4128 /**
4129  * gst_rtsp_stream_update_crypto:
4130  * @stream: a #GstRTSPStream
4131  * @ssrc: the SSRC
4132  * @crypto: (transfer none) (allow-none): a #GstCaps with crypto info
4133  *
4134  * Update the new crypto information for @ssrc in @stream. If information
4135  * for @ssrc did not exist, it will be added. If information
4136  * for @ssrc existed, it will be replaced. If @crypto is %NULL, it will
4137  * be removed from @stream.
4138  *
4139  * Returns: %TRUE if @crypto could be updated
4140  */
4141 gboolean
4142 gst_rtsp_stream_update_crypto (GstRTSPStream * stream,
4143     guint ssrc, GstCaps * crypto)
4144 {
4145   GstRTSPStreamPrivate *priv;
4146
4147   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
4148   g_return_val_if_fail (crypto == NULL || GST_IS_CAPS (crypto), FALSE);
4149
4150   priv = stream->priv;
4151
4152   GST_DEBUG_OBJECT (stream, "update key for %08x", ssrc);
4153
4154   g_mutex_lock (&priv->lock);
4155   if (crypto)
4156     g_hash_table_insert (priv->keys, GINT_TO_POINTER (ssrc),
4157         gst_caps_ref (crypto));
4158   else
4159     g_hash_table_remove (priv->keys, GINT_TO_POINTER (ssrc));
4160   g_mutex_unlock (&priv->lock);
4161
4162   return TRUE;
4163 }
4164
4165 /**
4166  * gst_rtsp_stream_get_rtp_socket:
4167  * @stream: a #GstRTSPStream
4168  * @family: the socket family
4169  *
4170  * Get the RTP socket from @stream for a @family.
4171  *
4172  * @stream must be joined to a bin.
4173  *
4174  * Returns: (transfer full) (nullable): the RTP socket or %NULL if no
4175  * socket could be allocated for @family. Unref after usage
4176  */
4177 GSocket *
4178 gst_rtsp_stream_get_rtp_socket (GstRTSPStream * stream, GSocketFamily family)
4179 {
4180   GstRTSPStreamPrivate *priv = stream->priv;
4181   GSocket *socket;
4182
4183   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
4184   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
4185       family == G_SOCKET_FAMILY_IPV6, NULL);
4186
4187   g_mutex_lock (&priv->lock);
4188   if (family == G_SOCKET_FAMILY_IPV6)
4189     socket = priv->socket_v6[0];
4190   else
4191     socket = priv->socket_v4[0];
4192
4193   if (socket != NULL)
4194     socket = g_object_ref (socket);
4195   g_mutex_unlock (&priv->lock);
4196
4197   return socket;
4198 }
4199
4200 /**
4201  * gst_rtsp_stream_get_rtcp_socket:
4202  * @stream: a #GstRTSPStream
4203  * @family: the socket family
4204  *
4205  * Get the RTCP socket from @stream for a @family.
4206  *
4207  * @stream must be joined to a bin.
4208  *
4209  * Returns: (transfer full) (nullable): the RTCP socket or %NULL if no
4210  * socket could be allocated for @family. Unref after usage
4211  */
4212 GSocket *
4213 gst_rtsp_stream_get_rtcp_socket (GstRTSPStream * stream, GSocketFamily family)
4214 {
4215   GstRTSPStreamPrivate *priv = stream->priv;
4216   GSocket *socket;
4217
4218   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
4219   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
4220       family == G_SOCKET_FAMILY_IPV6, NULL);
4221
4222   g_mutex_lock (&priv->lock);
4223   if (family == G_SOCKET_FAMILY_IPV6)
4224     socket = priv->socket_v6[1];
4225   else
4226     socket = priv->socket_v4[1];
4227
4228   if (socket != NULL)
4229     socket = g_object_ref (socket);
4230   g_mutex_unlock (&priv->lock);
4231
4232   return socket;
4233 }
4234
4235 /**
4236  * gst_rtsp_stream_get_rtp_multicast_socket:
4237  * @stream: a #GstRTSPStream
4238  * @family: the socket family
4239  *
4240  * Get the multicast RTP socket from @stream for a @family.
4241  *
4242  * Returns: (transfer full) (nullable): the multicast RTP socket or %NULL if no
4243  * socket could be allocated for @family. Unref after usage
4244  */
4245 GSocket *
4246 gst_rtsp_stream_get_rtp_multicast_socket (GstRTSPStream * stream,
4247     GSocketFamily family)
4248 {
4249   GstRTSPStreamPrivate *priv = stream->priv;
4250   GSocket *socket;
4251
4252   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
4253   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
4254       family == G_SOCKET_FAMILY_IPV6, NULL);
4255
4256   g_mutex_lock (&priv->lock);
4257   if (family == G_SOCKET_FAMILY_IPV6)
4258     socket = priv->mcast_socket_v6[0];
4259   else
4260     socket = priv->mcast_socket_v4[0];
4261
4262   if (socket != NULL)
4263     socket = g_object_ref (socket);
4264   g_mutex_unlock (&priv->lock);
4265
4266   return socket;
4267 }
4268
4269 /**
4270  * gst_rtsp_stream_get_rtcp_multicast_socket:
4271  * @stream: a #GstRTSPStream
4272  * @family: the socket family
4273  *
4274  * Get the multicast RTCP socket from @stream for a @family.
4275  *
4276  * Returns: (transfer full) (nullable): the multicast RTCP socket or %NULL if no
4277  * socket could be allocated for @family. Unref after usage
4278  */
4279 GSocket *
4280 gst_rtsp_stream_get_rtcp_multicast_socket (GstRTSPStream * stream,
4281     GSocketFamily family)
4282 {
4283   GstRTSPStreamPrivate *priv = stream->priv;
4284   GSocket *socket;
4285
4286   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
4287   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
4288       family == G_SOCKET_FAMILY_IPV6, NULL);
4289
4290   g_mutex_lock (&priv->lock);
4291   if (family == G_SOCKET_FAMILY_IPV6)
4292     socket = priv->mcast_socket_v6[1];
4293   else
4294     socket = priv->mcast_socket_v4[1];
4295
4296   if (socket != NULL)
4297     socket = g_object_ref (socket);
4298   g_mutex_unlock (&priv->lock);
4299
4300   return socket;
4301 }
4302
4303 /**
4304  * gst_rtsp_stream_set_seqnum:
4305  * @stream: a #GstRTSPStream
4306  * @seqnum: a new sequence number
4307  *
4308  * Configure the sequence number in the payloader of @stream to @seqnum.
4309  */
4310 void
4311 gst_rtsp_stream_set_seqnum_offset (GstRTSPStream * stream, guint16 seqnum)
4312 {
4313   GstRTSPStreamPrivate *priv;
4314
4315   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
4316
4317   priv = stream->priv;
4318
4319   g_object_set (G_OBJECT (priv->payloader), "seqnum-offset", seqnum, NULL);
4320 }
4321
4322 /**
4323  * gst_rtsp_stream_get_seqnum:
4324  * @stream: a #GstRTSPStream
4325  *
4326  * Get the configured sequence number in the payloader of @stream.
4327  *
4328  * Returns: the sequence number of the payloader.
4329  */
4330 guint16
4331 gst_rtsp_stream_get_current_seqnum (GstRTSPStream * stream)
4332 {
4333   GstRTSPStreamPrivate *priv;
4334   guint seqnum;
4335
4336   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
4337
4338   priv = stream->priv;
4339
4340   g_object_get (G_OBJECT (priv->payloader), "seqnum", &seqnum, NULL);
4341
4342   return seqnum;
4343 }
4344
4345 /**
4346  * gst_rtsp_stream_transport_filter:
4347  * @stream: a #GstRTSPStream
4348  * @func: (scope call) (allow-none): a callback
4349  * @user_data: (closure): user data passed to @func
4350  *
4351  * Call @func for each transport managed by @stream. The result value of @func
4352  * determines what happens to the transport. @func will be called with @stream
4353  * locked so no further actions on @stream can be performed from @func.
4354  *
4355  * If @func returns #GST_RTSP_FILTER_REMOVE, the transport will be removed from
4356  * @stream.
4357  *
4358  * If @func returns #GST_RTSP_FILTER_KEEP, the transport will remain in @stream.
4359  *
4360  * If @func returns #GST_RTSP_FILTER_REF, the transport will remain in @stream but
4361  * will also be added with an additional ref to the result #GList of this
4362  * function..
4363  *
4364  * When @func is %NULL, #GST_RTSP_FILTER_REF will be assumed for each transport.
4365  *
4366  * Returns: (element-type GstRTSPStreamTransport) (transfer full): a #GList with all
4367  * transports for which @func returned #GST_RTSP_FILTER_REF. After usage, each
4368  * element in the #GList should be unreffed before the list is freed.
4369  */
4370 GList *
4371 gst_rtsp_stream_transport_filter (GstRTSPStream * stream,
4372     GstRTSPStreamTransportFilterFunc func, gpointer user_data)
4373 {
4374   GstRTSPStreamPrivate *priv;
4375   GList *result, *walk, *next;
4376   GHashTable *visited = NULL;
4377   guint cookie;
4378
4379   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
4380
4381   priv = stream->priv;
4382
4383   result = NULL;
4384   if (func)
4385     visited = g_hash_table_new_full (NULL, NULL, g_object_unref, NULL);
4386
4387   g_mutex_lock (&priv->lock);
4388 restart:
4389   cookie = priv->transports_cookie;
4390   for (walk = priv->transports; walk; walk = next) {
4391     GstRTSPStreamTransport *trans = walk->data;
4392     GstRTSPFilterResult res;
4393     gboolean changed;
4394
4395     next = g_list_next (walk);
4396
4397     if (func) {
4398       /* only visit each transport once */
4399       if (g_hash_table_contains (visited, trans))
4400         continue;
4401
4402       g_hash_table_add (visited, g_object_ref (trans));
4403       g_mutex_unlock (&priv->lock);
4404
4405       res = func (stream, trans, user_data);
4406
4407       g_mutex_lock (&priv->lock);
4408     } else
4409       res = GST_RTSP_FILTER_REF;
4410
4411     changed = (cookie != priv->transports_cookie);
4412
4413     switch (res) {
4414       case GST_RTSP_FILTER_REMOVE:
4415         update_transport (stream, trans, FALSE);
4416         break;
4417       case GST_RTSP_FILTER_REF:
4418         result = g_list_prepend (result, g_object_ref (trans));
4419         break;
4420       case GST_RTSP_FILTER_KEEP:
4421       default:
4422         break;
4423     }
4424     if (changed)
4425       goto restart;
4426   }
4427   g_mutex_unlock (&priv->lock);
4428
4429   if (func)
4430     g_hash_table_unref (visited);
4431
4432   return result;
4433 }
4434
4435 static GstPadProbeReturn
4436 pad_blocking (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
4437 {
4438   GstRTSPStreamPrivate *priv;
4439   GstRTSPStream *stream;
4440   GstBuffer *buffer = NULL;
4441
4442   stream = user_data;
4443   priv = stream->priv;
4444
4445   GST_DEBUG_OBJECT (pad, "now blocking");
4446
4447   g_mutex_lock (&priv->lock);
4448   priv->blocking = TRUE;
4449
4450   if ((info->type & GST_PAD_PROBE_TYPE_BUFFER)) {
4451     buffer = gst_pad_probe_info_get_buffer (info);
4452   } else if ((info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST)) {
4453     GstBufferList *list = gst_pad_probe_info_get_buffer_list (info);
4454     buffer = gst_buffer_list_get (list, 0);
4455   } else {
4456     g_assert_not_reached ();
4457   }
4458
4459   g_assert (buffer);
4460   priv->position = GST_BUFFER_TIMESTAMP (buffer);
4461   GST_DEBUG_OBJECT (stream, "buffer position: %" GST_TIME_FORMAT,
4462       GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)));
4463   g_mutex_unlock (&priv->lock);
4464
4465   gst_element_post_message (priv->payloader,
4466       gst_message_new_element (GST_OBJECT_CAST (priv->payloader),
4467           gst_structure_new_empty ("GstRTSPStreamBlocking")));
4468
4469   return GST_PAD_PROBE_OK;
4470 }
4471
4472 static void
4473 set_blocked (GstRTSPStream * stream, gboolean blocked)
4474 {
4475   GstRTSPStreamPrivate *priv;
4476   int i;
4477
4478   GST_DEBUG_OBJECT (stream, "blocked: %d", blocked);
4479
4480   priv = stream->priv;
4481
4482   if (blocked) {
4483     for (i = 0; i < 2; i++) {
4484       if (priv->blocked_id[i] != 0)
4485         continue;
4486       if (priv->send_src[i]) {
4487         priv->blocking = FALSE;
4488         priv->blocked_id[i] = gst_pad_add_probe (priv->send_src[i],
4489             GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER |
4490             GST_PAD_PROBE_TYPE_BUFFER_LIST, pad_blocking,
4491             g_object_ref (stream), g_object_unref);
4492       }
4493     }
4494   } else {
4495     for (i = 0; i < 2; i++) {
4496       if (priv->blocked_id[i] != 0) {
4497         gst_pad_remove_probe (priv->send_src[i], priv->blocked_id[i]);
4498         priv->blocked_id[i] = 0;
4499       }
4500     }
4501     priv->blocking = FALSE;
4502   }
4503 }
4504
4505 /**
4506  * gst_rtsp_stream_set_blocked:
4507  * @stream: a #GstRTSPStream
4508  * @blocked: boolean indicating we should block or unblock
4509  *
4510  * Blocks or unblocks the dataflow on @stream.
4511  *
4512  * Returns: %TRUE on success
4513  */
4514 gboolean
4515 gst_rtsp_stream_set_blocked (GstRTSPStream * stream, gboolean blocked)
4516 {
4517   GstRTSPStreamPrivate *priv;
4518
4519   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
4520
4521   priv = stream->priv;
4522   g_mutex_lock (&priv->lock);
4523   set_blocked (stream, blocked);
4524   g_mutex_unlock (&priv->lock);
4525
4526   return TRUE;
4527 }
4528
4529 /**
4530  * gst_rtsp_stream_ublock_linked:
4531  * @stream: a #GstRTSPStream
4532  *
4533  * Unblocks the dataflow on @stream if it is linked.
4534  *
4535  * Returns: %TRUE on success
4536  */
4537 gboolean
4538 gst_rtsp_stream_unblock_linked (GstRTSPStream * stream)
4539 {
4540   GstRTSPStreamPrivate *priv;
4541
4542   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
4543
4544   priv = stream->priv;
4545   g_mutex_lock (&priv->lock);
4546   if (priv->send_src[0] && gst_pad_is_linked (priv->send_src[0]))
4547     set_blocked (stream, FALSE);
4548   g_mutex_unlock (&priv->lock);
4549
4550   return TRUE;
4551 }
4552
4553 /**
4554  * gst_rtsp_stream_is_blocking:
4555  * @stream: a #GstRTSPStream
4556  *
4557  * Check if @stream is blocking on a #GstBuffer.
4558  *
4559  * Returns: %TRUE if @stream is blocking
4560  */
4561 gboolean
4562 gst_rtsp_stream_is_blocking (GstRTSPStream * stream)
4563 {
4564   GstRTSPStreamPrivate *priv;
4565   gboolean result;
4566
4567   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
4568
4569   priv = stream->priv;
4570
4571   g_mutex_lock (&priv->lock);
4572   result = priv->blocking;
4573   g_mutex_unlock (&priv->lock);
4574
4575   return result;
4576 }
4577
4578 /**
4579  * gst_rtsp_stream_query_position:
4580  * @stream: a #GstRTSPStream
4581  * @position: (out): current position of a #GstRTSPStream
4582  *
4583  * Query the position of the stream in %GST_FORMAT_TIME. This only considers
4584  * the RTP parts of the pipeline and not the RTCP parts.
4585  *
4586  * Returns: %TRUE if the position could be queried
4587  */
4588 gboolean
4589 gst_rtsp_stream_query_position (GstRTSPStream * stream, gint64 * position)
4590 {
4591   GstRTSPStreamPrivate *priv;
4592   GstElement *sink;
4593   GstPad *pad = NULL;
4594
4595   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
4596
4597   /* query position: if no sinks have been added yet,
4598    * we obtain the position from the pad otherwise we query the sinks */
4599
4600   priv = stream->priv;
4601
4602   g_mutex_lock (&priv->lock);
4603   /* depending on the transport type, it should query corresponding sink */
4604   if (priv->configured_protocols & GST_RTSP_LOWER_TRANS_UDP)
4605     sink = priv->udpsink[0];
4606   else if (priv->configured_protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST)
4607     sink = priv->mcast_udpsink[0];
4608   else
4609     sink = priv->appsink[0];
4610
4611   if (sink) {
4612     gst_object_ref (sink);
4613   } else if (priv->send_src[0]) {
4614     pad = gst_object_ref (priv->send_src[0]);
4615   } else {
4616     g_mutex_unlock (&priv->lock);
4617     GST_WARNING_OBJECT (stream, "Couldn't obtain postion: erroneous pipeline");
4618     return FALSE;
4619   }
4620   g_mutex_unlock (&priv->lock);
4621
4622   if (sink) {
4623     if (!gst_element_query_position (sink, GST_FORMAT_TIME, position)) {
4624       GST_WARNING_OBJECT (stream,
4625           "Couldn't obtain postion: position query failed");
4626       gst_object_unref (sink);
4627       return FALSE;
4628     }
4629     gst_object_unref (sink);
4630   } else if (pad) {
4631     GstEvent *event;
4632     const GstSegment *segment;
4633
4634     event = gst_pad_get_sticky_event (pad, GST_EVENT_SEGMENT, 0);
4635     if (!event) {
4636       GST_WARNING_OBJECT (stream, "Couldn't obtain postion: no segment event");
4637       gst_object_unref (pad);
4638       return FALSE;
4639     }
4640
4641     gst_event_parse_segment (event, &segment);
4642     if (segment->format != GST_FORMAT_TIME) {
4643       *position = -1;
4644     } else {
4645       g_mutex_lock (&priv->lock);
4646       *position = priv->position;
4647       g_mutex_unlock (&priv->lock);
4648       *position =
4649           gst_segment_to_stream_time (segment, GST_FORMAT_TIME, *position);
4650     }
4651     gst_event_unref (event);
4652     gst_object_unref (pad);
4653   }
4654
4655   return TRUE;
4656 }
4657
4658 /**
4659  * gst_rtsp_stream_query_stop:
4660  * @stream: a #GstRTSPStream
4661  * @stop: (out): current stop of a #GstRTSPStream
4662  *
4663  * Query the stop of the stream in %GST_FORMAT_TIME. This only considers
4664  * the RTP parts of the pipeline and not the RTCP parts.
4665  *
4666  * Returns: %TRUE if the stop could be queried
4667  */
4668 gboolean
4669 gst_rtsp_stream_query_stop (GstRTSPStream * stream, gint64 * stop)
4670 {
4671   GstRTSPStreamPrivate *priv;
4672   GstElement *sink;
4673   GstPad *pad = NULL;
4674
4675   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
4676
4677   /* query stop position: if no sinks have been added yet,
4678    * we obtain the stop position from the pad otherwise we query the sinks */
4679
4680   priv = stream->priv;
4681
4682   g_mutex_lock (&priv->lock);
4683   /* depending on the transport type, it should query corresponding sink */
4684   if (priv->configured_protocols & GST_RTSP_LOWER_TRANS_UDP)
4685     sink = priv->udpsink[0];
4686   else if (priv->configured_protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST)
4687     sink = priv->mcast_udpsink[0];
4688   else
4689     sink = priv->appsink[0];
4690
4691   if (sink) {
4692     gst_object_ref (sink);
4693   } else if (priv->send_src[0]) {
4694     pad = gst_object_ref (priv->send_src[0]);
4695   } else {
4696     g_mutex_unlock (&priv->lock);
4697     GST_WARNING_OBJECT (stream, "Couldn't obtain stop: erroneous pipeline");
4698     return FALSE;
4699   }
4700   g_mutex_unlock (&priv->lock);
4701
4702   if (sink) {
4703     GstQuery *query;
4704     GstFormat format;
4705
4706     query = gst_query_new_segment (GST_FORMAT_TIME);
4707     if (!gst_element_query (sink, query)) {
4708       GST_WARNING_OBJECT (stream, "Couldn't obtain stop: element query failed");
4709       gst_query_unref (query);
4710       gst_object_unref (sink);
4711       return FALSE;
4712     }
4713     gst_query_parse_segment (query, NULL, &format, NULL, stop);
4714     if (format != GST_FORMAT_TIME)
4715       *stop = -1;
4716     gst_query_unref (query);
4717     gst_object_unref (sink);
4718   } else if (pad) {
4719     GstEvent *event;
4720     const GstSegment *segment;
4721
4722     event = gst_pad_get_sticky_event (pad, GST_EVENT_SEGMENT, 0);
4723     if (!event) {
4724       GST_WARNING_OBJECT (stream, "Couldn't obtain stop: no segment event");
4725       gst_object_unref (pad);
4726       return FALSE;
4727     }
4728     gst_event_parse_segment (event, &segment);
4729     if (segment->format != GST_FORMAT_TIME) {
4730       *stop = -1;
4731     } else {
4732       *stop = segment->stop;
4733       if (*stop == -1)
4734         *stop = segment->duration;
4735       else
4736         *stop = gst_segment_to_stream_time (segment, GST_FORMAT_TIME, *stop);
4737     }
4738     gst_event_unref (event);
4739     gst_object_unref (pad);
4740   }
4741
4742   return TRUE;
4743 }
4744
4745 /**
4746  * gst_rtsp_stream_seekable:
4747  * @stream: a #GstRTSPStream
4748  *
4749  * Checks whether the individual @stream is seekable.
4750  *
4751  * Returns: %TRUE if @stream is seekable, else %FALSE.
4752  */
4753 gboolean
4754 gst_rtsp_stream_seekable (GstRTSPStream * stream)
4755 {
4756   GstRTSPStreamPrivate *priv;
4757   GstPad *pad = NULL;
4758   GstQuery *query = NULL;
4759   gboolean seekable = FALSE;
4760
4761   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
4762
4763   /* query stop position: if no sinks have been added yet,
4764    * we obtain the stop position from the pad otherwise we query the sinks */
4765
4766   priv = stream->priv;
4767
4768   g_mutex_lock (&priv->lock);
4769   /* depending on the transport type, it should query corresponding sink */
4770   if (priv->srcpad) {
4771     pad = gst_object_ref (priv->srcpad);
4772   } else {
4773     g_mutex_unlock (&priv->lock);
4774     GST_WARNING_OBJECT (stream, "Pad not available, can't query seekability");
4775     goto beach;
4776   }
4777   g_mutex_unlock (&priv->lock);
4778
4779   query = gst_query_new_seeking (GST_FORMAT_TIME);
4780   if (!gst_pad_query (pad, query)) {
4781     GST_WARNING_OBJECT (stream, "seeking query failed");
4782     goto beach;
4783   }
4784   gst_query_parse_seeking (query, NULL, &seekable, NULL, NULL);
4785
4786 beach:
4787   if (pad)
4788     gst_object_unref (pad);
4789   if (query)
4790     gst_query_unref (query);
4791
4792   GST_DEBUG_OBJECT (stream, "Returning %d", seekable);
4793
4794   return seekable;
4795 }
4796
4797 /**
4798  * gst_rtsp_stream_complete_stream:
4799  * @stream: a #GstRTSPStream
4800  * @transport: a #GstRTSPTransport
4801  *
4802  * Add a receiver and sender part to the pipeline based on the transport from
4803  * SETUP.
4804  *
4805  * Returns: %TRUE if the stream has been sucessfully updated.
4806  */
4807 gboolean
4808 gst_rtsp_stream_complete_stream (GstRTSPStream * stream,
4809     const GstRTSPTransport * transport)
4810 {
4811   GstRTSPStreamPrivate *priv;
4812
4813   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
4814
4815   priv = stream->priv;
4816   GST_DEBUG_OBJECT (stream, "complete stream");
4817
4818   g_mutex_lock (&priv->lock);
4819
4820   if (!(priv->allowed_protocols & transport->lower_transport))
4821     goto unallowed_transport;
4822
4823   if (!create_receiver_part (stream, transport))
4824     goto create_receiver_error;
4825
4826   /* in the RECORD case, we only add RTCP sender part */
4827   if (!create_sender_part (stream, transport))
4828     goto create_sender_error;
4829
4830   priv->configured_protocols |= transport->lower_transport;
4831
4832   priv->is_complete = TRUE;
4833   g_mutex_unlock (&priv->lock);
4834
4835   GST_DEBUG_OBJECT (stream, "pipeline sucsessfully updated");
4836   return TRUE;
4837
4838 create_receiver_error:
4839 create_sender_error:
4840 unallowed_transport:
4841   {
4842     g_mutex_unlock (&priv->lock);
4843     return FALSE;
4844   }
4845 }
4846
4847 /**
4848  * gst_rtsp_stream_is_complete:
4849  * @stream: a #GstRTSPStream
4850  *
4851  * Checks whether the stream is complete, contains the receiver and the sender
4852  * parts. As the stream contains sink(s) element(s), it's possible to perform
4853  * seek operations on it.
4854  *
4855  * Returns: %TRUE if the stream contains at least one sink element.
4856  */
4857 gboolean
4858 gst_rtsp_stream_is_complete (GstRTSPStream * stream)
4859 {
4860   GstRTSPStreamPrivate *priv;
4861   gboolean ret = FALSE;
4862
4863   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
4864
4865   priv = stream->priv;
4866   g_mutex_lock (&priv->lock);
4867   ret = priv->is_complete;
4868   g_mutex_unlock (&priv->lock);
4869
4870   return ret;
4871 }
4872
4873 /**
4874  * gst_rtsp_stream_is_sender:
4875  * @stream: a #GstRTSPStream
4876  *
4877  * Checks whether the stream is a sender.
4878  *
4879  * Returns: %TRUE if the stream is a sender and %FALSE otherwise.
4880  */
4881 gboolean
4882 gst_rtsp_stream_is_sender (GstRTSPStream * stream)
4883 {
4884   GstRTSPStreamPrivate *priv;
4885   gboolean ret = FALSE;
4886
4887   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
4888
4889   priv = stream->priv;
4890   g_mutex_lock (&priv->lock);
4891   ret = (priv->srcpad != NULL);
4892   g_mutex_unlock (&priv->lock);
4893
4894   return ret;
4895 }
4896
4897 /**
4898  * gst_rtsp_stream_is_receiver:
4899  * @stream: a #GstRTSPStream
4900  *
4901  * Checks whether the stream is a receiver.
4902  *
4903  * Returns: %TRUE if the stream is a receiver and %FALSE otherwise.
4904  */
4905 gboolean
4906 gst_rtsp_stream_is_receiver (GstRTSPStream * stream)
4907 {
4908   GstRTSPStreamPrivate *priv;
4909   gboolean ret = FALSE;
4910
4911   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
4912
4913   priv = stream->priv;
4914   g_mutex_lock (&priv->lock);
4915   ret = (priv->sinkpad != NULL);
4916   g_mutex_unlock (&priv->lock);
4917
4918   return ret;
4919 }
4920
4921 #define AES_128_KEY_LEN 16
4922 #define AES_256_KEY_LEN 32
4923
4924 #define HMAC_32_KEY_LEN 4
4925 #define HMAC_80_KEY_LEN 10
4926
4927 static gboolean
4928 mikey_apply_policy (GstCaps * caps, GstMIKEYMessage * msg, guint8 policy)
4929 {
4930   const gchar *srtp_cipher;
4931   const gchar *srtp_auth;
4932   const GstMIKEYPayload *sp;
4933   guint i;
4934
4935   /* loop over Security policy until we find one containing policy */
4936   for (i = 0;; i++) {
4937     if ((sp = gst_mikey_message_find_payload (msg, GST_MIKEY_PT_SP, i)) == NULL)
4938       break;
4939
4940     if (((GstMIKEYPayloadSP *) sp)->policy == policy)
4941       break;
4942   }
4943
4944   /* the default ciphers */
4945   srtp_cipher = "aes-128-icm";
4946   srtp_auth = "hmac-sha1-80";
4947
4948   /* now override the defaults with what is in the Security Policy */
4949   if (sp != NULL) {
4950     guint len;
4951
4952     /* collect all the params and go over them */
4953     len = gst_mikey_payload_sp_get_n_params (sp);
4954     for (i = 0; i < len; i++) {
4955       const GstMIKEYPayloadSPParam *param =
4956           gst_mikey_payload_sp_get_param (sp, i);
4957
4958       switch (param->type) {
4959         case GST_MIKEY_SP_SRTP_ENC_ALG:
4960           switch (param->val[0]) {
4961             case 0:
4962               srtp_cipher = "null";
4963               break;
4964             case 2:
4965             case 1:
4966               srtp_cipher = "aes-128-icm";
4967               break;
4968             default:
4969               break;
4970           }
4971           break;
4972         case GST_MIKEY_SP_SRTP_ENC_KEY_LEN:
4973           switch (param->val[0]) {
4974             case AES_128_KEY_LEN:
4975               srtp_cipher = "aes-128-icm";
4976               break;
4977             case AES_256_KEY_LEN:
4978               srtp_cipher = "aes-256-icm";
4979               break;
4980             default:
4981               break;
4982           }
4983           break;
4984         case GST_MIKEY_SP_SRTP_AUTH_ALG:
4985           switch (param->val[0]) {
4986             case 0:
4987               srtp_auth = "null";
4988               break;
4989             case 2:
4990             case 1:
4991               srtp_auth = "hmac-sha1-80";
4992               break;
4993             default:
4994               break;
4995           }
4996           break;
4997         case GST_MIKEY_SP_SRTP_AUTH_KEY_LEN:
4998           switch (param->val[0]) {
4999             case HMAC_32_KEY_LEN:
5000               srtp_auth = "hmac-sha1-32";
5001               break;
5002             case HMAC_80_KEY_LEN:
5003               srtp_auth = "hmac-sha1-80";
5004               break;
5005             default:
5006               break;
5007           }
5008           break;
5009         case GST_MIKEY_SP_SRTP_SRTP_ENC:
5010           break;
5011         case GST_MIKEY_SP_SRTP_SRTCP_ENC:
5012           break;
5013         default:
5014           break;
5015       }
5016     }
5017   }
5018   /* now configure the SRTP parameters */
5019   gst_caps_set_simple (caps,
5020       "srtp-cipher", G_TYPE_STRING, srtp_cipher,
5021       "srtp-auth", G_TYPE_STRING, srtp_auth,
5022       "srtcp-cipher", G_TYPE_STRING, srtp_cipher,
5023       "srtcp-auth", G_TYPE_STRING, srtp_auth, NULL);
5024
5025   return TRUE;
5026 }
5027
5028 static gboolean
5029 handle_mikey_data (GstRTSPStream * stream, guint8 * data, gsize size)
5030 {
5031   GstMIKEYMessage *msg;
5032   guint i, n_cs;
5033   GstCaps *caps = NULL;
5034   GstMIKEYPayloadKEMAC *kemac;
5035   const GstMIKEYPayloadKeyData *pkd;
5036   GstBuffer *key;
5037
5038   /* the MIKEY message contains a CSB or crypto session bundle. It is a
5039    * set of Crypto Sessions protected with the same master key.
5040    * In the context of SRTP, an RTP and its RTCP stream is part of a
5041    * crypto session */
5042   if ((msg = gst_mikey_message_new_from_data (data, size, NULL, NULL)) == NULL)
5043     goto parse_failed;
5044
5045   /* we can only handle SRTP crypto sessions for now */
5046   if (msg->map_type != GST_MIKEY_MAP_TYPE_SRTP)
5047     goto invalid_map_type;
5048
5049   /* get the number of crypto sessions. This maps SSRC to its
5050    * security parameters */
5051   n_cs = gst_mikey_message_get_n_cs (msg);
5052   if (n_cs == 0)
5053     goto no_crypto_sessions;
5054
5055   /* we also need keys */
5056   if (!(kemac = (GstMIKEYPayloadKEMAC *) gst_mikey_message_find_payload
5057           (msg, GST_MIKEY_PT_KEMAC, 0)))
5058     goto no_keys;
5059
5060   /* we don't support encrypted keys */
5061   if (kemac->enc_alg != GST_MIKEY_ENC_NULL
5062       || kemac->mac_alg != GST_MIKEY_MAC_NULL)
5063     goto unsupported_encryption;
5064
5065   /* get Key data sub-payload */
5066   pkd = (const GstMIKEYPayloadKeyData *)
5067       gst_mikey_payload_kemac_get_sub (&kemac->pt, 0);
5068
5069   key =
5070       gst_buffer_new_wrapped (g_memdup (pkd->key_data, pkd->key_len),
5071       pkd->key_len);
5072
5073   /* go over all crypto sessions and create the security policy for each
5074    * SSRC */
5075   for (i = 0; i < n_cs; i++) {
5076     const GstMIKEYMapSRTP *map = gst_mikey_message_get_cs_srtp (msg, i);
5077
5078     caps = gst_caps_new_simple ("application/x-srtp",
5079         "ssrc", G_TYPE_UINT, map->ssrc,
5080         "roc", G_TYPE_UINT, map->roc, "srtp-key", GST_TYPE_BUFFER, key, NULL);
5081     mikey_apply_policy (caps, msg, map->policy);
5082
5083     gst_rtsp_stream_update_crypto (stream, map->ssrc, caps);
5084     gst_caps_unref (caps);
5085   }
5086   gst_mikey_message_unref (msg);
5087   gst_buffer_unref (key);
5088
5089   return TRUE;
5090
5091   /* ERRORS */
5092 parse_failed:
5093   {
5094     GST_DEBUG_OBJECT (stream, "failed to parse MIKEY message");
5095     return FALSE;
5096   }
5097 invalid_map_type:
5098   {
5099     GST_DEBUG_OBJECT (stream, "invalid map type %d", msg->map_type);
5100     goto cleanup_message;
5101   }
5102 no_crypto_sessions:
5103   {
5104     GST_DEBUG_OBJECT (stream, "no crypto sessions");
5105     goto cleanup_message;
5106   }
5107 no_keys:
5108   {
5109     GST_DEBUG_OBJECT (stream, "no keys found");
5110     goto cleanup_message;
5111   }
5112 unsupported_encryption:
5113   {
5114     GST_DEBUG_OBJECT (stream, "unsupported key encryption");
5115     goto cleanup_message;
5116   }
5117 cleanup_message:
5118   {
5119     gst_mikey_message_unref (msg);
5120     return FALSE;
5121   }
5122 }
5123
5124 #define IS_STRIP_CHAR(c) (g_ascii_isspace ((guchar)(c)) || ((c) == '\"'))
5125
5126 static void
5127 strip_chars (gchar * str)
5128 {
5129   gchar *s;
5130   gsize len;
5131
5132   len = strlen (str);
5133   while (len--) {
5134     if (!IS_STRIP_CHAR (str[len]))
5135       break;
5136     str[len] = '\0';
5137   }
5138   for (s = str; *s && IS_STRIP_CHAR (*s); s++);
5139   memmove (str, s, len + 1);
5140 }
5141
5142 /**
5143  * gst_rtsp_stream_handle_keymgmt:
5144  * @stream: a #GstRTSPStream
5145  * @keymgmt: a keymgmt header
5146  *
5147  * Parse and handle a KeyMgmt header.
5148  *
5149  * Since: 1.16
5150  */
5151 /* KeyMgmt = "KeyMgmt" ":" key-mgmt-spec 0*("," key-mgmt-spec)
5152  * key-mgmt-spec = "prot" "=" KMPID ";" ["uri" "=" %x22 URI %x22 ";"]
5153  */
5154 gboolean
5155 gst_rtsp_stream_handle_keymgmt (GstRTSPStream * stream, const gchar * keymgmt)
5156 {
5157   gchar **specs;
5158   gint i, j;
5159
5160   specs = g_strsplit (keymgmt, ",", 0);
5161   for (i = 0; specs[i]; i++) {
5162     gchar **split;
5163
5164     split = g_strsplit (specs[i], ";", 0);
5165     for (j = 0; split[j]; j++) {
5166       g_strstrip (split[j]);
5167       if (g_str_has_prefix (split[j], "prot=")) {
5168         g_strstrip (split[j] + 5);
5169         if (!g_str_equal (split[j] + 5, "mikey"))
5170           break;
5171         GST_DEBUG ("found mikey");
5172       } else if (g_str_has_prefix (split[j], "uri=")) {
5173         strip_chars (split[j] + 4);
5174         GST_DEBUG ("found uri '%s'", split[j] + 4);
5175       } else if (g_str_has_prefix (split[j], "data=")) {
5176         guchar *data;
5177         gsize size;
5178         strip_chars (split[j] + 5);
5179         GST_DEBUG ("found data '%s'", split[j] + 5);
5180         data = g_base64_decode_inplace (split[j] + 5, &size);
5181         handle_mikey_data (stream, data, size);
5182       }
5183     }
5184     g_strfreev (split);
5185   }
5186   g_strfreev (specs);
5187   return TRUE;
5188 }
5189
5190
5191 /**
5192  * gst_rtsp_stream_get_ulpfec_pt:
5193  *
5194  * Returns: the payload type used for ULPFEC protection packets
5195  *
5196  * Since: 1.16
5197  */
5198 guint
5199 gst_rtsp_stream_get_ulpfec_pt (GstRTSPStream * stream)
5200 {
5201   guint res;
5202
5203   g_mutex_lock (&stream->priv->lock);
5204   res = stream->priv->ulpfec_pt;
5205   g_mutex_unlock (&stream->priv->lock);
5206
5207   return res;
5208 }
5209
5210 /**
5211  * gst_rtsp_stream_set_ulpfec_pt:
5212  *
5213  * Set the payload type to be used for ULPFEC protection packets
5214  *
5215  * Since: 1.16
5216  */
5217 void
5218 gst_rtsp_stream_set_ulpfec_pt (GstRTSPStream * stream, guint pt)
5219 {
5220   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
5221
5222   g_mutex_lock (&stream->priv->lock);
5223   stream->priv->ulpfec_pt = pt;
5224   if (stream->priv->ulpfec_encoder) {
5225     g_object_set (stream->priv->ulpfec_encoder, "pt", pt, NULL);
5226   }
5227   g_mutex_unlock (&stream->priv->lock);
5228 }
5229
5230 /**
5231  * gst_rtsp_stream_request_ulpfec_decoder:
5232  *
5233  * Creating a rtpulpfecdec element
5234  *
5235  * Returns: (transfer full) (nullable): a #GstElement.
5236  *
5237  * Since: 1.16
5238  */
5239 GstElement *
5240 gst_rtsp_stream_request_ulpfec_decoder (GstRTSPStream * stream,
5241     GstElement * rtpbin, guint sessid)
5242 {
5243   GObject *internal_storage = NULL;
5244
5245   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
5246   stream->priv->ulpfec_decoder =
5247       gst_object_ref (gst_element_factory_make ("rtpulpfecdec", NULL));
5248
5249   g_signal_emit_by_name (G_OBJECT (rtpbin), "get-internal-storage", sessid,
5250       &internal_storage);
5251   g_object_set (stream->priv->ulpfec_decoder, "storage", internal_storage,
5252       NULL);
5253   g_object_unref (internal_storage);
5254   update_ulpfec_decoder_pt (stream);
5255
5256   return stream->priv->ulpfec_decoder;
5257 }
5258
5259 /**
5260  * gst_rtsp_stream_request_ulpfec_encoder:
5261  *
5262  * Creating a rtpulpfecenc element
5263  *
5264  * Returns: (transfer full) (nullable): a #GstElement.
5265  *
5266  * Since: 1.16
5267  */
5268 GstElement *
5269 gst_rtsp_stream_request_ulpfec_encoder (GstRTSPStream * stream, guint sessid)
5270 {
5271   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
5272
5273   if (!stream->priv->ulpfec_percentage)
5274     return NULL;
5275
5276   stream->priv->ulpfec_encoder =
5277       gst_object_ref (gst_element_factory_make ("rtpulpfecenc", NULL));
5278
5279   g_object_set (stream->priv->ulpfec_encoder, "pt", stream->priv->ulpfec_pt,
5280       "percentage", stream->priv->ulpfec_percentage, NULL);
5281
5282   return stream->priv->ulpfec_encoder;
5283 }
5284
5285 /**
5286  * gst_rtsp_stream_set_ulpfec_percentage:
5287  *
5288  * Sets the amount of redundancy to apply when creating ULPFEC
5289  * protection packets.
5290  *
5291  * Since: 1.16
5292  */
5293 void
5294 gst_rtsp_stream_set_ulpfec_percentage (GstRTSPStream * stream, guint percentage)
5295 {
5296   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
5297
5298   g_mutex_lock (&stream->priv->lock);
5299   stream->priv->ulpfec_percentage = percentage;
5300   if (stream->priv->ulpfec_encoder) {
5301     g_object_set (stream->priv->ulpfec_encoder, "percentage", percentage, NULL);
5302   }
5303   g_mutex_unlock (&stream->priv->lock);
5304 }
5305
5306 /**
5307  * gst_rtsp_stream_get_ulpfec_percentage:
5308  *
5309  * Returns: the amount of redundancy applied when creating ULPFEC
5310  * protection packets.
5311  *
5312  * Since: 1.16
5313  */
5314 guint
5315 gst_rtsp_stream_get_ulpfec_percentage (GstRTSPStream * stream)
5316 {
5317   guint res;
5318
5319   g_mutex_lock (&stream->priv->lock);
5320   res = stream->priv->ulpfec_percentage;
5321   g_mutex_unlock (&stream->priv->lock);
5322
5323   return res;
5324 }