rtsp-stream: Set the multicast TTL parameter on multicast udp sinks
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-stream.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  * Copyright (C) 2015 Centricular Ltd
4  *     Author: Sebastian Dröge <sebastian@centricular.com>
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19  * Boston, MA 02110-1301, USA.
20  */
21 /**
22  * SECTION:rtsp-stream
23  * @short_description: A media stream
24  * @see_also: #GstRTSPMedia
25  *
26  * The #GstRTSPStream object manages the data transport for one stream. It
27  * is created from a payloader element and a source pad that produce the RTP
28  * packets for the stream.
29  *
30  * With gst_rtsp_stream_join_bin() the streaming elements are added to the bin
31  * and rtpbin. gst_rtsp_stream_leave_bin() removes the elements again.
32  *
33  * The #GstRTSPStream will use the configured addresspool, as set with
34  * gst_rtsp_stream_set_address_pool(), to allocate multicast addresses for the
35  * stream. With gst_rtsp_stream_get_multicast_address() you can get the
36  * configured address.
37  *
38  * With gst_rtsp_stream_get_server_port () you can get the port that the server
39  * will use to receive RTCP. This is the part that the clients will use to send
40  * RTCP to.
41  *
42  * With gst_rtsp_stream_add_transport() destinations can be added where the
43  * stream should be sent to. Use gst_rtsp_stream_remove_transport() to remove
44  * the destination again.
45  *
46  * Last reviewed on 2013-07-16 (1.0.0)
47  */
48
49 #include <stdlib.h>
50 #include <stdio.h>
51 #include <string.h>
52
53 #include <gio/gio.h>
54
55 #include <gst/app/gstappsrc.h>
56 #include <gst/app/gstappsink.h>
57
58 #include <gst/rtp/gstrtpbuffer.h>
59
60 #include "rtsp-stream.h"
61
62 struct _GstRTSPStreamPrivate
63 {
64   GMutex lock;
65   guint idx;
66   /* Only one pad is ever set */
67   GstPad *srcpad, *sinkpad;
68   GstElement *payloader;
69   guint buffer_size;
70   GstBin *joined_bin;
71
72   /* TRUE if this stream is running on
73    * the client side of an RTSP link (for RECORD) */
74   gboolean client_side;
75   gchar *control;
76
77   /* TRUE if stream is complete. This means that the receiver and the sender
78    * parts are present in the stream. */
79   gboolean is_complete;
80   GstRTSPProfile profiles;
81   GstRTSPLowerTrans protocols;
82
83   /* pads on the rtpbin */
84   GstPad *send_rtp_sink;
85   GstPad *recv_rtp_src;
86   GstPad *recv_sink[2];
87   GstPad *send_src[2];
88
89   /* the RTPSession object */
90   GObject *session;
91
92   /* SRTP encoder/decoder */
93   GstElement *srtpenc;
94   GstElement *srtpdec;
95   GHashTable *keys;
96
97   /* for UDP unicast */
98   GstElement *udpsrc_v4[2];
99   GstElement *udpsrc_v6[2];
100   GstElement *udpqueue[2];
101   GstElement *udpsink[2];
102   GSocket *socket_v4[2];
103   GSocket *socket_v6[2];
104
105   /* for UDP multicast */
106   GstElement *mcast_udpsrc_v4[2];
107   GstElement *mcast_udpsrc_v6[2];
108   GstElement *mcast_udpqueue[2];
109   GstElement *mcast_udpsink[2];
110   GSocket *mcast_socket_v4[2];
111   GSocket *mcast_socket_v6[2];
112
113   /* for TCP transport */
114   GstElement *appsrc[2];
115   GstClockTime appsrc_base_time[2];
116   GstElement *appqueue[2];
117   GstElement *appsink[2];
118
119   GstElement *tee[2];
120   GstElement *funnel[2];
121
122   /* retransmission */
123   GstElement *rtxsend;
124   GstElement *rtxreceive;
125   guint rtx_pt;
126   GstClockTime rtx_time;
127
128   /* Forward Error Correction with RFC 5109 */
129   GstElement *ulpfec_decoder;
130   GstElement *ulpfec_encoder;
131   guint ulpfec_pt;
132   gboolean ulpfec_enabled;
133   guint ulpfec_percentage;
134
135   /* pool used to manage unicast and multicast addresses */
136   GstRTSPAddressPool *pool;
137
138   /* unicast server addr/port */
139   GstRTSPAddress *server_addr_v4;
140   GstRTSPAddress *server_addr_v6;
141
142   /* multicast addresses */
143   GstRTSPAddress *mcast_addr_v4;
144   GstRTSPAddress *mcast_addr_v6;
145
146   gchar *multicast_iface;
147
148   /* the caps of the stream */
149   gulong caps_sig;
150   GstCaps *caps;
151
152   /* transports we stream to */
153   guint n_active;
154   GList *transports;
155   guint transports_cookie;
156   GList *tr_cache_rtp;
157   GList *tr_cache_rtcp;
158   guint tr_cache_cookie_rtp;
159   guint tr_cache_cookie_rtcp;
160
161   gint dscp_qos;
162
163   /* stream blocking */
164   gulong blocked_id[2];
165   gboolean blocking;
166
167   /* current stream postion */
168   GstClockTime position;
169
170   /* pt->caps map for RECORD streams */
171   GHashTable *ptmap;
172
173   GstRTSPPublishClockMode publish_clock_mode;
174 };
175
176 #define DEFAULT_CONTROL         NULL
177 #define DEFAULT_PROFILES        GST_RTSP_PROFILE_AVP
178 #define DEFAULT_PROTOCOLS       GST_RTSP_LOWER_TRANS_UDP | GST_RTSP_LOWER_TRANS_UDP_MCAST | \
179                                         GST_RTSP_LOWER_TRANS_TCP
180
181 enum
182 {
183   PROP_0,
184   PROP_CONTROL,
185   PROP_PROFILES,
186   PROP_PROTOCOLS,
187   PROP_LAST
188 };
189
190 enum
191 {
192   SIGNAL_NEW_RTP_ENCODER,
193   SIGNAL_NEW_RTCP_ENCODER,
194   SIGNAL_LAST
195 };
196
197 GST_DEBUG_CATEGORY_STATIC (rtsp_stream_debug);
198 #define GST_CAT_DEFAULT rtsp_stream_debug
199
200 static GQuark ssrc_stream_map_key;
201
202 static void gst_rtsp_stream_get_property (GObject * object, guint propid,
203     GValue * value, GParamSpec * pspec);
204 static void gst_rtsp_stream_set_property (GObject * object, guint propid,
205     const GValue * value, GParamSpec * pspec);
206
207 static void gst_rtsp_stream_finalize (GObject * obj);
208
209 static guint gst_rtsp_stream_signals[SIGNAL_LAST] = { 0 };
210
211 G_DEFINE_TYPE_WITH_PRIVATE (GstRTSPStream, gst_rtsp_stream, G_TYPE_OBJECT);
212
213 static void
214 gst_rtsp_stream_class_init (GstRTSPStreamClass * klass)
215 {
216   GObjectClass *gobject_class;
217
218   gobject_class = G_OBJECT_CLASS (klass);
219
220   gobject_class->get_property = gst_rtsp_stream_get_property;
221   gobject_class->set_property = gst_rtsp_stream_set_property;
222   gobject_class->finalize = gst_rtsp_stream_finalize;
223
224   g_object_class_install_property (gobject_class, PROP_CONTROL,
225       g_param_spec_string ("control", "Control",
226           "The control string for this stream", DEFAULT_CONTROL,
227           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
228
229   g_object_class_install_property (gobject_class, PROP_PROFILES,
230       g_param_spec_flags ("profiles", "Profiles",
231           "Allowed transfer profiles", GST_TYPE_RTSP_PROFILE,
232           DEFAULT_PROFILES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
233
234   g_object_class_install_property (gobject_class, PROP_PROTOCOLS,
235       g_param_spec_flags ("protocols", "Protocols",
236           "Allowed lower transport protocols", GST_TYPE_RTSP_LOWER_TRANS,
237           DEFAULT_PROTOCOLS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
238
239   gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER] =
240       g_signal_new ("new-rtp-encoder", G_TYPE_FROM_CLASS (klass),
241       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
242       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
243
244   gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER] =
245       g_signal_new ("new-rtcp-encoder", G_TYPE_FROM_CLASS (klass),
246       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
247       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
248
249   GST_DEBUG_CATEGORY_INIT (rtsp_stream_debug, "rtspstream", 0, "GstRTSPStream");
250
251   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
252 }
253
254 static void
255 gst_rtsp_stream_init (GstRTSPStream * stream)
256 {
257   GstRTSPStreamPrivate *priv = gst_rtsp_stream_get_instance_private (stream);
258
259   GST_DEBUG ("new stream %p", stream);
260
261   stream->priv = priv;
262
263   priv->dscp_qos = -1;
264   priv->control = g_strdup (DEFAULT_CONTROL);
265   priv->profiles = DEFAULT_PROFILES;
266   priv->protocols = DEFAULT_PROTOCOLS;
267   priv->publish_clock_mode = GST_RTSP_PUBLISH_CLOCK_MODE_CLOCK;
268
269   g_mutex_init (&priv->lock);
270
271   priv->keys = g_hash_table_new_full (g_direct_hash, g_direct_equal,
272       NULL, (GDestroyNotify) gst_caps_unref);
273   priv->ptmap = g_hash_table_new_full (NULL, NULL, NULL,
274       (GDestroyNotify) gst_caps_unref);
275 }
276
277 static void
278 gst_rtsp_stream_finalize (GObject * obj)
279 {
280   GstRTSPStream *stream;
281   GstRTSPStreamPrivate *priv;
282   guint i;
283
284   stream = GST_RTSP_STREAM (obj);
285   priv = stream->priv;
286
287   GST_DEBUG ("finalize stream %p", stream);
288
289   /* we really need to be unjoined now */
290   g_return_if_fail (priv->joined_bin == NULL);
291
292   if (priv->mcast_addr_v4)
293     gst_rtsp_address_free (priv->mcast_addr_v4);
294   if (priv->mcast_addr_v6)
295     gst_rtsp_address_free (priv->mcast_addr_v6);
296   if (priv->server_addr_v4)
297     gst_rtsp_address_free (priv->server_addr_v4);
298   if (priv->server_addr_v6)
299     gst_rtsp_address_free (priv->server_addr_v6);
300   if (priv->pool)
301     g_object_unref (priv->pool);
302   if (priv->rtxsend)
303     g_object_unref (priv->rtxsend);
304   if (priv->rtxreceive)
305     g_object_unref (priv->rtxreceive);
306   if (priv->ulpfec_encoder)
307     gst_object_unref (priv->ulpfec_encoder);
308   if (priv->ulpfec_decoder)
309     gst_object_unref (priv->ulpfec_decoder);
310
311   for (i = 0; i < 2; i++) {
312     if (priv->socket_v4[i])
313       g_object_unref (priv->socket_v4[i]);
314     if (priv->socket_v6[i])
315       g_object_unref (priv->socket_v6[i]);
316     if (priv->mcast_socket_v4[i])
317       g_object_unref (priv->mcast_socket_v4[i]);
318     if (priv->mcast_socket_v6[i])
319       g_object_unref (priv->mcast_socket_v6[i]);
320   }
321
322   g_free (priv->multicast_iface);
323
324   gst_object_unref (priv->payloader);
325   if (priv->srcpad)
326     gst_object_unref (priv->srcpad);
327   if (priv->sinkpad)
328     gst_object_unref (priv->sinkpad);
329   g_free (priv->control);
330   g_mutex_clear (&priv->lock);
331
332   g_hash_table_unref (priv->keys);
333   g_hash_table_destroy (priv->ptmap);
334
335   G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
336 }
337
338 static void
339 gst_rtsp_stream_get_property (GObject * object, guint propid,
340     GValue * value, GParamSpec * pspec)
341 {
342   GstRTSPStream *stream = GST_RTSP_STREAM (object);
343
344   switch (propid) {
345     case PROP_CONTROL:
346       g_value_take_string (value, gst_rtsp_stream_get_control (stream));
347       break;
348     case PROP_PROFILES:
349       g_value_set_flags (value, gst_rtsp_stream_get_profiles (stream));
350       break;
351     case PROP_PROTOCOLS:
352       g_value_set_flags (value, gst_rtsp_stream_get_protocols (stream));
353       break;
354     default:
355       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
356   }
357 }
358
359 static void
360 gst_rtsp_stream_set_property (GObject * object, guint propid,
361     const GValue * value, GParamSpec * pspec)
362 {
363   GstRTSPStream *stream = GST_RTSP_STREAM (object);
364
365   switch (propid) {
366     case PROP_CONTROL:
367       gst_rtsp_stream_set_control (stream, g_value_get_string (value));
368       break;
369     case PROP_PROFILES:
370       gst_rtsp_stream_set_profiles (stream, g_value_get_flags (value));
371       break;
372     case PROP_PROTOCOLS:
373       gst_rtsp_stream_set_protocols (stream, g_value_get_flags (value));
374       break;
375     default:
376       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
377   }
378 }
379
380 /**
381  * gst_rtsp_stream_new:
382  * @idx: an index
383  * @pad: a #GstPad
384  * @payloader: a #GstElement
385  *
386  * Create a new media stream with index @idx that handles RTP data on
387  * @pad and has a payloader element @payloader if @pad is a source pad
388  * or a depayloader element @payloader if @pad is a sink pad.
389  *
390  * Returns: (transfer full): a new #GstRTSPStream
391  */
392 GstRTSPStream *
393 gst_rtsp_stream_new (guint idx, GstElement * payloader, GstPad * pad)
394 {
395   GstRTSPStreamPrivate *priv;
396   GstRTSPStream *stream;
397
398   g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL);
399   g_return_val_if_fail (GST_IS_PAD (pad), NULL);
400
401   stream = g_object_new (GST_TYPE_RTSP_STREAM, NULL);
402   priv = stream->priv;
403   priv->idx = idx;
404   priv->payloader = gst_object_ref (payloader);
405   if (GST_PAD_IS_SRC (pad))
406     priv->srcpad = gst_object_ref (pad);
407   else
408     priv->sinkpad = gst_object_ref (pad);
409
410   return stream;
411 }
412
413 /**
414  * gst_rtsp_stream_get_index:
415  * @stream: a #GstRTSPStream
416  *
417  * Get the stream index.
418  *
419  * Return: the stream index.
420  */
421 guint
422 gst_rtsp_stream_get_index (GstRTSPStream * stream)
423 {
424   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
425
426   return stream->priv->idx;
427 }
428
429 /**
430  * gst_rtsp_stream_get_pt:
431  * @stream: a #GstRTSPStream
432  *
433  * Get the stream payload type.
434  *
435  * Return: the stream payload type.
436  */
437 guint
438 gst_rtsp_stream_get_pt (GstRTSPStream * stream)
439 {
440   GstRTSPStreamPrivate *priv;
441   guint pt;
442
443   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
444
445   priv = stream->priv;
446
447   g_object_get (G_OBJECT (priv->payloader), "pt", &pt, NULL);
448
449   return pt;
450 }
451
452 /**
453  * gst_rtsp_stream_get_srcpad:
454  * @stream: a #GstRTSPStream
455  *
456  * Get the srcpad associated with @stream.
457  *
458  * Returns: (transfer full) (nullable): the srcpad. Unref after usage.
459  */
460 GstPad *
461 gst_rtsp_stream_get_srcpad (GstRTSPStream * stream)
462 {
463   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
464
465   if (!stream->priv->srcpad)
466     return NULL;
467
468   return gst_object_ref (stream->priv->srcpad);
469 }
470
471 /**
472  * gst_rtsp_stream_get_sinkpad:
473  * @stream: a #GstRTSPStream
474  *
475  * Get the sinkpad associated with @stream.
476  *
477  * Returns: (transfer full) (nullable): the sinkpad. Unref after usage.
478  */
479 GstPad *
480 gst_rtsp_stream_get_sinkpad (GstRTSPStream * stream)
481 {
482   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
483
484   if (!stream->priv->sinkpad)
485     return NULL;
486
487   return gst_object_ref (stream->priv->sinkpad);
488 }
489
490 /**
491  * gst_rtsp_stream_get_control:
492  * @stream: a #GstRTSPStream
493  *
494  * Get the control string to identify this stream.
495  *
496  * Returns: (transfer full) (nullable): the control string. g_free() after usage.
497  */
498 gchar *
499 gst_rtsp_stream_get_control (GstRTSPStream * stream)
500 {
501   GstRTSPStreamPrivate *priv;
502   gchar *result;
503
504   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
505
506   priv = stream->priv;
507
508   g_mutex_lock (&priv->lock);
509   if ((result = g_strdup (priv->control)) == NULL)
510     result = g_strdup_printf ("stream=%u", priv->idx);
511   g_mutex_unlock (&priv->lock);
512
513   return result;
514 }
515
516 /**
517  * gst_rtsp_stream_set_control:
518  * @stream: a #GstRTSPStream
519  * @control: (nullable): a control string
520  *
521  * Set the control string in @stream.
522  */
523 void
524 gst_rtsp_stream_set_control (GstRTSPStream * stream, const gchar * control)
525 {
526   GstRTSPStreamPrivate *priv;
527
528   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
529
530   priv = stream->priv;
531
532   g_mutex_lock (&priv->lock);
533   g_free (priv->control);
534   priv->control = g_strdup (control);
535   g_mutex_unlock (&priv->lock);
536 }
537
538 /**
539  * gst_rtsp_stream_has_control:
540  * @stream: a #GstRTSPStream
541  * @control: (nullable): a control string
542  *
543  * Check if @stream has the control string @control.
544  *
545  * Returns: %TRUE is @stream has @control as the control string
546  */
547 gboolean
548 gst_rtsp_stream_has_control (GstRTSPStream * stream, const gchar * control)
549 {
550   GstRTSPStreamPrivate *priv;
551   gboolean res;
552
553   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
554
555   priv = stream->priv;
556
557   g_mutex_lock (&priv->lock);
558   if (priv->control)
559     res = (g_strcmp0 (priv->control, control) == 0);
560   else {
561     guint streamid;
562
563     if (sscanf (control, "stream=%u", &streamid) > 0)
564       res = (streamid == priv->idx);
565     else
566       res = FALSE;
567   }
568   g_mutex_unlock (&priv->lock);
569
570   return res;
571 }
572
573 /**
574  * gst_rtsp_stream_set_mtu:
575  * @stream: a #GstRTSPStream
576  * @mtu: a new MTU
577  *
578  * Configure the mtu in the payloader of @stream to @mtu.
579  */
580 void
581 gst_rtsp_stream_set_mtu (GstRTSPStream * stream, guint mtu)
582 {
583   GstRTSPStreamPrivate *priv;
584
585   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
586
587   priv = stream->priv;
588
589   GST_LOG_OBJECT (stream, "set MTU %u", mtu);
590
591   g_object_set (G_OBJECT (priv->payloader), "mtu", mtu, NULL);
592 }
593
594 /**
595  * gst_rtsp_stream_get_mtu:
596  * @stream: a #GstRTSPStream
597  *
598  * Get the configured MTU in the payloader of @stream.
599  *
600  * Returns: the MTU of the payloader.
601  */
602 guint
603 gst_rtsp_stream_get_mtu (GstRTSPStream * stream)
604 {
605   GstRTSPStreamPrivate *priv;
606   guint mtu;
607
608   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
609
610   priv = stream->priv;
611
612   g_object_get (G_OBJECT (priv->payloader), "mtu", &mtu, NULL);
613
614   return mtu;
615 }
616
617 /* Update the dscp qos property on the udp sinks */
618 static void
619 update_dscp_qos (GstRTSPStream * stream, GstElement ** udpsink)
620 {
621   GstRTSPStreamPrivate *priv;
622
623   priv = stream->priv;
624
625   if (*udpsink) {
626     g_object_set (G_OBJECT (*udpsink), "qos-dscp", priv->dscp_qos, NULL);
627   }
628 }
629
630 /**
631  * gst_rtsp_stream_set_dscp_qos:
632  * @stream: a #GstRTSPStream
633  * @dscp_qos: a new dscp qos value (0-63, or -1 to disable)
634  *
635  * Configure the dscp qos of the outgoing sockets to @dscp_qos.
636  */
637 void
638 gst_rtsp_stream_set_dscp_qos (GstRTSPStream * stream, gint dscp_qos)
639 {
640   GstRTSPStreamPrivate *priv;
641
642   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
643
644   priv = stream->priv;
645
646   GST_LOG_OBJECT (stream, "set DSCP QoS %d", dscp_qos);
647
648   if (dscp_qos < -1 || dscp_qos > 63) {
649     GST_WARNING_OBJECT (stream, "trying to set illegal dscp qos %d", dscp_qos);
650     return;
651   }
652
653   priv->dscp_qos = dscp_qos;
654
655   update_dscp_qos (stream, priv->udpsink);
656 }
657
658 /**
659  * gst_rtsp_stream_get_dscp_qos:
660  * @stream: a #GstRTSPStream
661  *
662  * Get the configured DSCP QoS in of the outgoing sockets.
663  *
664  * Returns: the DSCP QoS value of the outgoing sockets, or -1 if disbled.
665  */
666 gint
667 gst_rtsp_stream_get_dscp_qos (GstRTSPStream * stream)
668 {
669   GstRTSPStreamPrivate *priv;
670
671   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
672
673   priv = stream->priv;
674
675   return priv->dscp_qos;
676 }
677
678 /**
679  * gst_rtsp_stream_is_transport_supported:
680  * @stream: a #GstRTSPStream
681  * @transport: (transfer none): a #GstRTSPTransport
682  *
683  * Check if @transport can be handled by stream
684  *
685  * Returns: %TRUE if @transport can be handled by @stream.
686  */
687 gboolean
688 gst_rtsp_stream_is_transport_supported (GstRTSPStream * stream,
689     GstRTSPTransport * transport)
690 {
691   GstRTSPStreamPrivate *priv;
692
693   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
694   g_return_val_if_fail (transport != NULL, FALSE);
695
696   priv = stream->priv;
697
698   g_mutex_lock (&priv->lock);
699   if (transport->trans != GST_RTSP_TRANS_RTP)
700     goto unsupported_transmode;
701
702   if (!(transport->profile & priv->profiles))
703     goto unsupported_profile;
704
705   if (!(transport->lower_transport & priv->protocols))
706     goto unsupported_ltrans;
707
708   g_mutex_unlock (&priv->lock);
709
710   return TRUE;
711
712   /* ERRORS */
713 unsupported_transmode:
714   {
715     GST_DEBUG ("unsupported transport mode %d", transport->trans);
716     g_mutex_unlock (&priv->lock);
717     return FALSE;
718   }
719 unsupported_profile:
720   {
721     GST_DEBUG ("unsupported profile %d", transport->profile);
722     g_mutex_unlock (&priv->lock);
723     return FALSE;
724   }
725 unsupported_ltrans:
726   {
727     GST_DEBUG ("unsupported lower transport %d", transport->lower_transport);
728     g_mutex_unlock (&priv->lock);
729     return FALSE;
730   }
731 }
732
733 /**
734  * gst_rtsp_stream_set_profiles:
735  * @stream: a #GstRTSPStream
736  * @profiles: the new profiles
737  *
738  * Configure the allowed profiles for @stream.
739  */
740 void
741 gst_rtsp_stream_set_profiles (GstRTSPStream * stream, GstRTSPProfile profiles)
742 {
743   GstRTSPStreamPrivate *priv;
744
745   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
746
747   priv = stream->priv;
748
749   g_mutex_lock (&priv->lock);
750   priv->profiles = profiles;
751   g_mutex_unlock (&priv->lock);
752 }
753
754 /**
755  * gst_rtsp_stream_get_profiles:
756  * @stream: a #GstRTSPStream
757  *
758  * Get the allowed profiles of @stream.
759  *
760  * Returns: a #GstRTSPProfile
761  */
762 GstRTSPProfile
763 gst_rtsp_stream_get_profiles (GstRTSPStream * stream)
764 {
765   GstRTSPStreamPrivate *priv;
766   GstRTSPProfile res;
767
768   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_RTSP_PROFILE_UNKNOWN);
769
770   priv = stream->priv;
771
772   g_mutex_lock (&priv->lock);
773   res = priv->profiles;
774   g_mutex_unlock (&priv->lock);
775
776   return res;
777 }
778
779 /**
780  * gst_rtsp_stream_set_protocols:
781  * @stream: a #GstRTSPStream
782  * @protocols: the new flags
783  *
784  * Configure the allowed lower transport for @stream.
785  */
786 void
787 gst_rtsp_stream_set_protocols (GstRTSPStream * stream,
788     GstRTSPLowerTrans protocols)
789 {
790   GstRTSPStreamPrivate *priv;
791
792   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
793
794   priv = stream->priv;
795
796   g_mutex_lock (&priv->lock);
797   priv->protocols = protocols;
798   g_mutex_unlock (&priv->lock);
799 }
800
801 /**
802  * gst_rtsp_stream_get_protocols:
803  * @stream: a #GstRTSPStream
804  *
805  * Get the allowed protocols of @stream.
806  *
807  * Returns: a #GstRTSPLowerTrans
808  */
809 GstRTSPLowerTrans
810 gst_rtsp_stream_get_protocols (GstRTSPStream * stream)
811 {
812   GstRTSPStreamPrivate *priv;
813   GstRTSPLowerTrans res;
814
815   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream),
816       GST_RTSP_LOWER_TRANS_UNKNOWN);
817
818   priv = stream->priv;
819
820   g_mutex_lock (&priv->lock);
821   res = priv->protocols;
822   g_mutex_unlock (&priv->lock);
823
824   return res;
825 }
826
827 /**
828  * gst_rtsp_stream_set_address_pool:
829  * @stream: a #GstRTSPStream
830  * @pool: (transfer none) (nullable): a #GstRTSPAddressPool
831  *
832  * configure @pool to be used as the address pool of @stream.
833  */
834 void
835 gst_rtsp_stream_set_address_pool (GstRTSPStream * stream,
836     GstRTSPAddressPool * pool)
837 {
838   GstRTSPStreamPrivate *priv;
839   GstRTSPAddressPool *old;
840
841   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
842
843   priv = stream->priv;
844
845   GST_LOG_OBJECT (stream, "set address pool %p", pool);
846
847   g_mutex_lock (&priv->lock);
848   if ((old = priv->pool) != pool)
849     priv->pool = pool ? g_object_ref (pool) : NULL;
850   else
851     old = NULL;
852   g_mutex_unlock (&priv->lock);
853
854   if (old)
855     g_object_unref (old);
856 }
857
858 /**
859  * gst_rtsp_stream_get_address_pool:
860  * @stream: a #GstRTSPStream
861  *
862  * Get the #GstRTSPAddressPool used as the address pool of @stream.
863  *
864  * Returns: (transfer full) (nullable): the #GstRTSPAddressPool of @stream.
865  * g_object_unref() after usage.
866  */
867 GstRTSPAddressPool *
868 gst_rtsp_stream_get_address_pool (GstRTSPStream * stream)
869 {
870   GstRTSPStreamPrivate *priv;
871   GstRTSPAddressPool *result;
872
873   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
874
875   priv = stream->priv;
876
877   g_mutex_lock (&priv->lock);
878   if ((result = priv->pool))
879     g_object_ref (result);
880   g_mutex_unlock (&priv->lock);
881
882   return result;
883 }
884
885 /**
886  * gst_rtsp_stream_set_multicast_iface:
887  * @stream: a #GstRTSPStream
888  * @multicast_iface: (transfer none) (nullable): a multicast interface name
889  *
890  * configure @multicast_iface to be used for @stream.
891  */
892 void
893 gst_rtsp_stream_set_multicast_iface (GstRTSPStream * stream,
894     const gchar * multicast_iface)
895 {
896   GstRTSPStreamPrivate *priv;
897   gchar *old;
898
899   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
900
901   priv = stream->priv;
902
903   GST_LOG_OBJECT (stream, "set multicast iface %s",
904       GST_STR_NULL (multicast_iface));
905
906   g_mutex_lock (&priv->lock);
907   if ((old = priv->multicast_iface) != multicast_iface)
908     priv->multicast_iface = multicast_iface ? g_strdup (multicast_iface) : NULL;
909   else
910     old = NULL;
911   g_mutex_unlock (&priv->lock);
912
913   if (old)
914     g_free (old);
915 }
916
917 /**
918  * gst_rtsp_stream_get_multicast_iface:
919  * @stream: a #GstRTSPStream
920  *
921  * Get the multicast interface used for @stream.
922  *
923  * Returns: (transfer full) (nullable): the multicast interface for @stream.
924  * g_free() after usage.
925  */
926 gchar *
927 gst_rtsp_stream_get_multicast_iface (GstRTSPStream * stream)
928 {
929   GstRTSPStreamPrivate *priv;
930   gchar *result;
931
932   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
933
934   priv = stream->priv;
935
936   g_mutex_lock (&priv->lock);
937   if ((result = priv->multicast_iface))
938     result = g_strdup (result);
939   g_mutex_unlock (&priv->lock);
940
941   return result;
942 }
943
944 /**
945  * gst_rtsp_stream_get_multicast_address:
946  * @stream: a #GstRTSPStream
947  * @family: the #GSocketFamily
948  *
949  * Get the multicast address of @stream for @family. The original
950  * #GstRTSPAddress is cached and copy is returned, so freeing the return value
951  * won't release the address from the pool.
952  *
953  * Returns: (transfer full) (nullable): the #GstRTSPAddress of @stream
954  * or %NULL when no address could be allocated. gst_rtsp_address_free()
955  * after usage.
956  */
957 GstRTSPAddress *
958 gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream,
959     GSocketFamily family)
960 {
961   GstRTSPStreamPrivate *priv;
962   GstRTSPAddress *result;
963   GstRTSPAddress **addrp;
964   GstRTSPAddressFlags flags;
965
966   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
967
968   priv = stream->priv;
969
970   g_mutex_lock (&stream->priv->lock);
971
972   if (family == G_SOCKET_FAMILY_IPV6) {
973     flags = GST_RTSP_ADDRESS_FLAG_IPV6;
974     addrp = &priv->mcast_addr_v6;
975   } else {
976     flags = GST_RTSP_ADDRESS_FLAG_IPV4;
977     addrp = &priv->mcast_addr_v4;
978   }
979
980   if (*addrp == NULL) {
981     if (priv->pool == NULL)
982       goto no_pool;
983
984     flags |= GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_MULTICAST;
985
986     *addrp = gst_rtsp_address_pool_acquire_address (priv->pool, flags, 2);
987     if (*addrp == NULL)
988       goto no_address;
989
990     /* FIXME: Also reserve the same port with unicast ANY address, since that's
991      * where we are going to bind our socket. Probably loop until we find a port
992      * available in both mcast and unicast pools. Maybe GstRTSPAddressPool
993      * should do it for us when both GST_RTSP_ADDRESS_FLAG_MULTICAST and
994      * GST_RTSP_ADDRESS_FLAG_UNICAST are givent. */
995   }
996   result = gst_rtsp_address_copy (*addrp);
997
998   g_mutex_unlock (&stream->priv->lock);
999
1000   return result;
1001
1002   /* ERRORS */
1003 no_pool:
1004   {
1005     GST_ERROR_OBJECT (stream, "no address pool specified");
1006     g_mutex_unlock (&stream->priv->lock);
1007     return NULL;
1008   }
1009 no_address:
1010   {
1011     GST_ERROR_OBJECT (stream, "failed to acquire address from pool");
1012     g_mutex_unlock (&stream->priv->lock);
1013     return NULL;
1014   }
1015 }
1016
1017 /**
1018  * gst_rtsp_stream_reserve_address:
1019  * @stream: a #GstRTSPStream
1020  * @address: an address
1021  * @port: a port
1022  * @n_ports: n_ports
1023  * @ttl: a TTL
1024  *
1025  * Reserve @address and @port as the address and port of @stream. The original
1026  * #GstRTSPAddress is cached and copy is returned, so freeing the return value
1027  * won't release the address from the pool.
1028  *
1029  * Returns: (nullable): the #GstRTSPAddress of @stream or %NULL when
1030  * the address could be reserved. gst_rtsp_address_free() after usage.
1031  */
1032 GstRTSPAddress *
1033 gst_rtsp_stream_reserve_address (GstRTSPStream * stream,
1034     const gchar * address, guint port, guint n_ports, guint ttl)
1035 {
1036   GstRTSPStreamPrivate *priv;
1037   GstRTSPAddress *result;
1038   GInetAddress *addr;
1039   GSocketFamily family;
1040   GstRTSPAddress **addrp;
1041
1042   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1043   g_return_val_if_fail (address != NULL, NULL);
1044   g_return_val_if_fail (port > 0, NULL);
1045   g_return_val_if_fail (n_ports > 0, NULL);
1046   g_return_val_if_fail (ttl > 0, NULL);
1047
1048   priv = stream->priv;
1049
1050   addr = g_inet_address_new_from_string (address);
1051   if (!addr) {
1052     GST_ERROR ("failed to get inet addr from %s", address);
1053     family = G_SOCKET_FAMILY_IPV4;
1054   } else {
1055     family = g_inet_address_get_family (addr);
1056     g_object_unref (addr);
1057   }
1058
1059   if (family == G_SOCKET_FAMILY_IPV6)
1060     addrp = &priv->mcast_addr_v6;
1061   else
1062     addrp = &priv->mcast_addr_v4;
1063
1064   g_mutex_lock (&priv->lock);
1065   if (*addrp == NULL) {
1066     GstRTSPAddressPoolResult res;
1067
1068     if (priv->pool == NULL)
1069       goto no_pool;
1070
1071     res = gst_rtsp_address_pool_reserve_address (priv->pool, address,
1072         port, n_ports, ttl, addrp);
1073     if (res != GST_RTSP_ADDRESS_POOL_OK)
1074       goto no_address;
1075
1076     /* FIXME: Also reserve the same port with unicast ANY address, since that's
1077      * where we are going to bind our socket. */
1078   } else {
1079     if (g_ascii_strcasecmp ((*addrp)->address, address) ||
1080         (*addrp)->port != port || (*addrp)->n_ports != n_ports ||
1081         (*addrp)->ttl != ttl)
1082       goto different_address;
1083   }
1084   result = gst_rtsp_address_copy (*addrp);
1085   g_mutex_unlock (&priv->lock);
1086
1087   return result;
1088
1089   /* ERRORS */
1090 no_pool:
1091   {
1092     GST_ERROR_OBJECT (stream, "no address pool specified");
1093     g_mutex_unlock (&priv->lock);
1094     return NULL;
1095   }
1096 no_address:
1097   {
1098     GST_ERROR_OBJECT (stream, "failed to acquire address %s from pool",
1099         address);
1100     g_mutex_unlock (&priv->lock);
1101     return NULL;
1102   }
1103 different_address:
1104   {
1105     GST_ERROR_OBJECT (stream,
1106         "address %s is not the same as %s that was already reserved",
1107         address, (*addrp)->address);
1108     g_mutex_unlock (&priv->lock);
1109     return NULL;
1110   }
1111 }
1112
1113 /* must be called with lock */
1114 static void
1115 set_socket_for_udpsink (GstElement * udpsink, GSocket * socket,
1116     GSocketFamily family)
1117 {
1118   const gchar *multisink_socket;
1119
1120   if (family == G_SOCKET_FAMILY_IPV6)
1121     multisink_socket = "socket-v6";
1122   else
1123     multisink_socket = "socket";
1124
1125   g_object_set (G_OBJECT (udpsink), multisink_socket, socket, NULL);
1126 }
1127
1128 /* must be called with lock */
1129 static void
1130 set_multicast_socket_for_udpsink (GstElement * udpsink, GSocket * socket,
1131     GSocketFamily family, const gchar * multicast_iface,
1132     const gchar * addr_str, gint port, gint mcast_ttl)
1133 {
1134   set_socket_for_udpsink (udpsink, socket, family);
1135
1136   if (multicast_iface) {
1137     GST_INFO ("setting multicast-iface %s", multicast_iface);
1138     g_object_set (G_OBJECT (udpsink), "multicast-iface", multicast_iface, NULL);
1139   }
1140
1141   if (mcast_ttl > 0) {
1142     GST_INFO ("setting ttl-mc %d", mcast_ttl);
1143     g_object_set (G_OBJECT (udpsink), "ttl-mc", mcast_ttl, NULL);
1144   }
1145
1146   g_signal_emit_by_name (udpsink, "add", addr_str, port, NULL);
1147 }
1148
1149
1150 /* must be called with lock */
1151 static void
1152 set_unicast_socket_for_udpsink (GstElement * udpsink, GSocket * socket,
1153     GSocketFamily family)
1154 {
1155   set_socket_for_udpsink (udpsink, socket, family);
1156 }
1157
1158 static guint16
1159 get_port_from_socket (GSocket * socket)
1160 {
1161   guint16 port;
1162   GSocketAddress *sockaddr;
1163   GError *err;
1164
1165   GST_DEBUG ("socket: %p", socket);
1166   sockaddr = g_socket_get_local_address (socket, &err);
1167   if (sockaddr == NULL || !G_IS_INET_SOCKET_ADDRESS (sockaddr)) {
1168     g_clear_object (&sockaddr);
1169     GST_ERROR ("failed to get sockaddr: %s", err->message);
1170     g_error_free (err);
1171     return 0;
1172   }
1173
1174   port = g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (sockaddr));
1175   g_object_unref (sockaddr);
1176
1177   return port;
1178 }
1179
1180
1181 static gboolean
1182 create_and_configure_udpsink (GstRTSPStream * stream, GstElement ** udpsink,
1183     GSocket * socket_v4, GSocket * socket_v6, gboolean multicast,
1184     gboolean is_rtp, gint mcast_ttl)
1185 {
1186   GstRTSPStreamPrivate *priv = stream->priv;
1187
1188   *udpsink = gst_element_factory_make ("multiudpsink", NULL);
1189
1190   if (!*udpsink)
1191     goto no_udp_protocol;
1192
1193   /* configure sinks */
1194
1195   g_object_set (G_OBJECT (*udpsink), "close-socket", FALSE, NULL);
1196
1197   g_object_set (G_OBJECT (*udpsink), "send-duplicates", FALSE, NULL);
1198
1199   if (is_rtp)
1200     g_object_set (G_OBJECT (*udpsink), "buffer-size", priv->buffer_size, NULL);
1201   else
1202     g_object_set (G_OBJECT (*udpsink), "sync", FALSE, NULL);
1203
1204   /* Needs to be async for RECORD streams, otherwise we will never go to
1205    * PLAYING because the sinks will wait for data while the udpsrc can't
1206    * provide data with timestamps in PAUSED. */
1207   if (!is_rtp || priv->sinkpad)
1208     g_object_set (G_OBJECT (*udpsink), "async", FALSE, NULL);
1209
1210   if (multicast) {
1211     /* join multicast group when adding clients, so we'll start receiving from it.
1212      * We cannot rely on the udpsrc to join the group since its socket is always a
1213      * local unicast one. */
1214     g_object_set (G_OBJECT (*udpsink), "auto-multicast", TRUE, NULL);
1215
1216     g_object_set (G_OBJECT (*udpsink), "loop", FALSE, NULL);
1217   }
1218
1219   /* update the dscp qos field in the sinks */
1220   update_dscp_qos (stream, udpsink);
1221
1222   if (priv->server_addr_v4) {
1223     GST_DEBUG_OBJECT (stream, "udp IPv4, configure udpsinks");
1224     set_unicast_socket_for_udpsink (*udpsink, socket_v4, G_SOCKET_FAMILY_IPV4);
1225   }
1226
1227   if (priv->server_addr_v6) {
1228     GST_DEBUG_OBJECT (stream, "udp IPv6, configure udpsinks");
1229     set_unicast_socket_for_udpsink (*udpsink, socket_v6, G_SOCKET_FAMILY_IPV6);
1230   }
1231
1232   if (multicast) {
1233     gint port;
1234     if (priv->mcast_addr_v4) {
1235       GST_DEBUG_OBJECT (stream, "mcast IPv4, configure udpsinks");
1236       port = get_port_from_socket (socket_v4);
1237       if (!port)
1238         goto get_port_failed;
1239       set_multicast_socket_for_udpsink (*udpsink, socket_v4,
1240           G_SOCKET_FAMILY_IPV4, priv->multicast_iface,
1241           priv->mcast_addr_v4->address, port, mcast_ttl);
1242     }
1243
1244     if (priv->mcast_addr_v6) {
1245       GST_DEBUG_OBJECT (stream, "mcast IPv6, configure udpsinks");
1246       port = get_port_from_socket (socket_v6);
1247       if (!port)
1248         goto get_port_failed;
1249       set_multicast_socket_for_udpsink (*udpsink, socket_v6,
1250           G_SOCKET_FAMILY_IPV6, priv->multicast_iface,
1251           priv->mcast_addr_v6->address, port, mcast_ttl);
1252     }
1253
1254   }
1255
1256   return TRUE;
1257
1258   /* ERRORS */
1259 no_udp_protocol:
1260   {
1261     GST_ERROR_OBJECT (stream, "failed to create udpsink element");
1262     return FALSE;
1263   }
1264 get_port_failed:
1265   {
1266     GST_ERROR_OBJECT (stream, "failed to get udp port");
1267     return FALSE;
1268   }
1269 }
1270
1271 /* must be called with lock */
1272 static gboolean
1273 create_and_configure_udpsource (GstElement ** udpsrc, GSocket * socket)
1274 {
1275   GstStateChangeReturn ret;
1276
1277   g_assert (socket != NULL);
1278
1279   *udpsrc = gst_element_factory_make ("udpsrc", NULL);
1280   if (*udpsrc == NULL)
1281     goto error;
1282
1283   g_object_set (G_OBJECT (*udpsrc), "socket", socket, NULL);
1284
1285   /* The udpsrc cannot do the join because its socket is always a local unicast
1286    * one. The udpsink sharing the same socket will do it for us. */
1287   g_object_set (G_OBJECT (*udpsrc), "auto-multicast", FALSE, NULL);
1288
1289   g_object_set (G_OBJECT (*udpsrc), "loop", FALSE, NULL);
1290
1291   g_object_set (G_OBJECT (*udpsrc), "close-socket", FALSE, NULL);
1292
1293   ret = gst_element_set_state (*udpsrc, GST_STATE_READY);
1294   if (ret == GST_STATE_CHANGE_FAILURE)
1295     goto error;
1296
1297   return TRUE;
1298
1299   /* ERRORS */
1300 error:
1301   {
1302     if (*udpsrc) {
1303       gst_element_set_state (*udpsrc, GST_STATE_NULL);
1304       g_clear_object (udpsrc);
1305     }
1306     return FALSE;
1307   }
1308 }
1309
1310 static gboolean
1311 alloc_ports_one_family (GstRTSPStream * stream, GSocketFamily family,
1312     GSocket * socket_out[2], GstRTSPAddress ** server_addr_out,
1313     gboolean multicast, GstRTSPTransport * ct)
1314 {
1315   GstRTSPStreamPrivate *priv = stream->priv;
1316   GSocket *rtp_socket = NULL;
1317   GSocket *rtcp_socket;
1318   gint tmp_rtp, tmp_rtcp;
1319   guint count;
1320   GList *rejected_addresses = NULL;
1321   GstRTSPAddress *addr = NULL;
1322   GInetAddress *inetaddr = NULL;
1323   GSocketAddress *rtp_sockaddr = NULL;
1324   GSocketAddress *rtcp_sockaddr = NULL;
1325   GstRTSPAddressPool *pool;
1326
1327   pool = priv->pool;
1328   count = 0;
1329
1330   /* Start with random port */
1331   tmp_rtp = 0;
1332
1333   rtcp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1334       G_SOCKET_PROTOCOL_UDP, NULL);
1335   if (!rtcp_socket)
1336     goto no_udp_protocol;
1337   g_socket_set_multicast_loopback (rtcp_socket, FALSE);
1338
1339   /* try to allocate 2 UDP ports, the RTP port should be an even
1340    * number and the RTCP port should be the next (uneven) port */
1341 again:
1342
1343   if (rtp_socket == NULL) {
1344     rtp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1345         G_SOCKET_PROTOCOL_UDP, NULL);
1346     if (!rtp_socket)
1347       goto no_udp_protocol;
1348     g_socket_set_multicast_loopback (rtp_socket, FALSE);
1349   }
1350
1351   if ((pool && gst_rtsp_address_pool_has_unicast_addresses (pool)) || multicast) {
1352     GstRTSPAddressFlags flags;
1353
1354     if (addr)
1355       rejected_addresses = g_list_prepend (rejected_addresses, addr);
1356
1357     if (!pool)
1358       goto no_pool;
1359
1360     flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT;
1361     if (multicast)
1362       flags |= GST_RTSP_ADDRESS_FLAG_MULTICAST;
1363     else
1364       flags |= GST_RTSP_ADDRESS_FLAG_UNICAST;
1365
1366     if (family == G_SOCKET_FAMILY_IPV6)
1367       flags |= GST_RTSP_ADDRESS_FLAG_IPV6;
1368     else
1369       flags |= GST_RTSP_ADDRESS_FLAG_IPV4;
1370
1371     addr = gst_rtsp_address_pool_acquire_address (pool, flags, 2);
1372
1373     if (addr == NULL)
1374       goto no_address;
1375
1376     tmp_rtp = addr->port;
1377
1378     g_clear_object (&inetaddr);
1379     /* FIXME: Does it really work with the IP_MULTICAST_ALL socket option and
1380      * socket control message set in udpsrc? */
1381     if (multicast)
1382       inetaddr = g_inet_address_new_any (family);
1383     else
1384       inetaddr = g_inet_address_new_from_string (addr->address);
1385   } else {
1386     if (tmp_rtp != 0) {
1387       tmp_rtp += 2;
1388       if (++count > 20)
1389         goto no_ports;
1390     }
1391
1392     if (inetaddr == NULL)
1393       inetaddr = g_inet_address_new_any (family);
1394   }
1395
1396   rtp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtp);
1397   if (!g_socket_bind (rtp_socket, rtp_sockaddr, FALSE, NULL)) {
1398     GST_DEBUG_OBJECT (stream, "rtp bind() failed, will try again");
1399     g_object_unref (rtp_sockaddr);
1400     goto again;
1401   }
1402   g_object_unref (rtp_sockaddr);
1403
1404   rtp_sockaddr = g_socket_get_local_address (rtp_socket, NULL);
1405   if (rtp_sockaddr == NULL || !G_IS_INET_SOCKET_ADDRESS (rtp_sockaddr)) {
1406     g_clear_object (&rtp_sockaddr);
1407     goto socket_error;
1408   }
1409
1410   tmp_rtp =
1411       g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (rtp_sockaddr));
1412   g_object_unref (rtp_sockaddr);
1413
1414   /* check if port is even */
1415   if ((tmp_rtp & 1) != 0) {
1416     /* port not even, close and allocate another */
1417     tmp_rtp++;
1418     g_clear_object (&rtp_socket);
1419     goto again;
1420   }
1421
1422   /* set port */
1423   tmp_rtcp = tmp_rtp + 1;
1424
1425   rtcp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtcp);
1426   if (!g_socket_bind (rtcp_socket, rtcp_sockaddr, FALSE, NULL)) {
1427     GST_DEBUG_OBJECT (stream, "rctp bind() failed, will try again");
1428     g_object_unref (rtcp_sockaddr);
1429     g_clear_object (&rtp_socket);
1430     goto again;
1431   }
1432   g_object_unref (rtcp_sockaddr);
1433
1434   if (!addr) {
1435     addr = g_slice_new0 (GstRTSPAddress);
1436     addr->address = g_inet_address_to_string (inetaddr);
1437     addr->port = tmp_rtp;
1438     addr->n_ports = 2;
1439   }
1440
1441   g_clear_object (&inetaddr);
1442
1443   socket_out[0] = rtp_socket;
1444   socket_out[1] = rtcp_socket;
1445   *server_addr_out = addr;
1446
1447   GST_DEBUG_OBJECT (stream, "allocated address: %s and ports: %d, %d",
1448       addr->address, tmp_rtp, tmp_rtcp);
1449
1450   g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free);
1451
1452   return TRUE;
1453
1454   /* ERRORS */
1455 no_udp_protocol:
1456   {
1457     GST_ERROR_OBJECT (stream, "failed to allocate UDP ports: protocol error");
1458     goto cleanup;
1459   }
1460 no_pool:
1461   {
1462     GST_ERROR_OBJECT (stream,
1463         "failed to allocate UDP ports: no address pool specified");
1464     goto cleanup;
1465   }
1466 no_address:
1467   {
1468     GST_ERROR_OBJECT (stream, "failed to acquire address from pool");
1469     goto cleanup;
1470   }
1471 no_ports:
1472   {
1473     GST_ERROR_OBJECT (stream, "failed to allocate UDP ports: no ports");
1474     goto cleanup;
1475   }
1476 socket_error:
1477   {
1478     GST_ERROR_OBJECT (stream, "failed to allocate UDP ports: socket error");
1479     goto cleanup;
1480   }
1481 cleanup:
1482   {
1483     if (inetaddr)
1484       g_object_unref (inetaddr);
1485     g_list_free_full (rejected_addresses,
1486         (GDestroyNotify) gst_rtsp_address_free);
1487     if (addr)
1488       gst_rtsp_address_free (addr);
1489     if (rtp_socket)
1490       g_object_unref (rtp_socket);
1491     if (rtcp_socket)
1492       g_object_unref (rtcp_socket);
1493     return FALSE;
1494   }
1495 }
1496
1497 /**
1498  * gst_rtsp_stream_allocate_udp_sockets:
1499  * @stream: a #GstRTSPStream
1500  * @family: protocol family
1501  * @transport: transport method
1502  * @use_client_settings: Whether to use client settings or not
1503  *
1504  * Allocates RTP and RTCP ports.
1505  *
1506  * Returns: %TRUE if the RTP and RTCP sockets have been succeccully allocated.
1507  */
1508 gboolean
1509 gst_rtsp_stream_allocate_udp_sockets (GstRTSPStream * stream,
1510     GSocketFamily family, GstRTSPTransport * ct,
1511     gboolean use_transport_settings)
1512 {
1513   GstRTSPStreamPrivate *priv;
1514   gboolean ret = FALSE;
1515   GstRTSPLowerTrans transport;
1516   gboolean allocated = FALSE;
1517
1518   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1519   g_return_val_if_fail (ct != NULL, FALSE);
1520   priv = stream->priv;
1521
1522   transport = ct->lower_transport;
1523
1524   g_mutex_lock (&priv->lock);
1525
1526   if (transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
1527     if (family == G_SOCKET_FAMILY_IPV4 && priv->mcast_socket_v4[0])
1528       allocated = TRUE;
1529     else if (family == G_SOCKET_FAMILY_IPV6 && priv->mcast_socket_v6[0])
1530       allocated = TRUE;
1531   } else if (transport == GST_RTSP_LOWER_TRANS_UDP) {
1532     if (family == G_SOCKET_FAMILY_IPV4 && priv->socket_v4[0])
1533       allocated = TRUE;
1534     else if (family == G_SOCKET_FAMILY_IPV6 && priv->socket_v6[0])
1535       allocated = TRUE;
1536   }
1537
1538   if (allocated) {
1539     GST_DEBUG_OBJECT (stream, "Allocated already");
1540     g_mutex_unlock (&priv->lock);
1541     return TRUE;
1542   }
1543
1544   if (family == G_SOCKET_FAMILY_IPV4) {
1545     /* IPv4 */
1546     if (transport == GST_RTSP_LOWER_TRANS_UDP) {
1547       /* UDP unicast */
1548       GST_DEBUG_OBJECT (stream, "GST_RTSP_LOWER_TRANS_UDP, ipv4");
1549       ret = alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4,
1550           priv->socket_v4, &priv->server_addr_v4, FALSE, ct);
1551     } else {
1552       /* multicast */
1553       GST_DEBUG_OBJECT (stream, "GST_RTSP_LOWER_TRANS_MCAST_UDP, ipv4");
1554       ret = alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4,
1555           priv->mcast_socket_v4, &priv->mcast_addr_v4, TRUE, ct);
1556     }
1557   } else {
1558     /* IPv6 */
1559     if (transport == GST_RTSP_LOWER_TRANS_UDP) {
1560       /* unicast */
1561       GST_DEBUG_OBJECT (stream, "GST_RTSP_LOWER_TRANS_UDP, ipv6");
1562       ret = alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6,
1563           priv->socket_v6, &priv->server_addr_v6, FALSE, ct);
1564
1565     } else {
1566       /* multicast */
1567       GST_DEBUG_OBJECT (stream, "GST_RTSP_LOWER_TRANS_MCAST_UDP, ipv6");
1568       ret = alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6,
1569           priv->mcast_socket_v6, &priv->mcast_addr_v6, TRUE, ct);
1570     }
1571   }
1572   g_mutex_unlock (&priv->lock);
1573
1574   return ret;
1575 }
1576
1577 /**
1578  * gst_rtsp_stream_set_client_side:
1579  * @stream: a #GstRTSPStream
1580  * @client_side: TRUE if this #GstRTSPStream is running on the 'client' side of
1581  * an RTSP connection.
1582  *
1583  * Sets the #GstRTSPStream as a 'client side' stream - used for sending
1584  * streams to an RTSP server via RECORD. This has the practical effect
1585  * of changing which UDP port numbers are used when setting up the local
1586  * side of the stream sending to be either the 'server' or 'client' pair
1587  * of a configured UDP transport.
1588  */
1589 void
1590 gst_rtsp_stream_set_client_side (GstRTSPStream * stream, gboolean client_side)
1591 {
1592   GstRTSPStreamPrivate *priv;
1593
1594   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1595   priv = stream->priv;
1596   g_mutex_lock (&priv->lock);
1597   priv->client_side = client_side;
1598   g_mutex_unlock (&priv->lock);
1599 }
1600
1601 /**
1602  * gst_rtsp_stream_is_client_side:
1603  * @stream: a #GstRTSPStream
1604  *
1605  * See gst_rtsp_stream_set_client_side()
1606  *
1607  * Returns: TRUE if this #GstRTSPStream is client-side.
1608  */
1609 gboolean
1610 gst_rtsp_stream_is_client_side (GstRTSPStream * stream)
1611 {
1612   GstRTSPStreamPrivate *priv;
1613   gboolean ret;
1614
1615   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1616
1617   priv = stream->priv;
1618   g_mutex_lock (&priv->lock);
1619   ret = priv->client_side;
1620   g_mutex_unlock (&priv->lock);
1621
1622   return ret;
1623 }
1624
1625 /**
1626  * gst_rtsp_stream_get_server_port:
1627  * @stream: a #GstRTSPStream
1628  * @server_port: (out): result server port
1629  * @family: the port family to get
1630  *
1631  * Fill @server_port with the port pair used by the server. This function can
1632  * only be called when @stream has been joined.
1633  */
1634 void
1635 gst_rtsp_stream_get_server_port (GstRTSPStream * stream,
1636     GstRTSPRange * server_port, GSocketFamily family)
1637 {
1638   GstRTSPStreamPrivate *priv;
1639
1640   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1641   priv = stream->priv;
1642   g_return_if_fail (priv->joined_bin != NULL);
1643
1644   if (server_port) {
1645     server_port->min = 0;
1646     server_port->max = 0;
1647   }
1648
1649   g_mutex_lock (&priv->lock);
1650   if (family == G_SOCKET_FAMILY_IPV4) {
1651     if (server_port && priv->server_addr_v4) {
1652       server_port->min = priv->server_addr_v4->port;
1653       server_port->max =
1654           priv->server_addr_v4->port + priv->server_addr_v4->n_ports - 1;
1655     }
1656   } else {
1657     if (server_port && priv->server_addr_v6) {
1658       server_port->min = priv->server_addr_v6->port;
1659       server_port->max =
1660           priv->server_addr_v6->port + priv->server_addr_v6->n_ports - 1;
1661     }
1662   }
1663   g_mutex_unlock (&priv->lock);
1664 }
1665
1666 /**
1667  * gst_rtsp_stream_get_rtpsession:
1668  * @stream: a #GstRTSPStream
1669  *
1670  * Get the RTP session of this stream.
1671  *
1672  * Returns: (transfer full): The RTP session of this stream. Unref after usage.
1673  */
1674 GObject *
1675 gst_rtsp_stream_get_rtpsession (GstRTSPStream * stream)
1676 {
1677   GstRTSPStreamPrivate *priv;
1678   GObject *session;
1679
1680   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1681
1682   priv = stream->priv;
1683
1684   g_mutex_lock (&priv->lock);
1685   if ((session = priv->session))
1686     g_object_ref (session);
1687   g_mutex_unlock (&priv->lock);
1688
1689   return session;
1690 }
1691
1692 /**
1693  * gst_rtsp_stream_get_srtp_encoder:
1694  * @stream: a #GstRTSPStream
1695  *
1696  * Get the SRTP encoder for this stream.
1697  *
1698  * Returns: (transfer full): The SRTP encoder for this stream. Unref after usage.
1699  */
1700 GstElement *
1701 gst_rtsp_stream_get_srtp_encoder (GstRTSPStream * stream)
1702 {
1703   GstRTSPStreamPrivate *priv;
1704   GstElement *encoder;
1705
1706   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1707
1708   priv = stream->priv;
1709
1710   g_mutex_lock (&priv->lock);
1711   if ((encoder = priv->srtpenc))
1712     g_object_ref (encoder);
1713   g_mutex_unlock (&priv->lock);
1714
1715   return encoder;
1716 }
1717
1718 /**
1719  * gst_rtsp_stream_get_ssrc:
1720  * @stream: a #GstRTSPStream
1721  * @ssrc: (out): result ssrc
1722  *
1723  * Get the SSRC used by the RTP session of this stream. This function can only
1724  * be called when @stream has been joined.
1725  */
1726 void
1727 gst_rtsp_stream_get_ssrc (GstRTSPStream * stream, guint * ssrc)
1728 {
1729   GstRTSPStreamPrivate *priv;
1730
1731   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1732   priv = stream->priv;
1733   g_return_if_fail (priv->joined_bin != NULL);
1734
1735   g_mutex_lock (&priv->lock);
1736   if (ssrc && priv->session)
1737     g_object_get (priv->session, "internal-ssrc", ssrc, NULL);
1738   g_mutex_unlock (&priv->lock);
1739 }
1740
1741 /**
1742  * gst_rtsp_stream_set_retransmission_time:
1743  * @stream: a #GstRTSPStream
1744  * @time: a #GstClockTime
1745  *
1746  * Set the amount of time to store retransmission packets.
1747  */
1748 void
1749 gst_rtsp_stream_set_retransmission_time (GstRTSPStream * stream,
1750     GstClockTime time)
1751 {
1752   GST_DEBUG_OBJECT (stream, "set retransmission time %" G_GUINT64_FORMAT, time);
1753
1754   g_mutex_lock (&stream->priv->lock);
1755   stream->priv->rtx_time = time;
1756   if (stream->priv->rtxsend)
1757     g_object_set (stream->priv->rtxsend, "max-size-time",
1758         GST_TIME_AS_MSECONDS (time), NULL);
1759   g_mutex_unlock (&stream->priv->lock);
1760 }
1761
1762 /**
1763  * gst_rtsp_stream_get_retransmission_time:
1764  * @stream: a #GstRTSPStream
1765  *
1766  * Get the amount of time to store retransmission data.
1767  *
1768  * Returns: the amount of time to store retransmission data.
1769  */
1770 GstClockTime
1771 gst_rtsp_stream_get_retransmission_time (GstRTSPStream * stream)
1772 {
1773   GstClockTime ret;
1774
1775   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
1776
1777   g_mutex_lock (&stream->priv->lock);
1778   ret = stream->priv->rtx_time;
1779   g_mutex_unlock (&stream->priv->lock);
1780
1781   return ret;
1782 }
1783
1784 /**
1785  * gst_rtsp_stream_set_retransmission_pt:
1786  * @stream: a #GstRTSPStream
1787  * @rtx_pt: a #guint
1788  *
1789  * Set the payload type (pt) for retransmission of this stream.
1790  */
1791 void
1792 gst_rtsp_stream_set_retransmission_pt (GstRTSPStream * stream, guint rtx_pt)
1793 {
1794   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1795
1796   GST_DEBUG_OBJECT (stream, "set retransmission pt %u", rtx_pt);
1797
1798   g_mutex_lock (&stream->priv->lock);
1799   stream->priv->rtx_pt = rtx_pt;
1800   if (stream->priv->rtxsend) {
1801     guint pt = gst_rtsp_stream_get_pt (stream);
1802     gchar *pt_s = g_strdup_printf ("%d", pt);
1803     GstStructure *rtx_pt_map = gst_structure_new ("application/x-rtp-pt-map",
1804         pt_s, G_TYPE_UINT, rtx_pt, NULL);
1805     g_object_set (stream->priv->rtxsend, "payload-type-map", rtx_pt_map, NULL);
1806     g_free (pt_s);
1807     gst_structure_free (rtx_pt_map);
1808   }
1809   g_mutex_unlock (&stream->priv->lock);
1810 }
1811
1812 /**
1813  * gst_rtsp_stream_get_retransmission_pt:
1814  * @stream: a #GstRTSPStream
1815  *
1816  * Get the payload-type used for retransmission of this stream
1817  *
1818  * Returns: The retransmission PT.
1819  */
1820 guint
1821 gst_rtsp_stream_get_retransmission_pt (GstRTSPStream * stream)
1822 {
1823   guint rtx_pt;
1824
1825   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
1826
1827   g_mutex_lock (&stream->priv->lock);
1828   rtx_pt = stream->priv->rtx_pt;
1829   g_mutex_unlock (&stream->priv->lock);
1830
1831   return rtx_pt;
1832 }
1833
1834 /**
1835  * gst_rtsp_stream_set_buffer_size:
1836  * @stream: a #GstRTSPStream
1837  * @size: the buffer size
1838  *
1839  * Set the size of the UDP transmission buffer (in bytes)
1840  * Needs to be set before the stream is joined to a bin.
1841  *
1842  * Since: 1.6
1843  */
1844 void
1845 gst_rtsp_stream_set_buffer_size (GstRTSPStream * stream, guint size)
1846 {
1847   g_mutex_lock (&stream->priv->lock);
1848   stream->priv->buffer_size = size;
1849   g_mutex_unlock (&stream->priv->lock);
1850 }
1851
1852 /**
1853  * gst_rtsp_stream_get_buffer_size:
1854  * @stream: a #GstRTSPStream
1855  *
1856  * Get the size of the UDP transmission buffer (in bytes)
1857  *
1858  * Returns: the size of the UDP TX buffer
1859  *
1860  * Since: 1.6
1861  */
1862 guint
1863 gst_rtsp_stream_get_buffer_size (GstRTSPStream * stream)
1864 {
1865   guint buffer_size;
1866
1867   g_mutex_lock (&stream->priv->lock);
1868   buffer_size = stream->priv->buffer_size;
1869   g_mutex_unlock (&stream->priv->lock);
1870
1871   return buffer_size;
1872 }
1873
1874 /* executed from streaming thread */
1875 static void
1876 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream)
1877 {
1878   GstRTSPStreamPrivate *priv = stream->priv;
1879   GstCaps *newcaps, *oldcaps;
1880
1881   newcaps = gst_pad_get_current_caps (pad);
1882
1883   GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps,
1884       newcaps);
1885
1886   g_mutex_lock (&priv->lock);
1887   oldcaps = priv->caps;
1888   priv->caps = newcaps;
1889   g_mutex_unlock (&priv->lock);
1890
1891   if (oldcaps)
1892     gst_caps_unref (oldcaps);
1893 }
1894
1895 static void
1896 dump_structure (const GstStructure * s)
1897 {
1898   gchar *sstr;
1899
1900   sstr = gst_structure_to_string (s);
1901   GST_INFO ("structure: %s", sstr);
1902   g_free (sstr);
1903 }
1904
1905 static GstRTSPStreamTransport *
1906 find_transport (GstRTSPStream * stream, const gchar * rtcp_from)
1907 {
1908   GstRTSPStreamPrivate *priv = stream->priv;
1909   GList *walk;
1910   GstRTSPStreamTransport *result = NULL;
1911   const gchar *tmp;
1912   gchar *dest;
1913   guint port;
1914
1915   if (rtcp_from == NULL)
1916     return NULL;
1917
1918   tmp = g_strrstr (rtcp_from, ":");
1919   if (tmp == NULL)
1920     return NULL;
1921
1922   port = atoi (tmp + 1);
1923   dest = g_strndup (rtcp_from, tmp - rtcp_from);
1924
1925   g_mutex_lock (&priv->lock);
1926   GST_INFO ("finding %s:%d in %d transports", dest, port,
1927       g_list_length (priv->transports));
1928
1929   for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1930     GstRTSPStreamTransport *trans = walk->data;
1931     const GstRTSPTransport *tr;
1932     gint min, max;
1933
1934     tr = gst_rtsp_stream_transport_get_transport (trans);
1935
1936     if (priv->client_side) {
1937       /* In client side mode the 'destination' is the RTSP server, so send
1938        * to those ports */
1939       min = tr->server_port.min;
1940       max = tr->server_port.max;
1941     } else {
1942       min = tr->client_port.min;
1943       max = tr->client_port.max;
1944     }
1945
1946     if ((g_ascii_strcasecmp (tr->destination, dest) == 0) &&
1947         (min == port || max == port)) {
1948       result = trans;
1949       break;
1950     }
1951   }
1952   if (result)
1953     g_object_ref (result);
1954   g_mutex_unlock (&priv->lock);
1955
1956   g_free (dest);
1957
1958   return result;
1959 }
1960
1961 static GstRTSPStreamTransport *
1962 check_transport (GObject * source, GstRTSPStream * stream)
1963 {
1964   GstStructure *stats;
1965   GstRTSPStreamTransport *trans;
1966
1967   /* see if we have a stream to match with the origin of the RTCP packet */
1968   trans = g_object_get_qdata (source, ssrc_stream_map_key);
1969   if (trans == NULL) {
1970     g_object_get (source, "stats", &stats, NULL);
1971     if (stats) {
1972       const gchar *rtcp_from;
1973
1974       dump_structure (stats);
1975
1976       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
1977       if ((trans = find_transport (stream, rtcp_from))) {
1978         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
1979             source);
1980         g_object_set_qdata_full (source, ssrc_stream_map_key, trans,
1981             g_object_unref);
1982       }
1983       gst_structure_free (stats);
1984     }
1985   }
1986   return trans;
1987 }
1988
1989
1990 static void
1991 on_new_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1992 {
1993   GstRTSPStreamTransport *trans;
1994
1995   GST_INFO ("%p: new source %p", stream, source);
1996
1997   trans = check_transport (source, stream);
1998
1999   if (trans)
2000     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
2001 }
2002
2003 static void
2004 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPStream * stream)
2005 {
2006   GST_INFO ("%p: new SDES %p", stream, source);
2007 }
2008
2009 static void
2010 on_ssrc_active (GObject * session, GObject * source, GstRTSPStream * stream)
2011 {
2012   GstRTSPStreamTransport *trans;
2013
2014   trans = check_transport (source, stream);
2015
2016   if (trans) {
2017     GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
2018     gst_rtsp_stream_transport_keep_alive (trans);
2019   }
2020 #ifdef DUMP_STATS
2021   {
2022     GstStructure *stats;
2023     g_object_get (source, "stats", &stats, NULL);
2024     if (stats) {
2025       dump_structure (stats);
2026       gst_structure_free (stats);
2027     }
2028   }
2029 #endif
2030 }
2031
2032 static void
2033 on_bye_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
2034 {
2035   GST_INFO ("%p: source %p bye", stream, source);
2036 }
2037
2038 static void
2039 on_bye_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
2040 {
2041   GstRTSPStreamTransport *trans;
2042
2043   GST_INFO ("%p: source %p bye timeout", stream, source);
2044
2045   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
2046     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
2047     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
2048   }
2049 }
2050
2051 static void
2052 on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
2053 {
2054   GstRTSPStreamTransport *trans;
2055
2056   GST_INFO ("%p: source %p timeout", stream, source);
2057
2058   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
2059     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
2060     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
2061   }
2062 }
2063
2064 static void
2065 on_new_sender_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
2066 {
2067   GST_INFO ("%p: new sender source %p", stream, source);
2068 #ifndef DUMP_STATS
2069   {
2070     GstStructure *stats;
2071     g_object_get (source, "stats", &stats, NULL);
2072     if (stats) {
2073       dump_structure (stats);
2074       gst_structure_free (stats);
2075     }
2076   }
2077 #endif
2078 }
2079
2080 static void
2081 on_sender_ssrc_active (GObject * session, GObject * source,
2082     GstRTSPStream * stream)
2083 {
2084 #ifndef DUMP_STATS
2085   {
2086     GstStructure *stats;
2087     g_object_get (source, "stats", &stats, NULL);
2088     if (stats) {
2089       dump_structure (stats);
2090       gst_structure_free (stats);
2091     }
2092   }
2093 #endif
2094 }
2095
2096 static void
2097 clear_tr_cache (GstRTSPStreamPrivate * priv, gboolean is_rtp)
2098 {
2099   if (is_rtp) {
2100     g_list_foreach (priv->tr_cache_rtp, (GFunc) g_object_unref, NULL);
2101     g_list_free (priv->tr_cache_rtp);
2102     priv->tr_cache_rtp = NULL;
2103   } else {
2104     g_list_foreach (priv->tr_cache_rtcp, (GFunc) g_object_unref, NULL);
2105     g_list_free (priv->tr_cache_rtcp);
2106     priv->tr_cache_rtcp = NULL;
2107   }
2108 }
2109
2110 static GstFlowReturn
2111 handle_new_sample (GstAppSink * sink, gpointer user_data)
2112 {
2113   GstRTSPStreamPrivate *priv;
2114   GList *walk;
2115   GstSample *sample;
2116   GstBuffer *buffer;
2117   GstRTSPStream *stream;
2118   gboolean is_rtp;
2119
2120   sample = gst_app_sink_pull_sample (sink);
2121   if (!sample)
2122     return GST_FLOW_OK;
2123
2124   stream = (GstRTSPStream *) user_data;
2125   priv = stream->priv;
2126   buffer = gst_sample_get_buffer (sample);
2127
2128   is_rtp = GST_ELEMENT_CAST (sink) == priv->appsink[0];
2129
2130   g_mutex_lock (&priv->lock);
2131   if (is_rtp) {
2132     if (priv->tr_cache_cookie_rtp != priv->transports_cookie) {
2133       clear_tr_cache (priv, is_rtp);
2134       for (walk = priv->transports; walk; walk = g_list_next (walk)) {
2135         GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2136         priv->tr_cache_rtp =
2137             g_list_prepend (priv->tr_cache_rtp, g_object_ref (tr));
2138       }
2139       priv->tr_cache_cookie_rtp = priv->transports_cookie;
2140     }
2141   } else {
2142     if (priv->tr_cache_cookie_rtcp != priv->transports_cookie) {
2143       clear_tr_cache (priv, is_rtp);
2144       for (walk = priv->transports; walk; walk = g_list_next (walk)) {
2145         GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2146         priv->tr_cache_rtcp =
2147             g_list_prepend (priv->tr_cache_rtcp, g_object_ref (tr));
2148       }
2149       priv->tr_cache_cookie_rtcp = priv->transports_cookie;
2150     }
2151   }
2152   g_mutex_unlock (&priv->lock);
2153
2154   if (is_rtp) {
2155     for (walk = priv->tr_cache_rtp; walk; walk = g_list_next (walk)) {
2156       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2157       gst_rtsp_stream_transport_send_rtp (tr, buffer);
2158     }
2159   } else {
2160     for (walk = priv->tr_cache_rtcp; walk; walk = g_list_next (walk)) {
2161       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2162       gst_rtsp_stream_transport_send_rtcp (tr, buffer);
2163     }
2164   }
2165   gst_sample_unref (sample);
2166
2167   return GST_FLOW_OK;
2168 }
2169
2170 static GstAppSinkCallbacks sink_cb = {
2171   NULL,                         /* not interested in EOS */
2172   NULL,                         /* not interested in preroll samples */
2173   handle_new_sample,
2174 };
2175
2176 static GstElement *
2177 get_rtp_encoder (GstRTSPStream * stream, guint session)
2178 {
2179   GstRTSPStreamPrivate *priv = stream->priv;
2180
2181   if (priv->srtpenc == NULL) {
2182     gchar *name;
2183
2184     name = g_strdup_printf ("srtpenc_%u", session);
2185     priv->srtpenc = gst_element_factory_make ("srtpenc", name);
2186     g_free (name);
2187
2188     g_object_set (priv->srtpenc, "random-key", TRUE, NULL);
2189   }
2190   return gst_object_ref (priv->srtpenc);
2191 }
2192
2193 static GstElement *
2194 request_rtp_encoder (GstElement * rtpbin, guint session, GstRTSPStream * stream)
2195 {
2196   GstRTSPStreamPrivate *priv = stream->priv;
2197   GstElement *oldenc, *enc;
2198   GstPad *pad;
2199   gchar *name;
2200
2201   if (priv->idx != session)
2202     return NULL;
2203
2204   GST_DEBUG_OBJECT (stream, "make RTP encoder for session %u", session);
2205
2206   oldenc = priv->srtpenc;
2207   enc = get_rtp_encoder (stream, session);
2208   name = g_strdup_printf ("rtp_sink_%d", session);
2209   pad = gst_element_get_request_pad (enc, name);
2210   g_free (name);
2211   gst_object_unref (pad);
2212
2213   if (oldenc == NULL)
2214     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER], 0,
2215         enc);
2216
2217   return enc;
2218 }
2219
2220 static GstElement *
2221 request_rtcp_encoder (GstElement * rtpbin, guint session,
2222     GstRTSPStream * stream)
2223 {
2224   GstRTSPStreamPrivate *priv = stream->priv;
2225   GstElement *oldenc, *enc;
2226   GstPad *pad;
2227   gchar *name;
2228
2229   if (priv->idx != session)
2230     return NULL;
2231
2232   GST_DEBUG_OBJECT (stream, "make RTCP encoder for session %u", session);
2233
2234   oldenc = priv->srtpenc;
2235   enc = get_rtp_encoder (stream, session);
2236   name = g_strdup_printf ("rtcp_sink_%d", session);
2237   pad = gst_element_get_request_pad (enc, name);
2238   g_free (name);
2239   gst_object_unref (pad);
2240
2241   if (oldenc == NULL)
2242     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER], 0,
2243         enc);
2244
2245   return enc;
2246 }
2247
2248 static GstCaps *
2249 request_key (GstElement * srtpdec, guint ssrc, GstRTSPStream * stream)
2250 {
2251   GstRTSPStreamPrivate *priv = stream->priv;
2252   GstCaps *caps;
2253
2254   GST_DEBUG ("request key %08x", ssrc);
2255
2256   g_mutex_lock (&priv->lock);
2257   if ((caps = g_hash_table_lookup (priv->keys, GINT_TO_POINTER (ssrc))))
2258     gst_caps_ref (caps);
2259   g_mutex_unlock (&priv->lock);
2260
2261   return caps;
2262 }
2263
2264 static GstElement *
2265 request_rtp_rtcp_decoder (GstElement * rtpbin, guint session,
2266     GstRTSPStream * stream)
2267 {
2268   GstRTSPStreamPrivate *priv = stream->priv;
2269
2270   if (priv->idx != session)
2271     return NULL;
2272
2273   if (priv->srtpdec == NULL) {
2274     gchar *name;
2275
2276     name = g_strdup_printf ("srtpdec_%u", session);
2277     priv->srtpdec = gst_element_factory_make ("srtpdec", name);
2278     g_free (name);
2279
2280     g_signal_connect (priv->srtpdec, "request-key",
2281         (GCallback) request_key, stream);
2282   }
2283   return gst_object_ref (priv->srtpdec);
2284 }
2285
2286 /**
2287  * gst_rtsp_stream_request_aux_sender:
2288  * @stream: a #GstRTSPStream
2289  * @sessid: the session id
2290  *
2291  * Creating a rtxsend bin
2292  *
2293  * Returns: (transfer full) (nullable): a #GstElement.
2294  *
2295  * Since: 1.6
2296  */
2297 GstElement *
2298 gst_rtsp_stream_request_aux_sender (GstRTSPStream * stream, guint sessid)
2299 {
2300   GstElement *bin;
2301   GstPad *pad;
2302   GstStructure *pt_map;
2303   gchar *name;
2304   guint pt, rtx_pt;
2305   gchar *pt_s;
2306
2307   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2308
2309   pt = gst_rtsp_stream_get_pt (stream);
2310   pt_s = g_strdup_printf ("%u", pt);
2311   rtx_pt = stream->priv->rtx_pt;
2312
2313   GST_INFO ("creating rtxsend with pt %u to %u", pt, rtx_pt);
2314
2315   bin = gst_bin_new (NULL);
2316   stream->priv->rtxsend = gst_element_factory_make ("rtprtxsend", NULL);
2317   pt_map = gst_structure_new ("application/x-rtp-pt-map",
2318       pt_s, G_TYPE_UINT, rtx_pt, NULL);
2319   g_object_set (stream->priv->rtxsend, "payload-type-map", pt_map,
2320       "max-size-time", GST_TIME_AS_MSECONDS (stream->priv->rtx_time), NULL);
2321   g_free (pt_s);
2322   gst_structure_free (pt_map);
2323   gst_bin_add (GST_BIN (bin), gst_object_ref (stream->priv->rtxsend));
2324
2325   pad = gst_element_get_static_pad (stream->priv->rtxsend, "src");
2326   name = g_strdup_printf ("src_%u", sessid);
2327   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
2328   g_free (name);
2329   gst_object_unref (pad);
2330
2331   pad = gst_element_get_static_pad (stream->priv->rtxsend, "sink");
2332   name = g_strdup_printf ("sink_%u", sessid);
2333   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
2334   g_free (name);
2335   gst_object_unref (pad);
2336
2337   return bin;
2338 }
2339
2340 static void
2341 add_rtx_pt (gpointer key, GstCaps * caps, GstStructure * pt_map)
2342 {
2343   guint pt = GPOINTER_TO_INT (key);
2344   const GstStructure *s = gst_caps_get_structure (caps, 0);
2345   const gchar *apt;
2346
2347   if (!g_strcmp0 (gst_structure_get_string (s, "encoding-name"), "RTX") &&
2348       (apt = gst_structure_get_string (s, "apt"))) {
2349     gst_structure_set (pt_map, apt, G_TYPE_UINT, pt, NULL);
2350   }
2351 }
2352
2353 /* Call with priv->lock taken */
2354 static void
2355 update_rtx_receive_pt_map (GstRTSPStream * stream)
2356 {
2357   GstStructure *pt_map;
2358
2359   if (!stream->priv->rtxreceive)
2360     goto done;
2361
2362   pt_map = gst_structure_new_empty ("application/x-rtp-pt-map");
2363   g_hash_table_foreach (stream->priv->ptmap, (GHFunc) add_rtx_pt, pt_map);
2364   g_object_set (stream->priv->rtxreceive, "payload-type-map", pt_map, NULL);
2365   gst_structure_free (pt_map);
2366
2367 done:
2368   return;
2369 }
2370
2371 static void
2372 retrieve_ulpfec_pt (gpointer key, GstCaps * caps, GstElement * ulpfec_decoder)
2373 {
2374   guint pt = GPOINTER_TO_INT (key);
2375   const GstStructure *s = gst_caps_get_structure (caps, 0);
2376
2377   if (!g_strcmp0 (gst_structure_get_string (s, "encoding-name"), "ULPFEC"))
2378     g_object_set (ulpfec_decoder, "pt", pt, NULL);
2379 }
2380
2381 static void
2382 update_ulpfec_decoder_pt (GstRTSPStream * stream)
2383 {
2384   if (!stream->priv->ulpfec_decoder)
2385     goto done;
2386
2387   g_hash_table_foreach (stream->priv->ptmap, (GHFunc) retrieve_ulpfec_pt,
2388       stream->priv->ulpfec_decoder);
2389
2390 done:
2391   return;
2392 }
2393
2394 /**
2395  * gst_rtsp_stream_request_aux_receiver:
2396  * @stream: a #GstRTSPStream
2397  * @sessid: the session id
2398  *
2399  * Creating a rtxreceive bin
2400  *
2401  * Returns: (transfer full) (nullable): a #GstElement.
2402  *
2403  * Since: 1.16
2404  */
2405 GstElement *
2406 gst_rtsp_stream_request_aux_receiver (GstRTSPStream * stream, guint sessid)
2407 {
2408   GstElement *bin;
2409   GstPad *pad;
2410   gchar *name;
2411
2412   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2413
2414   bin = gst_bin_new (NULL);
2415   stream->priv->rtxreceive = gst_element_factory_make ("rtprtxreceive", NULL);
2416   update_rtx_receive_pt_map (stream);
2417   update_ulpfec_decoder_pt (stream);
2418   gst_bin_add (GST_BIN (bin), gst_object_ref (stream->priv->rtxreceive));
2419
2420   pad = gst_element_get_static_pad (stream->priv->rtxreceive, "src");
2421   name = g_strdup_printf ("src_%u", sessid);
2422   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
2423   g_free (name);
2424   gst_object_unref (pad);
2425
2426   pad = gst_element_get_static_pad (stream->priv->rtxreceive, "sink");
2427   name = g_strdup_printf ("sink_%u", sessid);
2428   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
2429   g_free (name);
2430   gst_object_unref (pad);
2431
2432   return bin;
2433 }
2434
2435 /**
2436  * gst_rtsp_stream_set_pt_map:
2437  * @stream: a #GstRTSPStream
2438  * @pt: the pt
2439  * @caps: a #GstCaps
2440  *
2441  * Configure a pt map between @pt and @caps.
2442  */
2443 void
2444 gst_rtsp_stream_set_pt_map (GstRTSPStream * stream, guint pt, GstCaps * caps)
2445 {
2446   GstRTSPStreamPrivate *priv = stream->priv;
2447
2448   if (!GST_IS_CAPS (caps))
2449     return;
2450
2451   g_mutex_lock (&priv->lock);
2452   g_hash_table_insert (priv->ptmap, GINT_TO_POINTER (pt), gst_caps_ref (caps));
2453   update_rtx_receive_pt_map (stream);
2454   g_mutex_unlock (&priv->lock);
2455 }
2456
2457 /**
2458  * gst_rtsp_stream_set_publish_clock_mode:
2459  * @stream: a #GstRTSPStream
2460  * @mode: the clock publish mode
2461  *
2462  * Sets if and how the stream clock should be published according to RFC7273.
2463  *
2464  * Since: 1.8
2465  */
2466 void
2467 gst_rtsp_stream_set_publish_clock_mode (GstRTSPStream * stream,
2468     GstRTSPPublishClockMode mode)
2469 {
2470   GstRTSPStreamPrivate *priv;
2471
2472   priv = stream->priv;
2473   g_mutex_lock (&priv->lock);
2474   priv->publish_clock_mode = mode;
2475   g_mutex_unlock (&priv->lock);
2476 }
2477
2478 /**
2479  * gst_rtsp_stream_get_publish_clock_mode:
2480  * @stream: a #GstRTSPStream
2481  *
2482  * Gets if and how the stream clock should be published according to RFC7273.
2483  *
2484  * Returns: The GstRTSPPublishClockMode
2485  *
2486  * Since: 1.8
2487  */
2488 GstRTSPPublishClockMode
2489 gst_rtsp_stream_get_publish_clock_mode (GstRTSPStream * stream)
2490 {
2491   GstRTSPStreamPrivate *priv;
2492   GstRTSPPublishClockMode ret;
2493
2494   priv = stream->priv;
2495   g_mutex_lock (&priv->lock);
2496   ret = priv->publish_clock_mode;
2497   g_mutex_unlock (&priv->lock);
2498
2499   return ret;
2500 }
2501
2502 static GstCaps *
2503 request_pt_map (GstElement * rtpbin, guint session, guint pt,
2504     GstRTSPStream * stream)
2505 {
2506   GstRTSPStreamPrivate *priv = stream->priv;
2507   GstCaps *caps = NULL;
2508
2509   g_mutex_lock (&priv->lock);
2510
2511   if (priv->idx == session) {
2512     caps = g_hash_table_lookup (priv->ptmap, GINT_TO_POINTER (pt));
2513     if (caps) {
2514       GST_DEBUG ("Stream %p, pt %u: caps %" GST_PTR_FORMAT, stream, pt, caps);
2515       gst_caps_ref (caps);
2516     } else {
2517       GST_DEBUG ("Stream %p, pt %u: no caps", stream, pt);
2518     }
2519   }
2520
2521   g_mutex_unlock (&priv->lock);
2522
2523   return caps;
2524 }
2525
2526 static void
2527 pad_added (GstElement * rtpbin, GstPad * pad, GstRTSPStream * stream)
2528 {
2529   GstRTSPStreamPrivate *priv = stream->priv;
2530   gchar *name;
2531   GstPadLinkReturn ret;
2532   guint sessid;
2533
2534   GST_DEBUG ("Stream %p added pad %s:%s for pad %s:%s", stream,
2535       GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (priv->sinkpad));
2536
2537   name = gst_pad_get_name (pad);
2538   if (sscanf (name, "recv_rtp_src_%u", &sessid) != 1) {
2539     g_free (name);
2540     return;
2541   }
2542   g_free (name);
2543
2544   if (priv->idx != sessid)
2545     return;
2546
2547   if (gst_pad_is_linked (priv->sinkpad)) {
2548     GST_WARNING ("Stream %p: Pad %s:%s is linked already", stream,
2549         GST_DEBUG_PAD_NAME (priv->sinkpad));
2550     return;
2551   }
2552
2553   /* link the RTP pad to the session manager, it should not really fail unless
2554    * this is not really an RTP pad */
2555   ret = gst_pad_link (pad, priv->sinkpad);
2556   if (ret != GST_PAD_LINK_OK)
2557     goto link_failed;
2558   priv->recv_rtp_src = gst_object_ref (pad);
2559
2560   return;
2561
2562 /* ERRORS */
2563 link_failed:
2564   {
2565     GST_ERROR ("Stream %p: Failed to link pads %s:%s and %s:%s", stream,
2566         GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (priv->sinkpad));
2567   }
2568 }
2569
2570 static void
2571 on_npt_stop (GstElement * rtpbin, guint session, guint ssrc,
2572     GstRTSPStream * stream)
2573 {
2574   /* TODO: What to do here other than this? */
2575   GST_DEBUG ("Stream %p: Got EOS", stream);
2576   gst_pad_send_event (stream->priv->sinkpad, gst_event_new_eos ());
2577 }
2578
2579 typedef struct _ProbeData ProbeData;
2580
2581 struct _ProbeData
2582 {
2583   GstRTSPStream *stream;
2584   /* existing sink, already linked to tee */
2585   GstElement *sink1;
2586   /* new sink, about to be linked */
2587   GstElement *sink2;
2588   /* new queue element, that will be linked to tee and sink1 */
2589   GstElement **queue1;
2590   /* new queue element, that will be linked to tee and sink2 */
2591   GstElement **queue2;
2592   GstPad *sink_pad;
2593   GstPad *tee_pad;
2594   guint index;
2595 };
2596
2597 static void
2598 free_cb_data (gpointer user_data)
2599 {
2600   ProbeData *data = user_data;
2601
2602   gst_object_unref (data->stream);
2603   gst_object_unref (data->sink1);
2604   gst_object_unref (data->sink2);
2605   gst_object_unref (data->sink_pad);
2606   gst_object_unref (data->tee_pad);
2607   g_free (data);
2608 }
2609
2610
2611 static void
2612 create_and_plug_queue_to_unlinked_stream (GstRTSPStream * stream,
2613     GstElement * tee, GstElement * sink, GstElement ** queue)
2614 {
2615   GstRTSPStreamPrivate *priv = stream->priv;
2616   GstPad *tee_pad;
2617   GstPad *queue_pad;
2618   GstPad *sink_pad;
2619
2620   /* create queue for the new stream */
2621   *queue = gst_element_factory_make ("queue", NULL);
2622   g_object_set (*queue, "max-size-buffers", 1, "max-size-bytes", 0,
2623       "max-size-time", G_GINT64_CONSTANT (0), NULL);
2624   gst_bin_add (priv->joined_bin, *queue);
2625
2626   /* link tee to queue */
2627   tee_pad = gst_element_get_request_pad (tee, "src_%u");
2628   queue_pad = gst_element_get_static_pad (*queue, "sink");
2629   gst_pad_link (tee_pad, queue_pad);
2630   gst_object_unref (queue_pad);
2631   gst_object_unref (tee_pad);
2632
2633   /* link queue to sink */
2634   queue_pad = gst_element_get_static_pad (*queue, "src");
2635   sink_pad = gst_element_get_static_pad (sink, "sink");
2636   gst_pad_link (queue_pad, sink_pad);
2637   gst_object_unref (queue_pad);
2638   gst_object_unref (sink_pad);
2639
2640   gst_element_sync_state_with_parent (sink);
2641   gst_element_sync_state_with_parent (*queue);
2642 }
2643
2644 static GstPadProbeReturn
2645 create_and_plug_queue_to_linked_stream_probe_cb (GstPad * inpad,
2646     GstPadProbeInfo * info, gpointer user_data)
2647 {
2648   GstRTSPStreamPrivate *priv;
2649   ProbeData *data = user_data;
2650   GstRTSPStream *stream;
2651   GstElement **queue1;
2652   GstElement **queue2;
2653   GstPad *sink_pad;
2654   GstPad *tee_pad;
2655   GstPad *queue_pad;
2656   guint index;
2657
2658   stream = data->stream;
2659   priv = stream->priv;
2660   queue1 = data->queue1;
2661   queue2 = data->queue2;
2662   sink_pad = data->sink_pad;
2663   tee_pad = data->tee_pad;
2664   index = data->index;
2665
2666   /* unlink tee and the existing sink:
2667    *   .-----.    .---------.
2668    *   | tee |    |  sink1  |
2669    * sink   src->sink       |
2670    *   '-----'    '---------'
2671    */
2672   g_assert (gst_pad_unlink (tee_pad, sink_pad));
2673
2674   /* add queue to the already existing stream */
2675   *queue1 = gst_element_factory_make ("queue", NULL);
2676   g_object_set (*queue1, "max-size-buffers", 1, "max-size-bytes", 0,
2677       "max-size-time", G_GINT64_CONSTANT (0), NULL);
2678   gst_bin_add (priv->joined_bin, *queue1);
2679
2680   /* link tee, queue and sink:
2681    *   .-----.    .---------.    .---------.
2682    *   | tee |    |  queue1 |    | sink1   |
2683    * sink   src->sink      src->sink       |
2684    *   '-----'    '---------'    '---------'
2685    */
2686   queue_pad = gst_element_get_static_pad (*queue1, "sink");
2687   gst_pad_link (tee_pad, queue_pad);
2688   gst_object_unref (queue_pad);
2689   queue_pad = gst_element_get_static_pad (*queue1, "src");
2690   gst_pad_link (queue_pad, sink_pad);
2691   gst_object_unref (queue_pad);
2692
2693   gst_element_sync_state_with_parent (*queue1);
2694
2695   /* create queue and link it to tee and the new sink */
2696   create_and_plug_queue_to_unlinked_stream (stream,
2697       priv->tee[index], data->sink2, queue2);
2698
2699   /* the final stream:
2700    *
2701    *    .-----.    .---------.    .---------.
2702    *    | tee |    |  queue1 |    | sink1   |
2703    *  sink   src->sink      src->sink       |
2704    *    |     |    '---------'    '---------'
2705    *    |     |    .---------.    .---------.
2706    *    |     |    |  queue2 |    | sink2   |
2707    *    |    src->sink      src->sink       |
2708    *    '-----'    '---------'    '---------'
2709    */
2710
2711   return GST_PAD_PROBE_REMOVE;
2712 }
2713
2714 static void
2715 create_and_plug_queue_to_linked_stream (GstRTSPStream * stream,
2716     GstElement * sink1, GstElement * sink2, guint index, GstElement ** queue1,
2717     GstElement ** queue2)
2718 {
2719   ProbeData *data;
2720
2721   data = g_new0 (ProbeData, 1);
2722   data->stream = gst_object_ref (stream);
2723   data->sink1 = gst_object_ref (sink1);
2724   data->sink2 = gst_object_ref (sink2);
2725   data->queue1 = queue1;
2726   data->queue2 = queue2;
2727   data->index = index;
2728
2729   data->sink_pad = gst_element_get_static_pad (sink1, "sink");
2730   g_assert (data->sink_pad);
2731   data->tee_pad = gst_pad_get_peer (data->sink_pad);
2732   g_assert (data->tee_pad);
2733
2734   gst_pad_add_probe (data->tee_pad, GST_PAD_PROBE_TYPE_IDLE,
2735       create_and_plug_queue_to_linked_stream_probe_cb, data, free_cb_data);
2736 }
2737
2738 static void
2739 plug_udp_sink (GstRTSPStream * stream, GstElement * sink_to_plug,
2740     GstElement ** queue_to_plug, guint index, gboolean is_mcast)
2741 {
2742   GstRTSPStreamPrivate *priv = stream->priv;
2743   GstElement *existing_sink;
2744
2745   if (is_mcast)
2746     existing_sink = priv->udpsink[index];
2747   else
2748     existing_sink = priv->mcast_udpsink[index];
2749
2750   GST_DEBUG_OBJECT (stream, "plug %s sink", is_mcast ? "mcast" : "udp");
2751
2752   /* add sink to the bin */
2753   gst_bin_add (priv->joined_bin, sink_to_plug);
2754
2755   if (priv->appsink[index] && existing_sink) {
2756
2757     /* queues are already added for the existing stream, add one for
2758        the newly added udp stream */
2759     create_and_plug_queue_to_unlinked_stream (stream, priv->tee[index],
2760         sink_to_plug, queue_to_plug);
2761
2762   } else if (priv->appsink[index] || existing_sink) {
2763     GstElement **queue;
2764     GstElement *element;
2765
2766     /* add queue to the already existing stream plus the newly created udp
2767        stream */
2768     if (priv->appsink[index]) {
2769       element = priv->appsink[index];
2770       queue = &priv->appqueue[index];
2771     } else {
2772       element = existing_sink;
2773       if (is_mcast)
2774         queue = &priv->udpqueue[index];
2775       else
2776         queue = &priv->mcast_udpqueue[index];
2777     }
2778
2779     create_and_plug_queue_to_linked_stream (stream, element, sink_to_plug,
2780         index, queue, queue_to_plug);
2781
2782   } else {
2783     GstPad *tee_pad;
2784     GstPad *sink_pad;
2785
2786     GST_DEBUG_OBJECT (stream, "creating first stream");
2787
2788     /* no need to add queues */
2789     tee_pad = gst_element_get_request_pad (priv->tee[index], "src_%u");
2790     sink_pad = gst_element_get_static_pad (sink_to_plug, "sink");
2791     gst_pad_link (tee_pad, sink_pad);
2792     gst_object_unref (tee_pad);
2793     gst_object_unref (sink_pad);
2794   }
2795
2796   gst_element_sync_state_with_parent (sink_to_plug);
2797 }
2798
2799 static void
2800 plug_tcp_sink (GstRTSPStream * stream, guint index)
2801 {
2802   GstRTSPStreamPrivate *priv = stream->priv;
2803
2804   GST_DEBUG_OBJECT (stream, "plug tcp sink");
2805
2806   /* add sink to the bin */
2807   gst_bin_add (priv->joined_bin, priv->appsink[index]);
2808
2809   if (priv->mcast_udpsink[index] && priv->udpsink[index]) {
2810
2811     /* queues are already added for the existing stream, add one for
2812        the newly added tcp stream */
2813     create_and_plug_queue_to_unlinked_stream (stream,
2814         priv->tee[index], priv->appsink[index], &priv->appqueue[index]);
2815
2816   } else if (priv->mcast_udpsink[index] || priv->udpsink[index]) {
2817     GstElement **queue;
2818     GstElement *element;
2819
2820     /* add queue to the already existing stream plus the newly created tcp
2821        stream */
2822     if (priv->mcast_udpsink[index]) {
2823       element = priv->mcast_udpsink[index];
2824       queue = &priv->mcast_udpqueue[index];
2825     } else {
2826       element = priv->udpsink[index];
2827       queue = &priv->udpqueue[index];
2828     }
2829
2830     create_and_plug_queue_to_linked_stream (stream, element,
2831         priv->appsink[index], index, queue, &priv->appqueue[index]);
2832
2833   } else {
2834     GstPad *tee_pad;
2835     GstPad *sink_pad;
2836
2837     /* no need to add queues */
2838     tee_pad = gst_element_get_request_pad (priv->tee[index], "src_%u");
2839     sink_pad = gst_element_get_static_pad (priv->appsink[index], "sink");
2840     gst_pad_link (tee_pad, sink_pad);
2841     gst_object_unref (tee_pad);
2842     gst_object_unref (sink_pad);
2843   }
2844
2845   gst_element_sync_state_with_parent (priv->appsink[index]);
2846 }
2847
2848 static void
2849 plug_sink (GstRTSPStream * stream, const GstRTSPTransport * transport,
2850     guint index)
2851 {
2852   GstRTSPStreamPrivate *priv;
2853   gboolean is_tcp, is_udp, is_mcast;
2854   priv = stream->priv;
2855
2856   is_tcp = transport->lower_transport == GST_RTSP_LOWER_TRANS_TCP;
2857   is_udp = transport->lower_transport == GST_RTSP_LOWER_TRANS_UDP;
2858   is_mcast = transport->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST;
2859
2860   if (is_udp)
2861     plug_udp_sink (stream, priv->udpsink[index],
2862         &priv->udpqueue[index], index, FALSE);
2863
2864   else if (is_mcast)
2865     plug_udp_sink (stream, priv->mcast_udpsink[index],
2866         &priv->mcast_udpqueue[index], index, TRUE);
2867
2868   else if (is_tcp)
2869     plug_tcp_sink (stream, index);
2870 }
2871
2872 /* must be called with lock */
2873 static gboolean
2874 create_sender_part (GstRTSPStream * stream, const GstRTSPTransport * transport)
2875 {
2876   GstRTSPStreamPrivate *priv;
2877   GstPad *pad;
2878   GstBin *bin;
2879   gboolean is_tcp, is_udp, is_mcast;
2880   gint mcast_ttl = 0;
2881   gint i;
2882
2883   GST_DEBUG_OBJECT (stream, "create sender part");
2884   priv = stream->priv;
2885   bin = priv->joined_bin;
2886
2887   is_tcp = transport->lower_transport == GST_RTSP_LOWER_TRANS_TCP;
2888   is_udp = transport->lower_transport == GST_RTSP_LOWER_TRANS_UDP;
2889   is_mcast = transport->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST;
2890
2891   if (is_mcast)
2892     mcast_ttl = transport->ttl;
2893
2894   GST_DEBUG_OBJECT (stream, "tcp: %d, udp: %d, mcast: %d (ttl: %d)", is_tcp,
2895       is_udp, is_mcast, mcast_ttl);
2896
2897   if (is_udp && !priv->server_addr_v4 && !priv->server_addr_v6) {
2898     GST_WARNING_OBJECT (stream, "no sockets assigned for UDP");
2899     return FALSE;
2900   }
2901
2902   if (is_mcast && !priv->mcast_addr_v4 && !priv->mcast_addr_v6) {
2903     GST_WARNING_OBJECT (stream, "no sockets assigned for UDP multicast");
2904     return FALSE;
2905   }
2906
2907   for (i = 0; i < 2; i++) {
2908     gboolean link_tee = FALSE;
2909     /* For the sender we create this bit of pipeline for both
2910      * RTP and RTCP.
2911      * Initially there will be only one active transport for
2912      * the stream, so the pipeline will look like this:
2913      *
2914      * .--------.      .-----.    .---------.
2915      * | rtpbin |      | tee |    |  sink   |
2916      * |       send->sink   src->sink       |
2917      * '--------'      '-----'    '---------'
2918      *
2919      * For each new transport, the already existing branch will
2920      * be reconfigured by adding a queue element:
2921      *
2922      * .--------.      .-----.    .---------.    .---------.
2923      * | rtpbin |      | tee |    |  queue  |    | udpsink |
2924      * |       send->sink   src->sink      src->sink       |
2925      * '--------'      |     |    '---------'    '---------'
2926      *                 |     |    .---------.    .---------.
2927      *                 |     |    |  queue  |    | udpsink |
2928      *                 |    src->sink      src->sink       |
2929      *                 |     |    '---------'    '---------'
2930      *                 |     |    .---------.    .---------.
2931      *                 |     |    |  queue  |    | appsink |
2932      *                 |    src->sink      src->sink       |
2933      *                 '-----'    '---------'    '---------'
2934      */
2935
2936     /* Only link the RTP send src if we're going to send RTP, link
2937      * the RTCP send src always */
2938     if (!priv->srcpad && i == 0)
2939       continue;
2940
2941     if (!priv->tee[i]) {
2942       /* make tee for RTP/RTCP */
2943       priv->tee[i] = gst_element_factory_make ("tee", NULL);
2944       gst_bin_add (bin, priv->tee[i]);
2945       link_tee = TRUE;
2946     }
2947
2948     if (is_udp && !priv->udpsink[i]) {
2949       /* we create only one pair of udpsinks for IPv4 and IPv6 */
2950       create_and_configure_udpsink (stream, &priv->udpsink[i],
2951           priv->socket_v4[i], priv->socket_v6[i], FALSE, (i == 0), mcast_ttl);
2952       plug_sink (stream, transport, i);
2953     } else if (is_mcast && !priv->mcast_udpsink[i]) {
2954       /* we create only one pair of mcast-udpsinks for IPv4 and IPv6 */
2955       create_and_configure_udpsink (stream, &priv->mcast_udpsink[i],
2956           priv->mcast_socket_v4[i], priv->mcast_socket_v6[i], TRUE, (i == 0),
2957           mcast_ttl);
2958       plug_sink (stream, transport, i);
2959     } else if (is_tcp && !priv->appsink[i]) {
2960       /* make appsink */
2961       priv->appsink[i] = gst_element_factory_make ("appsink", NULL);
2962       g_object_set (priv->appsink[i], "emit-signals", FALSE, NULL);
2963
2964       /* we need to set sync and preroll to FALSE for the sink to avoid
2965        * deadlock. This is only needed for sink sending RTCP data. */
2966       if (i == 1)
2967         g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
2968
2969       gst_app_sink_set_callbacks (GST_APP_SINK_CAST (priv->appsink[i]),
2970           &sink_cb, stream, NULL);
2971       plug_sink (stream, transport, i);
2972     }
2973
2974     if (link_tee) {
2975       /* and link to rtpbin send pad */
2976       gst_element_sync_state_with_parent (priv->tee[i]);
2977       pad = gst_element_get_static_pad (priv->tee[i], "sink");
2978       gst_pad_link (priv->send_src[i], pad);
2979       gst_object_unref (pad);
2980     }
2981   }
2982
2983   return TRUE;
2984 }
2985
2986 /* must be called with lock */
2987 static void
2988 plug_src (GstRTSPStream * stream, GstBin * bin, GstElement * src,
2989     GstElement * funnel)
2990 {
2991   GstRTSPStreamPrivate *priv;
2992   GstPad *pad, *selpad;
2993
2994   priv = stream->priv;
2995
2996   if (priv->srcpad) {
2997     /* we set and keep these to playing so that they don't cause NO_PREROLL return
2998      * values. This is only relevant for PLAY pipelines */
2999     gst_element_set_state (src, GST_STATE_PLAYING);
3000     gst_element_set_locked_state (src, TRUE);
3001   }
3002
3003   /* add src */
3004   gst_bin_add (bin, src);
3005
3006   /* and link to the funnel */
3007   selpad = gst_element_get_request_pad (funnel, "sink_%u");
3008   pad = gst_element_get_static_pad (src, "src");
3009   gst_pad_link (pad, selpad);
3010   gst_object_unref (pad);
3011   gst_object_unref (selpad);
3012 }
3013
3014 /* must be called with lock */
3015 static gboolean
3016 create_receiver_part (GstRTSPStream * stream, const GstRTSPTransport *
3017     transport)
3018 {
3019   GstRTSPStreamPrivate *priv;
3020   GstPad *pad;
3021   GstBin *bin;
3022   gboolean tcp;
3023   gboolean udp;
3024   gboolean mcast;
3025   gint i;
3026
3027   GST_DEBUG_OBJECT (stream, "create receiver part");
3028   priv = stream->priv;
3029   bin = priv->joined_bin;
3030
3031   tcp = transport->lower_transport == GST_RTSP_LOWER_TRANS_TCP;
3032   udp = transport->lower_transport == GST_RTSP_LOWER_TRANS_UDP;
3033   mcast = transport->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST;
3034
3035   for (i = 0; i < 2; i++) {
3036     /* For the receiver we create this bit of pipeline for both
3037      * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc
3038      * and it is all funneled into the rtpbin receive pad.
3039      *
3040      *
3041      * .--------.     .--------.    .--------.
3042      * | udpsrc |     | funnel |    | rtpbin |
3043      * | RTP    src->sink      src->sink     |
3044      * '--------'     |        |    |        |
3045      * .--------.     |        |    |        |
3046      * | appsrc |     |        |    |        |
3047      * | RTP    src->sink      |    |        |
3048      * '--------'     '--------'    |        |
3049      *                              |        |
3050      * .--------.     .--------.    |        |
3051      * | udpsrc |     | funnel |    |        |
3052      * | RTCP   src->sink      src->sink     |
3053      * '--------'     |        |    '--------'
3054      * .--------.     |        |
3055      * | appsrc |     |        |
3056      * | RTCP   src->sink      |
3057      * '--------'     '--------'
3058      */
3059
3060     if (!priv->sinkpad && i == 0) {
3061       /* Only connect recv RTP sink if we expect to receive RTP. Connect recv
3062        * RTCP sink always */
3063       continue;
3064     }
3065
3066     /* make funnel for the RTP/RTCP receivers */
3067     if (!priv->funnel[i]) {
3068       priv->funnel[i] = gst_element_factory_make ("funnel", NULL);
3069       gst_bin_add (bin, priv->funnel[i]);
3070
3071       pad = gst_element_get_static_pad (priv->funnel[i], "src");
3072       gst_pad_link (pad, priv->recv_sink[i]);
3073       gst_object_unref (pad);
3074     }
3075
3076     if (udp && !priv->udpsrc_v4[i] && priv->server_addr_v4) {
3077       GST_DEBUG_OBJECT (stream, "udp IPv4, create and configure udpsources");
3078       if (!create_and_configure_udpsource (&priv->udpsrc_v4[i],
3079               priv->socket_v4[i]))
3080         goto udpsrc_error;
3081
3082       plug_src (stream, bin, priv->udpsrc_v4[i], priv->funnel[i]);
3083     }
3084
3085     if (udp && !priv->udpsrc_v6[i] && priv->server_addr_v6) {
3086       GST_DEBUG_OBJECT (stream, "udp IPv6, create and configure udpsources");
3087       if (!create_and_configure_udpsource (&priv->udpsrc_v6[i],
3088               priv->socket_v6[i]))
3089         goto udpsrc_error;
3090
3091       plug_src (stream, bin, priv->udpsrc_v6[i], priv->funnel[i]);
3092     }
3093
3094     if (mcast && !priv->mcast_udpsrc_v4[i] && priv->mcast_addr_v4) {
3095       GST_DEBUG_OBJECT (stream, "mcast IPv4, create and configure udpsources");
3096       if (!create_and_configure_udpsource (&priv->mcast_udpsrc_v4[i],
3097               priv->mcast_socket_v4[i]))
3098         goto mcast_udpsrc_error;
3099       plug_src (stream, bin, priv->mcast_udpsrc_v4[i], priv->funnel[i]);
3100     }
3101
3102     if (mcast && !priv->mcast_udpsrc_v6[i] && priv->mcast_addr_v6) {
3103       GST_DEBUG_OBJECT (stream, "mcast IPv6, create and configure udpsources");
3104       if (!create_and_configure_udpsource (&priv->mcast_udpsrc_v6[i],
3105               priv->mcast_socket_v6[i]))
3106         goto mcast_udpsrc_error;
3107       plug_src (stream, bin, priv->mcast_udpsrc_v6[i], priv->funnel[i]);
3108     }
3109
3110     if (tcp && !priv->appsrc[i]) {
3111       /* make and add appsrc */
3112       priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
3113       priv->appsrc_base_time[i] = -1;
3114       g_object_set (priv->appsrc[i], "format", GST_FORMAT_TIME, "is-live",
3115           TRUE, NULL);
3116       plug_src (stream, bin, priv->appsrc[i], priv->funnel[i]);
3117     }
3118
3119     gst_element_sync_state_with_parent (priv->funnel[i]);
3120   }
3121
3122   return TRUE;
3123
3124 mcast_udpsrc_error:
3125 udpsrc_error:
3126   return FALSE;
3127 }
3128
3129 static gboolean
3130 check_mcast_part_for_transport (GstRTSPStream * stream,
3131     const GstRTSPTransport * tr)
3132 {
3133   GstRTSPStreamPrivate *priv = stream->priv;
3134   GInetAddress *inetaddr;
3135   GSocketFamily family;
3136   GstRTSPAddress *mcast_addr;
3137
3138   /* Check if it's a ipv4 or ipv6 transport */
3139   inetaddr = g_inet_address_new_from_string (tr->destination);
3140   family = g_inet_address_get_family (inetaddr);
3141   g_object_unref (inetaddr);
3142
3143   /* Select fields corresponding to the family */
3144   if (family == G_SOCKET_FAMILY_IPV4) {
3145     mcast_addr = priv->mcast_addr_v4;
3146   } else {
3147     mcast_addr = priv->mcast_addr_v6;
3148   }
3149
3150   /* We support only one mcast group per family, make sure this transport
3151    * matches it. */
3152   if (!mcast_addr)
3153     goto no_addr;
3154
3155   if (g_ascii_strcasecmp (tr->destination, mcast_addr->address) != 0 ||
3156       tr->port.min != mcast_addr->port ||
3157       tr->port.max != mcast_addr->port + mcast_addr->n_ports - 1 ||
3158       tr->ttl != mcast_addr->ttl)
3159     goto wrong_addr;
3160
3161   return TRUE;
3162
3163 no_addr:
3164   {
3165     GST_WARNING_OBJECT (stream, "Adding mcast transport, but no mcast address "
3166         "has been reserved");
3167     return FALSE;
3168   }
3169 wrong_addr:
3170   {
3171     GST_WARNING_OBJECT (stream, "Adding mcast transport, but it doesn't match "
3172         "the reserved address");
3173     return FALSE;
3174   }
3175 }
3176
3177 /**
3178  * gst_rtsp_stream_join_bin:
3179  * @stream: a #GstRTSPStream
3180  * @bin: (transfer none): a #GstBin to join
3181  * @rtpbin: (transfer none): a rtpbin element in @bin
3182  * @state: the target state of the new elements
3183  *
3184  * Join the #GstBin @bin that contains the element @rtpbin.
3185  *
3186  * @stream will link to @rtpbin, which must be inside @bin. The elements
3187  * added to @bin will be set to the state given in @state.
3188  *
3189  * Returns: %TRUE on success.
3190  */
3191 gboolean
3192 gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
3193     GstElement * rtpbin, GstState state)
3194 {
3195   GstRTSPStreamPrivate *priv;
3196   guint idx;
3197   gchar *name;
3198   GstPadLinkReturn ret;
3199
3200   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3201   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
3202   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
3203
3204   priv = stream->priv;
3205
3206   g_mutex_lock (&priv->lock);
3207   if (priv->joined_bin != NULL)
3208     goto was_joined;
3209
3210   /* create a session with the same index as the stream */
3211   idx = priv->idx;
3212
3213   GST_INFO ("stream %p joining bin as session %u", stream, idx);
3214
3215   if (priv->profiles & GST_RTSP_PROFILE_SAVP
3216       || priv->profiles & GST_RTSP_PROFILE_SAVPF) {
3217     /* For SRTP */
3218     g_signal_connect (rtpbin, "request-rtp-encoder",
3219         (GCallback) request_rtp_encoder, stream);
3220     g_signal_connect (rtpbin, "request-rtcp-encoder",
3221         (GCallback) request_rtcp_encoder, stream);
3222     g_signal_connect (rtpbin, "request-rtp-decoder",
3223         (GCallback) request_rtp_rtcp_decoder, stream);
3224     g_signal_connect (rtpbin, "request-rtcp-decoder",
3225         (GCallback) request_rtp_rtcp_decoder, stream);
3226   }
3227
3228   if (priv->sinkpad) {
3229     g_signal_connect (rtpbin, "request-pt-map",
3230         (GCallback) request_pt_map, stream);
3231   }
3232
3233   /* get pads from the RTP session element for sending and receiving
3234    * RTP/RTCP*/
3235   if (priv->srcpad) {
3236     /* get a pad for sending RTP */
3237     name = g_strdup_printf ("send_rtp_sink_%u", idx);
3238     priv->send_rtp_sink = gst_element_get_request_pad (rtpbin, name);
3239     g_free (name);
3240
3241     /* link the RTP pad to the session manager, it should not really fail unless
3242      * this is not really an RTP pad */
3243     ret = gst_pad_link (priv->srcpad, priv->send_rtp_sink);
3244     if (ret != GST_PAD_LINK_OK)
3245       goto link_failed;
3246
3247     name = g_strdup_printf ("send_rtp_src_%u", idx);
3248     priv->send_src[0] = gst_element_get_static_pad (rtpbin, name);
3249     g_free (name);
3250   } else {
3251     /* RECORD case: need to connect our sinkpad from here */
3252     g_signal_connect (rtpbin, "pad-added", (GCallback) pad_added, stream);
3253     /* EOS */
3254     g_signal_connect (rtpbin, "on-npt-stop", (GCallback) on_npt_stop, stream);
3255
3256     name = g_strdup_printf ("recv_rtp_sink_%u", idx);
3257     priv->recv_sink[0] = gst_element_get_request_pad (rtpbin, name);
3258     g_free (name);
3259   }
3260
3261   name = g_strdup_printf ("send_rtcp_src_%u", idx);
3262   priv->send_src[1] = gst_element_get_request_pad (rtpbin, name);
3263   g_free (name);
3264   name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
3265   priv->recv_sink[1] = gst_element_get_request_pad (rtpbin, name);
3266   g_free (name);
3267
3268   /* get the session */
3269   g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &priv->session);
3270
3271   g_signal_connect (priv->session, "on-new-ssrc", (GCallback) on_new_ssrc,
3272       stream);
3273   g_signal_connect (priv->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
3274       stream);
3275   g_signal_connect (priv->session, "on-ssrc-active",
3276       (GCallback) on_ssrc_active, stream);
3277   g_signal_connect (priv->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
3278       stream);
3279   g_signal_connect (priv->session, "on-bye-timeout",
3280       (GCallback) on_bye_timeout, stream);
3281   g_signal_connect (priv->session, "on-timeout", (GCallback) on_timeout,
3282       stream);
3283
3284   /* signal for sender ssrc */
3285   g_signal_connect (priv->session, "on-new-sender-ssrc",
3286       (GCallback) on_new_sender_ssrc, stream);
3287   g_signal_connect (priv->session, "on-sender-ssrc-active",
3288       (GCallback) on_sender_ssrc_active, stream);
3289
3290   if (priv->srcpad) {
3291     /* be notified of caps changes */
3292     priv->caps_sig = g_signal_connect (priv->send_src[0], "notify::caps",
3293         (GCallback) caps_notify, stream);
3294     priv->caps = gst_pad_get_current_caps (priv->send_src[0]);
3295   }
3296
3297   priv->joined_bin = bin;
3298   GST_DEBUG_OBJECT (stream, "successfully joined bin");
3299   g_mutex_unlock (&priv->lock);
3300
3301   return TRUE;
3302
3303   /* ERRORS */
3304 was_joined:
3305   {
3306     g_mutex_unlock (&priv->lock);
3307     return TRUE;
3308   }
3309 link_failed:
3310   {
3311     GST_WARNING ("failed to link stream %u", idx);
3312     gst_object_unref (priv->send_rtp_sink);
3313     priv->send_rtp_sink = NULL;
3314     g_mutex_unlock (&priv->lock);
3315     return FALSE;
3316   }
3317 }
3318
3319 static void
3320 clear_element (GstBin * bin, GstElement ** elementptr)
3321 {
3322   if (*elementptr) {
3323     gst_element_set_locked_state (*elementptr, FALSE);
3324     gst_element_set_state (*elementptr, GST_STATE_NULL);
3325     if (GST_ELEMENT_PARENT (*elementptr))
3326       gst_bin_remove (bin, *elementptr);
3327     else
3328       gst_object_unref (*elementptr);
3329     *elementptr = NULL;
3330   }
3331 }
3332
3333 /**
3334  * gst_rtsp_stream_leave_bin:
3335  * @stream: a #GstRTSPStream
3336  * @bin: (transfer none): a #GstBin
3337  * @rtpbin: (transfer none): a rtpbin #GstElement
3338  *
3339  * Remove the elements of @stream from @bin.
3340  *
3341  * Return: %TRUE on success.
3342  */
3343 gboolean
3344 gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
3345     GstElement * rtpbin)
3346 {
3347   GstRTSPStreamPrivate *priv;
3348   gint i;
3349
3350   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3351   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
3352   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
3353
3354   priv = stream->priv;
3355
3356   g_mutex_lock (&priv->lock);
3357   if (priv->joined_bin == NULL)
3358     goto was_not_joined;
3359   if (priv->joined_bin != bin)
3360     goto wrong_bin;
3361
3362   priv->joined_bin = NULL;
3363
3364   /* all transports must be removed by now */
3365   if (priv->transports != NULL)
3366     goto transports_not_removed;
3367
3368   clear_tr_cache (priv, TRUE);
3369   clear_tr_cache (priv, FALSE);
3370
3371   GST_INFO ("stream %p leaving bin", stream);
3372
3373   if (priv->srcpad) {
3374     gst_pad_unlink (priv->srcpad, priv->send_rtp_sink);
3375
3376     g_signal_handler_disconnect (priv->send_src[0], priv->caps_sig);
3377     gst_element_release_request_pad (rtpbin, priv->send_rtp_sink);
3378     gst_object_unref (priv->send_rtp_sink);
3379     priv->send_rtp_sink = NULL;
3380   } else if (priv->recv_rtp_src) {
3381     gst_pad_unlink (priv->recv_rtp_src, priv->sinkpad);
3382     gst_object_unref (priv->recv_rtp_src);
3383     priv->recv_rtp_src = NULL;
3384   }
3385
3386   for (i = 0; i < 2; i++) {
3387     clear_element (bin, &priv->udpsrc_v4[i]);
3388     clear_element (bin, &priv->udpsrc_v6[i]);
3389     clear_element (bin, &priv->udpqueue[i]);
3390     clear_element (bin, &priv->udpsink[i]);
3391
3392     clear_element (bin, &priv->mcast_udpsrc_v4[i]);
3393     clear_element (bin, &priv->mcast_udpsrc_v6[i]);
3394     clear_element (bin, &priv->mcast_udpqueue[i]);
3395     clear_element (bin, &priv->mcast_udpsink[i]);
3396
3397     clear_element (bin, &priv->appsrc[i]);
3398     clear_element (bin, &priv->appqueue[i]);
3399     clear_element (bin, &priv->appsink[i]);
3400
3401     clear_element (bin, &priv->tee[i]);
3402     clear_element (bin, &priv->funnel[i]);
3403
3404     if (priv->sinkpad || i == 1) {
3405       gst_element_release_request_pad (rtpbin, priv->recv_sink[i]);
3406       gst_object_unref (priv->recv_sink[i]);
3407       priv->recv_sink[i] = NULL;
3408     }
3409   }
3410
3411   if (priv->srcpad) {
3412     gst_object_unref (priv->send_src[0]);
3413     priv->send_src[0] = NULL;
3414   }
3415
3416   gst_element_release_request_pad (rtpbin, priv->send_src[1]);
3417   gst_object_unref (priv->send_src[1]);
3418   priv->send_src[1] = NULL;
3419
3420   g_object_unref (priv->session);
3421   priv->session = NULL;
3422   if (priv->caps)
3423     gst_caps_unref (priv->caps);
3424   priv->caps = NULL;
3425
3426   if (priv->srtpenc)
3427     gst_object_unref (priv->srtpenc);
3428   if (priv->srtpdec)
3429     gst_object_unref (priv->srtpdec);
3430
3431   if (priv->mcast_addr_v4)
3432     gst_rtsp_address_free (priv->mcast_addr_v4);
3433   priv->mcast_addr_v4 = NULL;
3434   if (priv->mcast_addr_v6)
3435     gst_rtsp_address_free (priv->mcast_addr_v6);
3436   priv->mcast_addr_v6 = NULL;
3437   if (priv->server_addr_v4)
3438     gst_rtsp_address_free (priv->server_addr_v4);
3439   priv->server_addr_v4 = NULL;
3440   if (priv->server_addr_v6)
3441     gst_rtsp_address_free (priv->server_addr_v6);
3442   priv->server_addr_v6 = NULL;
3443
3444   g_mutex_unlock (&priv->lock);
3445
3446   return TRUE;
3447
3448 was_not_joined:
3449   {
3450     g_mutex_unlock (&priv->lock);
3451     return TRUE;
3452   }
3453 transports_not_removed:
3454   {
3455     GST_ERROR_OBJECT (stream, "can't leave bin (transports not removed)");
3456     g_mutex_unlock (&priv->lock);
3457     return FALSE;
3458   }
3459 wrong_bin:
3460   {
3461     GST_ERROR_OBJECT (stream, "leaving the wrong bin");
3462     g_mutex_unlock (&priv->lock);
3463     return FALSE;
3464   }
3465 }
3466
3467 /**
3468  * gst_rtsp_stream_get_joined_bin:
3469  * @stream: a #GstRTSPStream
3470  *
3471  * Get the previous joined bin with gst_rtsp_stream_join_bin() or NULL.
3472  *
3473  * Return: (transfer full) (nullable): the joined bin or NULL.
3474  */
3475 GstBin *
3476 gst_rtsp_stream_get_joined_bin (GstRTSPStream * stream)
3477 {
3478   GstRTSPStreamPrivate *priv;
3479   GstBin *bin = NULL;
3480
3481   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3482
3483   priv = stream->priv;
3484
3485   g_mutex_lock (&priv->lock);
3486   bin = priv->joined_bin ? gst_object_ref (priv->joined_bin) : NULL;
3487   g_mutex_unlock (&priv->lock);
3488
3489   return bin;
3490 }
3491
3492 /**
3493  * gst_rtsp_stream_get_rtpinfo:
3494  * @stream: a #GstRTSPStream
3495  * @rtptime: (allow-none) (out caller-allocates): result RTP timestamp
3496  * @seq: (allow-none) (out caller-allocates): result RTP seqnum
3497  * @clock_rate: (allow-none) (out caller-allocates): the clock rate
3498  * @running_time: (out caller-allocates): result running-time
3499  *
3500  * Retrieve the current rtptime, seq and running-time. This is used to
3501  * construct a RTPInfo reply header.
3502  *
3503  * Returns: %TRUE when rtptime, seq and running-time could be determined.
3504  */
3505 gboolean
3506 gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream,
3507     guint * rtptime, guint * seq, guint * clock_rate,
3508     GstClockTime * running_time)
3509 {
3510   GstRTSPStreamPrivate *priv;
3511   GstStructure *stats;
3512   GObjectClass *payobjclass;
3513
3514   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3515
3516   priv = stream->priv;
3517
3518   payobjclass = G_OBJECT_GET_CLASS (priv->payloader);
3519
3520   g_mutex_lock (&priv->lock);
3521
3522   /* First try to extract the information from the last buffer on the sinks.
3523    * This will have a more accurate sequence number and timestamp, as between
3524    * the payloader and the sink there can be some queues
3525    */
3526   if (priv->udpsink[0] || priv->appsink[0]) {
3527     GstSample *last_sample;
3528
3529     if (priv->udpsink[0])
3530       g_object_get (priv->udpsink[0], "last-sample", &last_sample, NULL);
3531     else
3532       g_object_get (priv->appsink[0], "last-sample", &last_sample, NULL);
3533
3534     if (last_sample) {
3535       GstCaps *caps;
3536       GstBuffer *buffer;
3537       GstSegment *segment;
3538       GstStructure *s;
3539       GstRTPBuffer rtp_buffer = GST_RTP_BUFFER_INIT;
3540
3541       caps = gst_sample_get_caps (last_sample);
3542       buffer = gst_sample_get_buffer (last_sample);
3543       segment = gst_sample_get_segment (last_sample);
3544       s = gst_caps_get_structure (caps, 0);
3545
3546       if (gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp_buffer)) {
3547         guint ssrc_buf = gst_rtp_buffer_get_ssrc (&rtp_buffer);
3548         guint ssrc_stream = 0;
3549         if (gst_structure_has_field_typed (s, "ssrc", G_TYPE_UINT) &&
3550             gst_structure_get_uint (s, "ssrc", &ssrc_stream) &&
3551             ssrc_buf != ssrc_stream) {
3552           /* Skip buffers from auxiliary streams. */
3553           GST_DEBUG_OBJECT (stream,
3554               "not a buffer from the payloader, SSRC: %08x", ssrc_buf);
3555
3556           gst_rtp_buffer_unmap (&rtp_buffer);
3557           gst_sample_unref (last_sample);
3558           goto stats;
3559         }
3560
3561         if (seq) {
3562           *seq = gst_rtp_buffer_get_seq (&rtp_buffer);
3563         }
3564
3565         if (rtptime) {
3566           *rtptime = gst_rtp_buffer_get_timestamp (&rtp_buffer);
3567         }
3568
3569         gst_rtp_buffer_unmap (&rtp_buffer);
3570
3571         if (running_time) {
3572           *running_time =
3573               gst_segment_to_running_time (segment, GST_FORMAT_TIME,
3574               GST_BUFFER_TIMESTAMP (buffer));
3575         }
3576
3577         if (clock_rate) {
3578           gst_structure_get_int (s, "clock-rate", (gint *) clock_rate);
3579
3580           if (*clock_rate == 0 && running_time)
3581             *running_time = GST_CLOCK_TIME_NONE;
3582         }
3583         gst_sample_unref (last_sample);
3584
3585         goto done;
3586       } else {
3587         gst_sample_unref (last_sample);
3588       }
3589     }
3590   }
3591
3592 stats:
3593   if (g_object_class_find_property (payobjclass, "stats")) {
3594     g_object_get (priv->payloader, "stats", &stats, NULL);
3595     if (stats == NULL)
3596       goto no_stats;
3597
3598     if (seq)
3599       gst_structure_get_uint (stats, "seqnum", seq);
3600
3601     if (rtptime)
3602       gst_structure_get_uint (stats, "timestamp", rtptime);
3603
3604     if (running_time)
3605       gst_structure_get_clock_time (stats, "running-time", running_time);
3606
3607     if (clock_rate) {
3608       gst_structure_get_uint (stats, "clock-rate", clock_rate);
3609       if (*clock_rate == 0 && running_time)
3610         *running_time = GST_CLOCK_TIME_NONE;
3611     }
3612     gst_structure_free (stats);
3613   } else {
3614     if (!g_object_class_find_property (payobjclass, "seqnum") ||
3615         !g_object_class_find_property (payobjclass, "timestamp"))
3616       goto no_stats;
3617
3618     if (seq)
3619       g_object_get (priv->payloader, "seqnum", seq, NULL);
3620
3621     if (rtptime)
3622       g_object_get (priv->payloader, "timestamp", rtptime, NULL);
3623
3624     if (running_time)
3625       *running_time = GST_CLOCK_TIME_NONE;
3626   }
3627
3628 done:
3629   g_mutex_unlock (&priv->lock);
3630
3631   return TRUE;
3632
3633   /* ERRORS */
3634 no_stats:
3635   {
3636     GST_WARNING ("Could not get payloader stats");
3637     g_mutex_unlock (&priv->lock);
3638     return FALSE;
3639   }
3640 }
3641
3642 /**
3643  * gst_rtsp_stream_get_caps:
3644  * @stream: a #GstRTSPStream
3645  *
3646  * Retrieve the current caps of @stream.
3647  *
3648  * Returns: (transfer full) (nullable): the #GstCaps of @stream.
3649  * use gst_caps_unref() after usage.
3650  */
3651 GstCaps *
3652 gst_rtsp_stream_get_caps (GstRTSPStream * stream)
3653 {
3654   GstRTSPStreamPrivate *priv;
3655   GstCaps *result;
3656
3657   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3658
3659   priv = stream->priv;
3660
3661   g_mutex_lock (&priv->lock);
3662   if ((result = priv->caps))
3663     gst_caps_ref (result);
3664   g_mutex_unlock (&priv->lock);
3665
3666   return result;
3667 }
3668
3669 /**
3670  * gst_rtsp_stream_recv_rtp:
3671  * @stream: a #GstRTSPStream
3672  * @buffer: (transfer full): a #GstBuffer
3673  *
3674  * Handle an RTP buffer for the stream. This method is usually called when a
3675  * message has been received from a client using the TCP transport.
3676  *
3677  * This function takes ownership of @buffer.
3678  *
3679  * Returns: a GstFlowReturn.
3680  */
3681 GstFlowReturn
3682 gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer)
3683 {
3684   GstRTSPStreamPrivate *priv;
3685   GstFlowReturn ret;
3686   GstElement *element;
3687
3688   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
3689   priv = stream->priv;
3690   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
3691   g_return_val_if_fail (priv->joined_bin != NULL, FALSE);
3692
3693   g_mutex_lock (&priv->lock);
3694   if (priv->appsrc[0])
3695     element = gst_object_ref (priv->appsrc[0]);
3696   else
3697     element = NULL;
3698   g_mutex_unlock (&priv->lock);
3699
3700   if (element) {
3701     if (priv->appsrc_base_time[0] == -1) {
3702       /* Take current running_time. This timestamp will be put on
3703        * the first buffer of each stream because we are a live source and so we
3704        * timestamp with the running_time. When we are dealing with TCP, we also
3705        * only timestamp the first buffer (using the DISCONT flag) because a server
3706        * typically bursts data, for which we don't want to compensate by speeding
3707        * up the media. The other timestamps will be interpollated from this one
3708        * using the RTP timestamps. */
3709       GST_OBJECT_LOCK (element);
3710       if (GST_ELEMENT_CLOCK (element)) {
3711         GstClockTime now;
3712         GstClockTime base_time;
3713
3714         now = gst_clock_get_time (GST_ELEMENT_CLOCK (element));
3715         base_time = GST_ELEMENT_CAST (element)->base_time;
3716
3717         priv->appsrc_base_time[0] = now - base_time;
3718         GST_BUFFER_TIMESTAMP (buffer) = priv->appsrc_base_time[0];
3719         GST_DEBUG ("stream %p: first buffer at time %" GST_TIME_FORMAT
3720             ", base %" GST_TIME_FORMAT, stream, GST_TIME_ARGS (now),
3721             GST_TIME_ARGS (base_time));
3722       }
3723       GST_OBJECT_UNLOCK (element);
3724     }
3725
3726     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
3727     gst_object_unref (element);
3728   } else {
3729     ret = GST_FLOW_OK;
3730   }
3731   return ret;
3732 }
3733
3734 /**
3735  * gst_rtsp_stream_recv_rtcp:
3736  * @stream: a #GstRTSPStream
3737  * @buffer: (transfer full): a #GstBuffer
3738  *
3739  * Handle an RTCP buffer for the stream. This method is usually called when a
3740  * message has been received from a client using the TCP transport.
3741  *
3742  * This function takes ownership of @buffer.
3743  *
3744  * Returns: a GstFlowReturn.
3745  */
3746 GstFlowReturn
3747 gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
3748 {
3749   GstRTSPStreamPrivate *priv;
3750   GstFlowReturn ret;
3751   GstElement *element;
3752
3753   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
3754   priv = stream->priv;
3755   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
3756
3757   if (priv->joined_bin == NULL) {
3758     gst_buffer_unref (buffer);
3759     return GST_FLOW_NOT_LINKED;
3760   }
3761   g_mutex_lock (&priv->lock);
3762   if (priv->appsrc[1])
3763     element = gst_object_ref (priv->appsrc[1]);
3764   else
3765     element = NULL;
3766   g_mutex_unlock (&priv->lock);
3767
3768   if (element) {
3769     if (priv->appsrc_base_time[1] == -1) {
3770       /* Take current running_time. This timestamp will be put on
3771        * the first buffer of each stream because we are a live source and so we
3772        * timestamp with the running_time. When we are dealing with TCP, we also
3773        * only timestamp the first buffer (using the DISCONT flag) because a server
3774        * typically bursts data, for which we don't want to compensate by speeding
3775        * up the media. The other timestamps will be interpollated from this one
3776        * using the RTP timestamps. */
3777       GST_OBJECT_LOCK (element);
3778       if (GST_ELEMENT_CLOCK (element)) {
3779         GstClockTime now;
3780         GstClockTime base_time;
3781
3782         now = gst_clock_get_time (GST_ELEMENT_CLOCK (element));
3783         base_time = GST_ELEMENT_CAST (element)->base_time;
3784
3785         priv->appsrc_base_time[1] = now - base_time;
3786         GST_BUFFER_TIMESTAMP (buffer) = priv->appsrc_base_time[1];
3787         GST_DEBUG ("stream %p: first buffer at time %" GST_TIME_FORMAT
3788             ", base %" GST_TIME_FORMAT, stream, GST_TIME_ARGS (now),
3789             GST_TIME_ARGS (base_time));
3790       }
3791       GST_OBJECT_UNLOCK (element);
3792     }
3793
3794     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
3795     gst_object_unref (element);
3796   } else {
3797     ret = GST_FLOW_OK;
3798     gst_buffer_unref (buffer);
3799   }
3800   return ret;
3801 }
3802
3803 /* must be called with lock */
3804 static gboolean
3805 update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
3806     gboolean add)
3807 {
3808   GstRTSPStreamPrivate *priv = stream->priv;
3809   const GstRTSPTransport *tr;
3810
3811   tr = gst_rtsp_stream_transport_get_transport (trans);
3812
3813   switch (tr->lower_transport) {
3814     case GST_RTSP_LOWER_TRANS_UDP_MCAST:
3815     {
3816       if (add) {
3817         if (!check_mcast_part_for_transport (stream, tr))
3818           goto mcast_error;
3819         priv->transports = g_list_prepend (priv->transports, trans);
3820
3821         if (tr->ttl > 0) {
3822           GST_INFO ("setting ttl-mc %d", tr->ttl);
3823           if (priv->mcast_udpsink[0])
3824             g_object_set (G_OBJECT (priv->mcast_udpsink[0]), "ttl-mc", tr->ttl,
3825                 NULL);
3826           if (priv->mcast_udpsink[1])
3827             g_object_set (G_OBJECT (priv->mcast_udpsink[1]), "ttl-mc", tr->ttl,
3828                 NULL);
3829         }
3830       } else {
3831         priv->transports = g_list_remove (priv->transports, trans);
3832       }
3833       break;
3834     }
3835     case GST_RTSP_LOWER_TRANS_UDP:
3836     {
3837       gchar *dest;
3838       gint min, max;
3839
3840       dest = tr->destination;
3841       if (tr->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
3842         min = tr->port.min;
3843         max = tr->port.max;
3844       } else if (priv->client_side) {
3845         /* In client side mode the 'destination' is the RTSP server, so send
3846          * to those ports */
3847         min = tr->server_port.min;
3848         max = tr->server_port.max;
3849       } else {
3850         min = tr->client_port.min;
3851         max = tr->client_port.max;
3852       }
3853
3854       if (add) {
3855         GST_INFO ("adding %s:%d-%d", dest, min, max);
3856         if (priv->udpsink[0])
3857           g_signal_emit_by_name (priv->udpsink[0], "add", dest, min, NULL);
3858         g_signal_emit_by_name (priv->udpsink[1], "add", dest, max, NULL);
3859         priv->transports = g_list_prepend (priv->transports, trans);
3860       } else {
3861         GST_INFO ("removing %s:%d-%d", dest, min, max);
3862         if (priv->udpsink[0])
3863           g_signal_emit_by_name (priv->udpsink[0], "remove", dest, min, NULL);
3864         g_signal_emit_by_name (priv->udpsink[1], "remove", dest, max, NULL);
3865         priv->transports = g_list_remove (priv->transports, trans);
3866       }
3867       priv->transports_cookie++;
3868       break;
3869     }
3870     case GST_RTSP_LOWER_TRANS_TCP:
3871       if (add) {
3872         GST_INFO ("adding TCP %s", tr->destination);
3873         priv->transports = g_list_prepend (priv->transports, trans);
3874       } else {
3875         GST_INFO ("removing TCP %s", tr->destination);
3876         priv->transports = g_list_remove (priv->transports, trans);
3877       }
3878       priv->transports_cookie++;
3879       break;
3880     default:
3881       goto unknown_transport;
3882   }
3883   return TRUE;
3884
3885   /* ERRORS */
3886 unknown_transport:
3887   {
3888     GST_INFO ("Unknown transport %d", tr->lower_transport);
3889     return FALSE;
3890   }
3891 mcast_error:
3892   {
3893     return FALSE;
3894   }
3895 }
3896
3897
3898 /**
3899  * gst_rtsp_stream_add_transport:
3900  * @stream: a #GstRTSPStream
3901  * @trans: (transfer none): a #GstRTSPStreamTransport
3902  *
3903  * Add the transport in @trans to @stream. The media of @stream will
3904  * then also be send to the values configured in @trans.
3905  *
3906  * @stream must be joined to a bin.
3907  *
3908  * @trans must contain a valid #GstRTSPTransport.
3909  *
3910  * Returns: %TRUE if @trans was added
3911  */
3912 gboolean
3913 gst_rtsp_stream_add_transport (GstRTSPStream * stream,
3914     GstRTSPStreamTransport * trans)
3915 {
3916   GstRTSPStreamPrivate *priv;
3917   gboolean res;
3918
3919   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3920   priv = stream->priv;
3921   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
3922   g_return_val_if_fail (priv->joined_bin != NULL, FALSE);
3923
3924   g_mutex_lock (&priv->lock);
3925   res = update_transport (stream, trans, TRUE);
3926   g_mutex_unlock (&priv->lock);
3927
3928   return res;
3929 }
3930
3931 /**
3932  * gst_rtsp_stream_remove_transport:
3933  * @stream: a #GstRTSPStream
3934  * @trans: (transfer none): a #GstRTSPStreamTransport
3935  *
3936  * Remove the transport in @trans from @stream. The media of @stream will
3937  * not be sent to the values configured in @trans.
3938  *
3939  * @stream must be joined to a bin.
3940  *
3941  * @trans must contain a valid #GstRTSPTransport.
3942  *
3943  * Returns: %TRUE if @trans was removed
3944  */
3945 gboolean
3946 gst_rtsp_stream_remove_transport (GstRTSPStream * stream,
3947     GstRTSPStreamTransport * trans)
3948 {
3949   GstRTSPStreamPrivate *priv;
3950   gboolean res;
3951
3952   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3953   priv = stream->priv;
3954   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
3955   g_return_val_if_fail (priv->joined_bin != NULL, FALSE);
3956
3957   g_mutex_lock (&priv->lock);
3958   res = update_transport (stream, trans, FALSE);
3959   g_mutex_unlock (&priv->lock);
3960
3961   return res;
3962 }
3963
3964 /**
3965  * gst_rtsp_stream_update_crypto:
3966  * @stream: a #GstRTSPStream
3967  * @ssrc: the SSRC
3968  * @crypto: (transfer none) (allow-none): a #GstCaps with crypto info
3969  *
3970  * Update the new crypto information for @ssrc in @stream. If information
3971  * for @ssrc did not exist, it will be added. If information
3972  * for @ssrc existed, it will be replaced. If @crypto is %NULL, it will
3973  * be removed from @stream.
3974  *
3975  * Returns: %TRUE if @crypto could be updated
3976  */
3977 gboolean
3978 gst_rtsp_stream_update_crypto (GstRTSPStream * stream,
3979     guint ssrc, GstCaps * crypto)
3980 {
3981   GstRTSPStreamPrivate *priv;
3982
3983   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3984   g_return_val_if_fail (crypto == NULL || GST_IS_CAPS (crypto), FALSE);
3985
3986   priv = stream->priv;
3987
3988   GST_DEBUG_OBJECT (stream, "update key for %08x", ssrc);
3989
3990   g_mutex_lock (&priv->lock);
3991   if (crypto)
3992     g_hash_table_insert (priv->keys, GINT_TO_POINTER (ssrc),
3993         gst_caps_ref (crypto));
3994   else
3995     g_hash_table_remove (priv->keys, GINT_TO_POINTER (ssrc));
3996   g_mutex_unlock (&priv->lock);
3997
3998   return TRUE;
3999 }
4000
4001 /**
4002  * gst_rtsp_stream_get_rtp_socket:
4003  * @stream: a #GstRTSPStream
4004  * @family: the socket family
4005  *
4006  * Get the RTP socket from @stream for a @family.
4007  *
4008  * @stream must be joined to a bin.
4009  *
4010  * Returns: (transfer full) (nullable): the RTP socket or %NULL if no
4011  * socket could be allocated for @family. Unref after usage
4012  */
4013 GSocket *
4014 gst_rtsp_stream_get_rtp_socket (GstRTSPStream * stream, GSocketFamily family)
4015 {
4016   GstRTSPStreamPrivate *priv = stream->priv;
4017   GSocket *socket;
4018   const gchar *name;
4019
4020   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
4021   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
4022       family == G_SOCKET_FAMILY_IPV6, NULL);
4023   g_return_val_if_fail (priv->udpsink[0], NULL);
4024
4025   if (family == G_SOCKET_FAMILY_IPV6)
4026     name = "socket-v6";
4027   else
4028     name = "socket";
4029
4030   g_object_get (priv->udpsink[0], name, &socket, NULL);
4031
4032   return socket;
4033 }
4034
4035 /**
4036  * gst_rtsp_stream_get_rtcp_socket:
4037  * @stream: a #GstRTSPStream
4038  * @family: the socket family
4039  *
4040  * Get the RTCP socket from @stream for a @family.
4041  *
4042  * @stream must be joined to a bin.
4043  *
4044  * Returns: (transfer full) (nullable): the RTCP socket or %NULL if no
4045  * socket could be allocated for @family. Unref after usage
4046  */
4047 GSocket *
4048 gst_rtsp_stream_get_rtcp_socket (GstRTSPStream * stream, GSocketFamily family)
4049 {
4050   GstRTSPStreamPrivate *priv = gst_rtsp_stream_get_instance_private (stream);
4051   GSocket *socket;
4052   const gchar *name;
4053
4054   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
4055   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
4056       family == G_SOCKET_FAMILY_IPV6, NULL);
4057   g_return_val_if_fail (priv->udpsink[1], NULL);
4058
4059   if (family == G_SOCKET_FAMILY_IPV6)
4060     name = "socket-v6";
4061   else
4062     name = "socket";
4063
4064   g_object_get (priv->udpsink[1], name, &socket, NULL);
4065
4066   return socket;
4067 }
4068
4069 /**
4070  * gst_rtsp_stream_get_rtp_multicast_socket:
4071  * @stream: a #GstRTSPStream
4072  * @family: the socket family
4073  *
4074  * Get the multicast RTP socket from @stream for a @family.
4075  *
4076  * Returns: (transfer full) (nullable): the multicast RTP socket or %NULL if no
4077  * socket could be allocated for @family. Unref after usage
4078  */
4079 GSocket *
4080 gst_rtsp_stream_get_rtp_multicast_socket (GstRTSPStream * stream,
4081     GSocketFamily family)
4082 {
4083   GstRTSPStreamPrivate *priv = gst_rtsp_stream_get_instance_private (stream);
4084   GSocket *socket;
4085   const gchar *name;
4086
4087   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
4088   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
4089       family == G_SOCKET_FAMILY_IPV6, NULL);
4090   g_return_val_if_fail (priv->mcast_udpsink[0], NULL);
4091
4092   if (family == G_SOCKET_FAMILY_IPV6)
4093     name = "socket-v6";
4094   else
4095     name = "socket";
4096
4097   g_object_get (priv->mcast_udpsink[0], name, &socket, NULL);
4098
4099   return socket;
4100 }
4101
4102 /**
4103  * gst_rtsp_stream_get_rtcp_multicast_socket:
4104  * @stream: a #GstRTSPStream
4105  * @family: the socket family
4106  *
4107  * Get the multicast RTCP socket from @stream for a @family.
4108  *
4109  * Returns: (transfer full) (nullable): the multicast RTCP socket or %NULL if no
4110  * socket could be allocated for @family. Unref after usage
4111  */
4112 GSocket *
4113 gst_rtsp_stream_get_rtcp_multicast_socket (GstRTSPStream * stream,
4114     GSocketFamily family)
4115 {
4116   GstRTSPStreamPrivate *priv = gst_rtsp_stream_get_instance_private (stream);
4117   GSocket *socket;
4118   const gchar *name;
4119
4120   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
4121   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
4122       family == G_SOCKET_FAMILY_IPV6, NULL);
4123   g_return_val_if_fail (priv->mcast_udpsink[1], NULL);
4124
4125   if (family == G_SOCKET_FAMILY_IPV6)
4126     name = "socket-v6";
4127   else
4128     name = "socket";
4129
4130   g_object_get (priv->mcast_udpsink[1], name, &socket, NULL);
4131
4132   return socket;
4133 }
4134
4135 /**
4136  * gst_rtsp_stream_set_seqnum:
4137  * @stream: a #GstRTSPStream
4138  * @seqnum: a new sequence number
4139  *
4140  * Configure the sequence number in the payloader of @stream to @seqnum.
4141  */
4142 void
4143 gst_rtsp_stream_set_seqnum_offset (GstRTSPStream * stream, guint16 seqnum)
4144 {
4145   GstRTSPStreamPrivate *priv;
4146
4147   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
4148
4149   priv = stream->priv;
4150
4151   g_object_set (G_OBJECT (priv->payloader), "seqnum-offset", seqnum, NULL);
4152 }
4153
4154 /**
4155  * gst_rtsp_stream_get_seqnum:
4156  * @stream: a #GstRTSPStream
4157  *
4158  * Get the configured sequence number in the payloader of @stream.
4159  *
4160  * Returns: the sequence number of the payloader.
4161  */
4162 guint16
4163 gst_rtsp_stream_get_current_seqnum (GstRTSPStream * stream)
4164 {
4165   GstRTSPStreamPrivate *priv;
4166   guint seqnum;
4167
4168   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
4169
4170   priv = stream->priv;
4171
4172   g_object_get (G_OBJECT (priv->payloader), "seqnum", &seqnum, NULL);
4173
4174   return seqnum;
4175 }
4176
4177 /**
4178  * gst_rtsp_stream_transport_filter:
4179  * @stream: a #GstRTSPStream
4180  * @func: (scope call) (allow-none): a callback
4181  * @user_data: (closure): user data passed to @func
4182  *
4183  * Call @func for each transport managed by @stream. The result value of @func
4184  * determines what happens to the transport. @func will be called with @stream
4185  * locked so no further actions on @stream can be performed from @func.
4186  *
4187  * If @func returns #GST_RTSP_FILTER_REMOVE, the transport will be removed from
4188  * @stream.
4189  *
4190  * If @func returns #GST_RTSP_FILTER_KEEP, the transport will remain in @stream.
4191  *
4192  * If @func returns #GST_RTSP_FILTER_REF, the transport will remain in @stream but
4193  * will also be added with an additional ref to the result #GList of this
4194  * function..
4195  *
4196  * When @func is %NULL, #GST_RTSP_FILTER_REF will be assumed for each transport.
4197  *
4198  * Returns: (element-type GstRTSPStreamTransport) (transfer full): a #GList with all
4199  * transports for which @func returned #GST_RTSP_FILTER_REF. After usage, each
4200  * element in the #GList should be unreffed before the list is freed.
4201  */
4202 GList *
4203 gst_rtsp_stream_transport_filter (GstRTSPStream * stream,
4204     GstRTSPStreamTransportFilterFunc func, gpointer user_data)
4205 {
4206   GstRTSPStreamPrivate *priv;
4207   GList *result, *walk, *next;
4208   GHashTable *visited = NULL;
4209   guint cookie;
4210
4211   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
4212
4213   priv = stream->priv;
4214
4215   result = NULL;
4216   if (func)
4217     visited = g_hash_table_new_full (NULL, NULL, g_object_unref, NULL);
4218
4219   g_mutex_lock (&priv->lock);
4220 restart:
4221   cookie = priv->transports_cookie;
4222   for (walk = priv->transports; walk; walk = next) {
4223     GstRTSPStreamTransport *trans = walk->data;
4224     GstRTSPFilterResult res;
4225     gboolean changed;
4226
4227     next = g_list_next (walk);
4228
4229     if (func) {
4230       /* only visit each transport once */
4231       if (g_hash_table_contains (visited, trans))
4232         continue;
4233
4234       g_hash_table_add (visited, g_object_ref (trans));
4235       g_mutex_unlock (&priv->lock);
4236
4237       res = func (stream, trans, user_data);
4238
4239       g_mutex_lock (&priv->lock);
4240     } else
4241       res = GST_RTSP_FILTER_REF;
4242
4243     changed = (cookie != priv->transports_cookie);
4244
4245     switch (res) {
4246       case GST_RTSP_FILTER_REMOVE:
4247         update_transport (stream, trans, FALSE);
4248         break;
4249       case GST_RTSP_FILTER_REF:
4250         result = g_list_prepend (result, g_object_ref (trans));
4251         break;
4252       case GST_RTSP_FILTER_KEEP:
4253       default:
4254         break;
4255     }
4256     if (changed)
4257       goto restart;
4258   }
4259   g_mutex_unlock (&priv->lock);
4260
4261   if (func)
4262     g_hash_table_unref (visited);
4263
4264   return result;
4265 }
4266
4267 static GstPadProbeReturn
4268 pad_blocking (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
4269 {
4270   GstRTSPStreamPrivate *priv;
4271   GstRTSPStream *stream;
4272   GstBuffer *buffer = NULL;
4273
4274   stream = user_data;
4275   priv = stream->priv;
4276
4277   GST_DEBUG_OBJECT (pad, "now blocking");
4278
4279   g_mutex_lock (&priv->lock);
4280   priv->blocking = TRUE;
4281
4282   if ((info->type & GST_PAD_PROBE_TYPE_BUFFER)) {
4283     buffer = gst_pad_probe_info_get_buffer (info);
4284   } else if ((info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST)) {
4285     GstBufferList *list = gst_pad_probe_info_get_buffer_list (info);
4286     buffer = gst_buffer_list_get (list, 0);
4287   } else {
4288     g_assert_not_reached ();
4289   }
4290
4291   g_assert (buffer);
4292   priv->position = GST_BUFFER_TIMESTAMP (buffer);
4293   GST_DEBUG_OBJECT (stream, "buffer position: %" GST_TIME_FORMAT,
4294       GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)));
4295   g_mutex_unlock (&priv->lock);
4296
4297   gst_element_post_message (priv->payloader,
4298       gst_message_new_element (GST_OBJECT_CAST (priv->payloader),
4299           gst_structure_new_empty ("GstRTSPStreamBlocking")));
4300
4301   return GST_PAD_PROBE_OK;
4302 }
4303
4304 static void
4305 set_blocked (GstRTSPStream * stream, gboolean blocked)
4306 {
4307   GstRTSPStreamPrivate *priv;
4308   int i;
4309
4310   GST_DEBUG_OBJECT (stream, "blocked: %d", blocked);
4311
4312   priv = stream->priv;
4313
4314   if (blocked) {
4315     for (i = 0; i < 2; i++) {
4316       if (priv->blocked_id[i] != 0)
4317         continue;
4318       if (priv->send_src[i]) {
4319         priv->blocking = FALSE;
4320         priv->blocked_id[i] = gst_pad_add_probe (priv->send_src[i],
4321             GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER |
4322             GST_PAD_PROBE_TYPE_BUFFER_LIST, pad_blocking,
4323             g_object_ref (stream), g_object_unref);
4324       }
4325     }
4326   } else {
4327     for (i = 0; i < 2; i++) {
4328       if (priv->blocked_id[i] != 0) {
4329         gst_pad_remove_probe (priv->send_src[i], priv->blocked_id[i]);
4330         priv->blocked_id[i] = 0;
4331       }
4332     }
4333     priv->blocking = FALSE;
4334   }
4335 }
4336
4337 /**
4338  * gst_rtsp_stream_set_blocked:
4339  * @stream: a #GstRTSPStream
4340  * @blocked: boolean indicating we should block or unblock
4341  *
4342  * Blocks or unblocks the dataflow on @stream.
4343  *
4344  * Returns: %TRUE on success
4345  */
4346 gboolean
4347 gst_rtsp_stream_set_blocked (GstRTSPStream * stream, gboolean blocked)
4348 {
4349   GstRTSPStreamPrivate *priv;
4350
4351   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
4352
4353   priv = stream->priv;
4354   g_mutex_lock (&priv->lock);
4355   set_blocked (stream, blocked);
4356   g_mutex_unlock (&priv->lock);
4357
4358   return TRUE;
4359 }
4360
4361 /**
4362  * gst_rtsp_stream_ublock_linked:
4363  * @stream: a #GstRTSPStream
4364  *
4365  * Unblocks the dataflow on @stream if it is linked.
4366  *
4367  * Returns: %TRUE on success
4368  */
4369 gboolean
4370 gst_rtsp_stream_unblock_linked (GstRTSPStream * stream)
4371 {
4372   GstRTSPStreamPrivate *priv;
4373
4374   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
4375
4376   priv = stream->priv;
4377   g_mutex_lock (&priv->lock);
4378   if (priv->send_src[0] && gst_pad_is_linked (priv->send_src[0]))
4379     set_blocked (stream, FALSE);
4380   g_mutex_unlock (&priv->lock);
4381
4382   return TRUE;
4383 }
4384
4385 /**
4386  * gst_rtsp_stream_is_blocking:
4387  * @stream: a #GstRTSPStream
4388  *
4389  * Check if @stream is blocking on a #GstBuffer.
4390  *
4391  * Returns: %TRUE if @stream is blocking
4392  */
4393 gboolean
4394 gst_rtsp_stream_is_blocking (GstRTSPStream * stream)
4395 {
4396   GstRTSPStreamPrivate *priv;
4397   gboolean result;
4398
4399   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
4400
4401   priv = stream->priv;
4402
4403   g_mutex_lock (&priv->lock);
4404   result = priv->blocking;
4405   g_mutex_unlock (&priv->lock);
4406
4407   return result;
4408 }
4409
4410 /**
4411  * gst_rtsp_stream_query_position:
4412  * @stream: a #GstRTSPStream
4413  * @position: (out): current position of a #GstRTSPStream
4414  *
4415  * Query the position of the stream in %GST_FORMAT_TIME. This only considers
4416  * the RTP parts of the pipeline and not the RTCP parts.
4417  *
4418  * Returns: %TRUE if the position could be queried
4419  */
4420 gboolean
4421 gst_rtsp_stream_query_position (GstRTSPStream * stream, gint64 * position)
4422 {
4423   GstRTSPStreamPrivate *priv;
4424   GstElement *sink;
4425   GstPad *pad = NULL;
4426
4427   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
4428
4429   /* query position: if no sinks have been added yet,
4430    * we obtain the position from the pad otherwise we query the sinks */
4431
4432   priv = stream->priv;
4433
4434   g_mutex_lock (&priv->lock);
4435   /* depending on the transport type, it should query corresponding sink */
4436   if (priv->protocols & GST_RTSP_LOWER_TRANS_UDP)
4437     sink = priv->udpsink[0];
4438   else if (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST)
4439     sink = priv->mcast_udpsink[0];
4440   else
4441     sink = priv->appsink[0];
4442
4443   if (sink) {
4444     gst_object_ref (sink);
4445   } else if (priv->send_src[0]) {
4446     pad = gst_object_ref (priv->send_src[0]);
4447   } else {
4448     g_mutex_unlock (&priv->lock);
4449     GST_WARNING_OBJECT (stream, "Couldn't obtain postion: erroneous pipeline");
4450     return FALSE;
4451   }
4452   g_mutex_unlock (&priv->lock);
4453
4454   if (sink) {
4455     if (!gst_element_query_position (sink, GST_FORMAT_TIME, position)) {
4456       GST_WARNING_OBJECT (stream,
4457           "Couldn't obtain postion: position query failed");
4458       gst_object_unref (sink);
4459       return FALSE;
4460     }
4461     gst_object_unref (sink);
4462   } else if (pad) {
4463     GstEvent *event;
4464     const GstSegment *segment;
4465
4466     event = gst_pad_get_sticky_event (pad, GST_EVENT_SEGMENT, 0);
4467     if (!event) {
4468       GST_WARNING_OBJECT (stream, "Couldn't obtain postion: no segment event");
4469       gst_object_unref (pad);
4470       return FALSE;
4471     }
4472
4473     gst_event_parse_segment (event, &segment);
4474     if (segment->format != GST_FORMAT_TIME) {
4475       *position = -1;
4476     } else {
4477       g_mutex_lock (&priv->lock);
4478       *position = priv->position;
4479       g_mutex_unlock (&priv->lock);
4480       *position =
4481           gst_segment_to_stream_time (segment, GST_FORMAT_TIME, *position);
4482     }
4483     gst_event_unref (event);
4484     gst_object_unref (pad);
4485   }
4486
4487   return TRUE;
4488 }
4489
4490 /**
4491  * gst_rtsp_stream_query_stop:
4492  * @stream: a #GstRTSPStream
4493  * @stop: (out): current stop of a #GstRTSPStream
4494  *
4495  * Query the stop of the stream in %GST_FORMAT_TIME. This only considers
4496  * the RTP parts of the pipeline and not the RTCP parts.
4497  *
4498  * Returns: %TRUE if the stop could be queried
4499  */
4500 gboolean
4501 gst_rtsp_stream_query_stop (GstRTSPStream * stream, gint64 * stop)
4502 {
4503   GstRTSPStreamPrivate *priv;
4504   GstElement *sink;
4505   GstPad *pad = NULL;
4506
4507   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
4508
4509   /* query stop position: if no sinks have been added yet,
4510    * we obtain the stop position from the pad otherwise we query the sinks */
4511
4512   priv = stream->priv;
4513
4514   g_mutex_lock (&priv->lock);
4515   /* depending on the transport type, it should query corresponding sink */
4516   if (priv->protocols & GST_RTSP_LOWER_TRANS_UDP)
4517     sink = priv->udpsink[0];
4518   else if (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST)
4519     sink = priv->mcast_udpsink[0];
4520   else
4521     sink = priv->appsink[0];
4522
4523   if (sink) {
4524     gst_object_ref (sink);
4525   } else if (priv->send_src[0]) {
4526     pad = gst_object_ref (priv->send_src[0]);
4527   } else {
4528     g_mutex_unlock (&priv->lock);
4529     GST_WARNING_OBJECT (stream, "Couldn't obtain stop: erroneous pipeline");
4530     return FALSE;
4531   }
4532   g_mutex_unlock (&priv->lock);
4533
4534   if (sink) {
4535     GstQuery *query;
4536     GstFormat format;
4537
4538     query = gst_query_new_segment (GST_FORMAT_TIME);
4539     if (!gst_element_query (sink, query)) {
4540       GST_WARNING_OBJECT (stream, "Couldn't obtain stop: element query failed");
4541       gst_query_unref (query);
4542       gst_object_unref (sink);
4543       return FALSE;
4544     }
4545     gst_query_parse_segment (query, NULL, &format, NULL, stop);
4546     if (format != GST_FORMAT_TIME)
4547       *stop = -1;
4548     gst_query_unref (query);
4549     gst_object_unref (sink);
4550   } else if (pad) {
4551     GstEvent *event;
4552     const GstSegment *segment;
4553
4554     event = gst_pad_get_sticky_event (pad, GST_EVENT_SEGMENT, 0);
4555     if (!event) {
4556       GST_WARNING_OBJECT (stream, "Couldn't obtain stop: no segment event");
4557       gst_object_unref (pad);
4558       return FALSE;
4559     }
4560     gst_event_parse_segment (event, &segment);
4561     if (segment->format != GST_FORMAT_TIME) {
4562       *stop = -1;
4563     } else {
4564       *stop = segment->stop;
4565       if (*stop == -1)
4566         *stop = segment->duration;
4567       else
4568         *stop = gst_segment_to_stream_time (segment, GST_FORMAT_TIME, *stop);
4569     }
4570     gst_event_unref (event);
4571     gst_object_unref (pad);
4572   }
4573
4574   return TRUE;
4575 }
4576
4577 /**
4578  * gst_rtsp_stream_seekable:
4579  * @stream: a #GstRTSPStream
4580  *
4581  * Checks whether the individual @stream is seekable.
4582  *
4583  * Returns: %TRUE if @stream is seekable, else %FALSE.
4584  */
4585 gboolean
4586 gst_rtsp_stream_seekable (GstRTSPStream * stream)
4587 {
4588   GstRTSPStreamPrivate *priv;
4589   GstPad *pad = NULL;
4590   GstQuery *query = NULL;
4591   gboolean seekable = FALSE;
4592
4593   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
4594
4595   /* query stop position: if no sinks have been added yet,
4596    * we obtain the stop position from the pad otherwise we query the sinks */
4597
4598   priv = stream->priv;
4599
4600   g_mutex_lock (&priv->lock);
4601   /* depending on the transport type, it should query corresponding sink */
4602   if (priv->srcpad) {
4603     pad = gst_object_ref (priv->srcpad);
4604   } else {
4605     g_mutex_unlock (&priv->lock);
4606     GST_WARNING_OBJECT (stream, "Pad not available, can't query seekability");
4607     goto beach;
4608   }
4609   g_mutex_unlock (&priv->lock);
4610
4611   query = gst_query_new_seeking (GST_FORMAT_TIME);
4612   if (!gst_pad_query (pad, query)) {
4613     GST_WARNING_OBJECT (stream, "seeking query failed");
4614     goto beach;
4615   }
4616   gst_query_parse_seeking (query, NULL, &seekable, NULL, NULL);
4617
4618 beach:
4619   if (pad)
4620     gst_object_unref (pad);
4621   if (query)
4622     gst_query_unref (query);
4623
4624   GST_DEBUG_OBJECT (stream, "Returning %d", seekable);
4625
4626   return seekable;
4627 }
4628
4629 /**
4630  * gst_rtsp_stream_complete_stream:
4631  * @stream: a #GstRTSPStream
4632  * @transport: a #GstRTSPTransport
4633  *
4634  * Add a receiver and sender part to the pipeline based on the transport from
4635  * SETUP.
4636  *
4637  * Returns: %TRUE if the stream has been sucessfully updated.
4638  */
4639 gboolean
4640 gst_rtsp_stream_complete_stream (GstRTSPStream * stream,
4641     const GstRTSPTransport * transport)
4642 {
4643   GstRTSPStreamPrivate *priv;
4644
4645   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
4646
4647   priv = stream->priv;
4648   GST_DEBUG_OBJECT (stream, "complete stream");
4649
4650   g_mutex_lock (&priv->lock);
4651
4652   if (!(priv->protocols & transport->lower_transport))
4653     goto unallowed_transport;
4654
4655   if (!create_receiver_part (stream, transport))
4656     goto create_receiver_error;
4657
4658   /* in the RECORD case, we only add RTCP sender part */
4659   if (!create_sender_part (stream, transport))
4660     goto create_sender_error;
4661
4662   priv->is_complete = TRUE;
4663   g_mutex_unlock (&priv->lock);
4664
4665   GST_DEBUG_OBJECT (stream, "pipeline sucsessfully updated");
4666   return TRUE;
4667
4668 create_receiver_error:
4669 create_sender_error:
4670 unallowed_transport:
4671   {
4672     g_mutex_unlock (&priv->lock);
4673     return FALSE;
4674   }
4675 }
4676
4677 /**
4678  * gst_rtsp_stream_is_complete:
4679  * @stream: a #GstRTSPStream
4680  *
4681  * Checks whether the stream is complete, contains the receiver and the sender
4682  * parts. As the stream contains sink(s) element(s), it's possible to perform
4683  * seek operations on it.
4684  *
4685  * Returns: %TRUE if the stream contains at least one sink element.
4686  */
4687 gboolean
4688 gst_rtsp_stream_is_complete (GstRTSPStream * stream)
4689 {
4690   GstRTSPStreamPrivate *priv;
4691   gboolean ret = FALSE;
4692
4693   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
4694
4695   priv = stream->priv;
4696   g_mutex_lock (&priv->lock);
4697   ret = priv->is_complete;
4698   g_mutex_unlock (&priv->lock);
4699
4700   return ret;
4701 }
4702
4703 /**
4704  * gst_rtsp_stream_is_sender:
4705  * @stream: a #GstRTSPStream
4706  *
4707  * Checks whether the stream is a sender.
4708  *
4709  * Returns: %TRUE if the stream is a sender and %FALSE otherwise.
4710  */
4711 gboolean
4712 gst_rtsp_stream_is_sender (GstRTSPStream * stream)
4713 {
4714   GstRTSPStreamPrivate *priv;
4715   gboolean ret = FALSE;
4716
4717   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
4718
4719   priv = stream->priv;
4720   g_mutex_lock (&priv->lock);
4721   ret = (priv->srcpad != NULL);
4722   g_mutex_unlock (&priv->lock);
4723
4724   return ret;
4725 }
4726
4727 /**
4728  * gst_rtsp_stream_is_receiver:
4729  * @stream: a #GstRTSPStream
4730  *
4731  * Checks whether the stream is a receiver.
4732  *
4733  * Returns: %TRUE if the stream is a receiver and %FALSE otherwise.
4734  */
4735 gboolean
4736 gst_rtsp_stream_is_receiver (GstRTSPStream * stream)
4737 {
4738   GstRTSPStreamPrivate *priv;
4739   gboolean ret = FALSE;
4740
4741   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
4742
4743   priv = stream->priv;
4744   g_mutex_lock (&priv->lock);
4745   ret = (priv->sinkpad != NULL);
4746   g_mutex_unlock (&priv->lock);
4747
4748   return ret;
4749 }
4750
4751 #define AES_128_KEY_LEN 16
4752 #define AES_256_KEY_LEN 32
4753
4754 #define HMAC_32_KEY_LEN 4
4755 #define HMAC_80_KEY_LEN 10
4756
4757 static gboolean
4758 mikey_apply_policy (GstCaps * caps, GstMIKEYMessage * msg, guint8 policy)
4759 {
4760   const gchar *srtp_cipher;
4761   const gchar *srtp_auth;
4762   const GstMIKEYPayload *sp;
4763   guint i;
4764
4765   /* loop over Security policy until we find one containing policy */
4766   for (i = 0;; i++) {
4767     if ((sp = gst_mikey_message_find_payload (msg, GST_MIKEY_PT_SP, i)) == NULL)
4768       break;
4769
4770     if (((GstMIKEYPayloadSP *) sp)->policy == policy)
4771       break;
4772   }
4773
4774   /* the default ciphers */
4775   srtp_cipher = "aes-128-icm";
4776   srtp_auth = "hmac-sha1-80";
4777
4778   /* now override the defaults with what is in the Security Policy */
4779   if (sp != NULL) {
4780     guint len;
4781
4782     /* collect all the params and go over them */
4783     len = gst_mikey_payload_sp_get_n_params (sp);
4784     for (i = 0; i < len; i++) {
4785       const GstMIKEYPayloadSPParam *param =
4786           gst_mikey_payload_sp_get_param (sp, i);
4787
4788       switch (param->type) {
4789         case GST_MIKEY_SP_SRTP_ENC_ALG:
4790           switch (param->val[0]) {
4791             case 0:
4792               srtp_cipher = "null";
4793               break;
4794             case 2:
4795             case 1:
4796               srtp_cipher = "aes-128-icm";
4797               break;
4798             default:
4799               break;
4800           }
4801           break;
4802         case GST_MIKEY_SP_SRTP_ENC_KEY_LEN:
4803           switch (param->val[0]) {
4804             case AES_128_KEY_LEN:
4805               srtp_cipher = "aes-128-icm";
4806               break;
4807             case AES_256_KEY_LEN:
4808               srtp_cipher = "aes-256-icm";
4809               break;
4810             default:
4811               break;
4812           }
4813           break;
4814         case GST_MIKEY_SP_SRTP_AUTH_ALG:
4815           switch (param->val[0]) {
4816             case 0:
4817               srtp_auth = "null";
4818               break;
4819             case 2:
4820             case 1:
4821               srtp_auth = "hmac-sha1-80";
4822               break;
4823             default:
4824               break;
4825           }
4826           break;
4827         case GST_MIKEY_SP_SRTP_AUTH_KEY_LEN:
4828           switch (param->val[0]) {
4829             case HMAC_32_KEY_LEN:
4830               srtp_auth = "hmac-sha1-32";
4831               break;
4832             case HMAC_80_KEY_LEN:
4833               srtp_auth = "hmac-sha1-80";
4834               break;
4835             default:
4836               break;
4837           }
4838           break;
4839         case GST_MIKEY_SP_SRTP_SRTP_ENC:
4840           break;
4841         case GST_MIKEY_SP_SRTP_SRTCP_ENC:
4842           break;
4843         default:
4844           break;
4845       }
4846     }
4847   }
4848   /* now configure the SRTP parameters */
4849   gst_caps_set_simple (caps,
4850       "srtp-cipher", G_TYPE_STRING, srtp_cipher,
4851       "srtp-auth", G_TYPE_STRING, srtp_auth,
4852       "srtcp-cipher", G_TYPE_STRING, srtp_cipher,
4853       "srtcp-auth", G_TYPE_STRING, srtp_auth, NULL);
4854
4855   return TRUE;
4856 }
4857
4858 static gboolean
4859 handle_mikey_data (GstRTSPStream * stream, guint8 * data, gsize size)
4860 {
4861   GstMIKEYMessage *msg;
4862   guint i, n_cs;
4863   GstCaps *caps = NULL;
4864   GstMIKEYPayloadKEMAC *kemac;
4865   const GstMIKEYPayloadKeyData *pkd;
4866   GstBuffer *key;
4867
4868   /* the MIKEY message contains a CSB or crypto session bundle. It is a
4869    * set of Crypto Sessions protected with the same master key.
4870    * In the context of SRTP, an RTP and its RTCP stream is part of a
4871    * crypto session */
4872   if ((msg = gst_mikey_message_new_from_data (data, size, NULL, NULL)) == NULL)
4873     goto parse_failed;
4874
4875   /* we can only handle SRTP crypto sessions for now */
4876   if (msg->map_type != GST_MIKEY_MAP_TYPE_SRTP)
4877     goto invalid_map_type;
4878
4879   /* get the number of crypto sessions. This maps SSRC to its
4880    * security parameters */
4881   n_cs = gst_mikey_message_get_n_cs (msg);
4882   if (n_cs == 0)
4883     goto no_crypto_sessions;
4884
4885   /* we also need keys */
4886   if (!(kemac = (GstMIKEYPayloadKEMAC *) gst_mikey_message_find_payload
4887           (msg, GST_MIKEY_PT_KEMAC, 0)))
4888     goto no_keys;
4889
4890   /* we don't support encrypted keys */
4891   if (kemac->enc_alg != GST_MIKEY_ENC_NULL
4892       || kemac->mac_alg != GST_MIKEY_MAC_NULL)
4893     goto unsupported_encryption;
4894
4895   /* get Key data sub-payload */
4896   pkd = (const GstMIKEYPayloadKeyData *)
4897       gst_mikey_payload_kemac_get_sub (&kemac->pt, 0);
4898
4899   key =
4900       gst_buffer_new_wrapped (g_memdup (pkd->key_data, pkd->key_len),
4901       pkd->key_len);
4902
4903   /* go over all crypto sessions and create the security policy for each
4904    * SSRC */
4905   for (i = 0; i < n_cs; i++) {
4906     const GstMIKEYMapSRTP *map = gst_mikey_message_get_cs_srtp (msg, i);
4907
4908     caps = gst_caps_new_simple ("application/x-srtp",
4909         "ssrc", G_TYPE_UINT, map->ssrc,
4910         "roc", G_TYPE_UINT, map->roc, "srtp-key", GST_TYPE_BUFFER, key, NULL);
4911     mikey_apply_policy (caps, msg, map->policy);
4912
4913     gst_rtsp_stream_update_crypto (stream, map->ssrc, caps);
4914     gst_caps_unref (caps);
4915   }
4916   gst_mikey_message_unref (msg);
4917   gst_buffer_unref (key);
4918
4919   return TRUE;
4920
4921   /* ERRORS */
4922 parse_failed:
4923   {
4924     GST_DEBUG_OBJECT (stream, "failed to parse MIKEY message");
4925     return FALSE;
4926   }
4927 invalid_map_type:
4928   {
4929     GST_DEBUG_OBJECT (stream, "invalid map type %d", msg->map_type);
4930     goto cleanup_message;
4931   }
4932 no_crypto_sessions:
4933   {
4934     GST_DEBUG_OBJECT (stream, "no crypto sessions");
4935     goto cleanup_message;
4936   }
4937 no_keys:
4938   {
4939     GST_DEBUG_OBJECT (stream, "no keys found");
4940     goto cleanup_message;
4941   }
4942 unsupported_encryption:
4943   {
4944     GST_DEBUG_OBJECT (stream, "unsupported key encryption");
4945     goto cleanup_message;
4946   }
4947 cleanup_message:
4948   {
4949     gst_mikey_message_unref (msg);
4950     return FALSE;
4951   }
4952 }
4953
4954 #define IS_STRIP_CHAR(c) (g_ascii_isspace ((guchar)(c)) || ((c) == '\"'))
4955
4956 static void
4957 strip_chars (gchar * str)
4958 {
4959   gchar *s;
4960   gsize len;
4961
4962   len = strlen (str);
4963   while (len--) {
4964     if (!IS_STRIP_CHAR (str[len]))
4965       break;
4966     str[len] = '\0';
4967   }
4968   for (s = str; *s && IS_STRIP_CHAR (*s); s++);
4969   memmove (str, s, len + 1);
4970 }
4971
4972 /**
4973  * gst_rtsp_stream_handle_keymgmt:
4974  * @stream: a #GstRTSPStream
4975  * @keymgmt: a keymgmt header
4976  *
4977  * Parse and handle a KeyMgmt header.
4978  *
4979  * Since: 1.16
4980  */
4981 /* KeyMgmt = "KeyMgmt" ":" key-mgmt-spec 0*("," key-mgmt-spec)
4982  * key-mgmt-spec = "prot" "=" KMPID ";" ["uri" "=" %x22 URI %x22 ";"]
4983  */
4984 gboolean
4985 gst_rtsp_stream_handle_keymgmt (GstRTSPStream * stream, const gchar * keymgmt)
4986 {
4987   gchar **specs;
4988   gint i, j;
4989
4990   specs = g_strsplit (keymgmt, ",", 0);
4991   for (i = 0; specs[i]; i++) {
4992     gchar **split;
4993
4994     split = g_strsplit (specs[i], ";", 0);
4995     for (j = 0; split[j]; j++) {
4996       g_strstrip (split[j]);
4997       if (g_str_has_prefix (split[j], "prot=")) {
4998         g_strstrip (split[j] + 5);
4999         if (!g_str_equal (split[j] + 5, "mikey"))
5000           break;
5001         GST_DEBUG ("found mikey");
5002       } else if (g_str_has_prefix (split[j], "uri=")) {
5003         strip_chars (split[j] + 4);
5004         GST_DEBUG ("found uri '%s'", split[j] + 4);
5005       } else if (g_str_has_prefix (split[j], "data=")) {
5006         guchar *data;
5007         gsize size;
5008         strip_chars (split[j] + 5);
5009         GST_DEBUG ("found data '%s'", split[j] + 5);
5010         data = g_base64_decode_inplace (split[j] + 5, &size);
5011         handle_mikey_data (stream, data, size);
5012       }
5013     }
5014     g_strfreev (split);
5015   }
5016   g_strfreev (specs);
5017   return TRUE;
5018 }
5019
5020
5021 /**
5022  * gst_rtsp_stream_get_ulpfec_pt:
5023  *
5024  * Returns: the payload type used for ULPFEC protection packets
5025  *
5026  * Since: 1.16
5027  */
5028 guint
5029 gst_rtsp_stream_get_ulpfec_pt (GstRTSPStream * stream)
5030 {
5031   guint res;
5032
5033   g_mutex_lock (&stream->priv->lock);
5034   res = stream->priv->ulpfec_pt;
5035   g_mutex_unlock (&stream->priv->lock);
5036
5037   return res;
5038 }
5039
5040 /**
5041  * gst_rtsp_stream_set_ulpfec_pt:
5042  *
5043  * Set the payload type to be used for ULPFEC protection packets
5044  *
5045  * Since: 1.16
5046  */
5047 void
5048 gst_rtsp_stream_set_ulpfec_pt (GstRTSPStream * stream, guint pt)
5049 {
5050   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
5051
5052   g_mutex_lock (&stream->priv->lock);
5053   stream->priv->ulpfec_pt = pt;
5054   if (stream->priv->ulpfec_encoder) {
5055     g_object_set (stream->priv->ulpfec_encoder, "pt", pt, NULL);
5056   }
5057   g_mutex_unlock (&stream->priv->lock);
5058 }
5059
5060 /**
5061  * gst_rtsp_stream_request_ulpfec_decoder:
5062  *
5063  * Creating a rtpulpfecdec element
5064  *
5065  * Returns: (transfer full) (nullable): a #GstElement.
5066  *
5067  * Since: 1.16
5068  */
5069 GstElement *
5070 gst_rtsp_stream_request_ulpfec_decoder (GstRTSPStream * stream,
5071     GstElement * rtpbin, guint sessid)
5072 {
5073   GObject *internal_storage = NULL;
5074
5075   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
5076   stream->priv->ulpfec_decoder =
5077       gst_object_ref (gst_element_factory_make ("rtpulpfecdec", NULL));
5078
5079   g_signal_emit_by_name (G_OBJECT (rtpbin), "get-internal-storage", sessid,
5080       &internal_storage);
5081   g_object_set (stream->priv->ulpfec_decoder, "storage", internal_storage,
5082       NULL);
5083   g_object_unref (internal_storage);
5084   update_ulpfec_decoder_pt (stream);
5085
5086   return stream->priv->ulpfec_decoder;
5087 }
5088
5089 /**
5090  * gst_rtsp_stream_request_ulpfec_encoder:
5091  *
5092  * Creating a rtpulpfecenc element
5093  *
5094  * Returns: (transfer full) (nullable): a #GstElement.
5095  *
5096  * Since: 1.16
5097  */
5098 GstElement *
5099 gst_rtsp_stream_request_ulpfec_encoder (GstRTSPStream * stream, guint sessid)
5100 {
5101   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
5102
5103   if (!stream->priv->ulpfec_percentage)
5104     return NULL;
5105
5106   stream->priv->ulpfec_encoder =
5107       gst_object_ref (gst_element_factory_make ("rtpulpfecenc", NULL));
5108
5109   g_object_set (stream->priv->ulpfec_encoder, "pt", stream->priv->ulpfec_pt,
5110       "percentage", stream->priv->ulpfec_percentage, NULL);
5111
5112   return stream->priv->ulpfec_encoder;
5113 }
5114
5115 /**
5116  * gst_rtsp_stream_set_ulpfec_percentage:
5117  *
5118  * Sets the amount of redundancy to apply when creating ULPFEC
5119  * protection packets.
5120  *
5121  * Since: 1.16
5122  */
5123 void
5124 gst_rtsp_stream_set_ulpfec_percentage (GstRTSPStream * stream, guint percentage)
5125 {
5126   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
5127
5128   g_mutex_lock (&stream->priv->lock);
5129   stream->priv->ulpfec_percentage = percentage;
5130   if (stream->priv->ulpfec_encoder) {
5131     g_object_set (stream->priv->ulpfec_encoder, "percentage", percentage, NULL);
5132   }
5133   g_mutex_unlock (&stream->priv->lock);
5134 }
5135
5136 /**
5137  * gst_rtsp_stream_get_ulpfec_percentage:
5138  *
5139  * Returns: the amount of redundancy applied when creating ULPFEC
5140  * protection packets.
5141  *
5142  * Since: 1.16
5143  */
5144 guint
5145 gst_rtsp_stream_get_ulpfec_percentage (GstRTSPStream * stream)
5146 {
5147   guint res;
5148
5149   g_mutex_lock (&stream->priv->lock);
5150   res = stream->priv->ulpfec_percentage;
5151   g_mutex_unlock (&stream->priv->lock);
5152
5153   return res;
5154 }